Skip to content

Commit

Permalink
change minimal python version to 3.0 (I hope):
Browse files Browse the repository at this point in the history
 - remove typehints
 - remove f-strings
 - dict -> OrderedDict
  • Loading branch information
baskiton committed Apr 20, 2023
1 parent fb34dd4 commit d04beeb
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 56 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ To detail see the [documentation][documentation]
`setns` required execution from **ROOT**!

## Requirements
* Python 3.8+
* Linux 3.0+
* Python 3

## Installing
### Using PIP
Expand Down
4 changes: 3 additions & 1 deletion docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ Welcome to pysetns's documentation!
`man setns <https://man7.org/linux/man-pages/man2/setns.2.html>`_ for an introduction to ``setns`` and namespaces.

.. note::
Required ``python 3.8+``
Required:
* Linux 3.0+
* Python 3

.. warning::
Required execution from **ROOT**
Expand Down
16 changes: 8 additions & 8 deletions examples/mountpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,33 @@
import pysetns


def foo(ns: pysetns.Namespace):
path = f'/proc/{ns.target_pid}/mounts'
def foo(ns):
path = '/proc/%s/mounts' % ns.target_pid
print(pid, pysetns.get_ns_string(ns.namespaces))

if not os.path.exists('/proc'):
err_code = errno.EAGAIN if ns.namespaces & pysetns.NS_MNT else errno.ENOENT
print(f'[NS] "/proc" is not found. retry', file=sys.stderr)
print('[NS] "/proc" is not found. retry', file=sys.stderr)
return err_code
if not os.path.exists(path):
path = '/proc/self/mounts'
if not os.path.exists(path):
print(f'[NS] Path is not exist for pid={ns.target_pid}: "{path}"', file=sys.stderr)
print('[NS] Path is not exist for pid=%s: "%s"' % (ns.target_pid, path), file=sys.stderr)
return errno.ENOENT
for m in open(path).readlines():
dev, mntp, tfs, opts, freq, passno = m.split()
print(f'[NS] {mntp=}')
print('[NS] mntp=%s', mntp)


def bar(pid, namespaces):
ns = pysetns.Namespace(pid, namespaces, keep_caps=True)
ns.enter(foo, ns)
if ns.errors:
print(f'NS errors: {ns.errors}')
for nst, msg in ns.errors.items():
print('[NS] ERROR: <%s> %s' % (pysetns.get_ns_string(nst).upper(), msg), file=sys.stderr)
return ns.retry


if __name__ == '__main__':
pid = 37
pid = os.getpid()
if bar(pid, pysetns.NS_MNT | pysetns.NS_PID | pysetns.NS_USER):
bar(pid, pysetns.NS_PID | pysetns.NS_USER)
97 changes: 53 additions & 44 deletions pysetns/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
import errno
import os

from typing import Callable, Union, Iterable
from collections import OrderedDict as OD

from . import ext


NS_INVALID = ext.NS_INVALID
NS_TIME = ext.CLONE_NEWTIME # time namespace (since Linux 5.8)
NS_MNT = ext.CLONE_NEWNS # mount namespace group (since Linux 3.8)
NS_CGROUP = ext.CLONE_NEWCGROUP # cgroup namespace (since Linux 4.6)
Expand All @@ -22,35 +23,37 @@
NS_NET = ext.CLONE_NEWNET # network namespace (since Linux 3.0)
NS_ALL = NS_MNT | NS_USER | NS_PID | NS_NET | NS_UTS | NS_IPC | NS_CGROUP | NS_TIME

_NS_NAMES = {
_NS_NAMES = OD((
# Order is very important here! NS_USER should always be last!
NS_CGROUP: 'cgroup',
NS_IPC: 'ipc',
NS_UTS: 'uts',
NS_NET: 'net',
NS_PID: 'pid',
NS_MNT: 'mnt',
NS_TIME: 'time',
NS_USER: 'user',
}

(NS_CGROUP, 'cgroup'),
(NS_IPC, 'ipc'),
(NS_UTS, 'uts'),
(NS_NET, 'net'),
(NS_PID, 'pid'),
(NS_MNT, 'mnt'),
(NS_TIME, 'time'),
(NS_USER, 'user'),
))
if NS_INVALID in _NS_NAMES:
_NS_NAMES.pop(NS_INVALID)

_OFLAGS = os.O_RDONLY | os.O_NONBLOCK | os.O_NOCTTY | ext.O_CLOEXEC

def get_ns_string(ns_types: int) -> str:

def get_ns_string(ns_types):
"""
Represents namespace types `ns_types` in string view
:type ns_types: int
:rtype: str
"""

return '|'.join(v for k, v in _NS_NAMES.items() if k & ns_types)


class UserNamespaceWarning(Warning):
def __init__(self, gid: int, uid: int, pid: Union[int, str]):
self.message = (f'Further work will be under the *USER* namespace '
f'with rights of the user {gid}:{uid} of pid {pid}!')
def __init__(self, gid, uid, pid):
self.message = ('Further work will be under the *USER* namespace '
'with rights of the user %s:%s of pid %s!' % (gid, uid, pid))
super(UserNamespaceWarning, self).__init__(self.message)


Expand All @@ -59,12 +62,12 @@ class Namespace:
Namespace object
"""

def __init__(self, target_pid: Union[int, str], ns_types: int = NS_ALL,
target_gid: int = None, target_uid: int = None,
do_fork: bool = False, true_user: bool = False,
keep_caps: bool = False):
def __init__(self, target_pid, ns_types=NS_ALL, target_gid=None, target_uid=None,
do_fork=False, true_user=False, keep_caps=False):
"""
:type target_pid: int | str
:param target_pid: The pid of the process whose namespace you want to access
:type ns_types: int
:param ns_types: Namespace types to be accessed. These are bitwise. :const:`NS_ALL` included all of this:
- :const:`NS_TIME` - time namespace (since Linux 5.8)
Expand All @@ -76,16 +79,21 @@ def __init__(self, target_pid: Union[int, str], ns_types: int = NS_ALL,
- :const:`NS_PID` - pid namespace (since Linux 3.8)
- :const:`NS_NET` - network namespace (since Linux 3.0)
:type target_gid: int | None
:param target_gid:
:type target_uid: int | None
:param target_uid: The GID and UID of the user you want to access in :const:`NS_USER` as.
If None, the GID and UID of the process owner will be used
:type do_fork: bool
:param do_fork: Enter into the namespace in a separate process. If `ns_types` includes :const:`NS_USER`
or :const:`NS_PID`, entering into the namespace will be done in a separate process
and `do_fork` value is ignored
:type true_user: bool
:param true_user: If False (default), entering into :const:`NS_USER` will be done by simply switching
to target GID and UID (`target_gid`, `target_uid`), otherwise through a system call,
but then returning from the namespace will not be possible and the program will need to be terminated,
and in this case the :class:`UserNamespaceWarning` exception will be raised
:type keep_caps: bool
:param keep_caps: Preserve root capabilities if you need to perform an action on behalf of a user
with administrator rights. Only relevant if `ns_types` includes :const:`NS_USER`
Expand All @@ -99,20 +107,20 @@ def __init__(self, target_pid: Union[int, str], ns_types: int = NS_ALL,
self.errors = {}
self.retry = False
self.target_pid = target_pid
proc_st = os.stat(f'/proc/{self.target_pid}')
proc_st = os.stat('/proc/%s' % self.target_pid)
self.target_gid = proc_st.st_gid if (target_gid is None) else target_gid
self.target_uid = proc_st.st_uid if (target_uid is None) else target_uid
self.true_user = true_user
self.keep_caps = keep_caps

self.parent_ns_files = {nstype: self._get_nsfd('self', name)
for nstype, name in _NS_NAMES.items()
if nstype & ns_types}
self.parent_ns_files = OD((nstype, self._get_nsfd('self', name))
for nstype, name in _NS_NAMES.items()
if nstype & ns_types)
if ns_types & NS_USER and self._disallow_user_ns(target_pid):
self._close_fds(self.parent_ns_files.pop(NS_USER))
self.target_ns_files = {nstype: self._get_nsfd(target_pid, name)
for nstype, name in _NS_NAMES.items()
if self.parent_ns_files.get(nstype, -1) != -1}
self.target_ns_files = OD((nstype, self._get_nsfd(target_pid, name))
for nstype, name in _NS_NAMES.items()
if self.parent_ns_files.get(nstype, -1) != -1)
self.namespaces = ns_types & NS_USER
for t_nstype, t_fd in self.target_ns_files.items():
if t_fd != -1:
Expand All @@ -122,7 +130,7 @@ def __init__(self, target_pid: Union[int, str], ns_types: int = NS_ALL,
self.do_fork = do_fork or self.namespaces & (NS_USER | NS_PID)
self.fork = -1

def enter(self, target: Callable, *args, **kwargs) -> None:
def enter(self, target, *args, **kwargs):
"""
Enter into **namespace** and execute `target` function with its `args` and `kwargs`.
Exiting namespaces will happen automatically. But if this needs to be done inside the `target` function,
Expand All @@ -131,6 +139,8 @@ def enter(self, target: Callable, *args, **kwargs) -> None:
in the format ``{ns_type: error}``, and if it was not the only `ns_type`, work will continue.
Errors caused by the operation of the `target` function will be ignored, so take care of them yourself.
:type target: Callable
:rtype: None
:raise: :class:`UserNamespaceWarning` on exiting when `true_user` parameter of the
:class:`Namespace` is ``True``
"""
Expand Down Expand Up @@ -179,12 +189,14 @@ def enter(self, target: Callable, *args, **kwargs) -> None:
finally:
self.exit(exitcode)

def exit(self, errcode: int = 0) -> None:
def exit(self, errcode=0):
"""
Exit from **namespace** and set the `errcode` if required. You usually don't need to call this method
yourself. If the `errcode` is set to 11 (:const:`EAGAIN`), the ``Namespace.retry`` attribute
will be set to ``True``.
:type errcode: int
:rtype: None
:raise: :class:`UserNamespaceWarning` when `true_user` parameter of the :class:`Namespace` is ``True``
"""

Expand Down Expand Up @@ -212,11 +224,11 @@ def exit(self, errcode: int = 0) -> None:
self._close_fds(fd)
self.parent_ns_files[ns] = -1
self.namespaces &= ~ns
e.strerror = f'{e.strerror} when exiting from "{_NS_NAMES[ns]}" namespace. ' \
f'Further work is impossible!'
e.strerror = '%s when exiting from "%s" namespace. ' \
'Further work is impossible!' % (e.strerror, _NS_NAMES[ns])
raise
self._close_fds(self.parent_ns_files.values())
self._close_fds(self.target_ns_files.values())
self._close_fds(*self.parent_ns_files.values())
self._close_fds(*self.target_ns_files.values())
if uerr:
self.parent_ns_files.clear()
self.target_ns_files.clear()
Expand All @@ -226,17 +238,17 @@ def exit(self, errcode: int = 0) -> None:
self.target_ns_files = dict.fromkeys(self.target_ns_files, -1)

@staticmethod
def _get_nsfd(pid: Union[int, str], ns_str: str) -> int:
def _get_nsfd(pid, ns_str):
try:
return os.open(f'/proc/{pid}/ns/{ns_str}', _OFLAGS)
return os.open('/proc/%s/ns/%s' % (pid, ns_str), _OFLAGS)
except OSError:
return -1

@staticmethod
def _disallow_user_ns(target_pid) -> bool:
def _disallow_user_ns(target_pid):
# It is not permitted to use setns(2) to reenter the caller's current user namespace
pp = f'/proc/{os.getpid()}'
tp = f'/proc/{target_pid}'
pp = '/proc/%s' % os.getpid()
tp = '/proc/%s' % target_pid
if os.path.isdir(pp) and os.path.isdir(tp):
try:
parent_ino = os.stat(os.path.join(pp, 'ns/user')).st_ino
Expand All @@ -249,12 +261,9 @@ def _disallow_user_ns(target_pid) -> bool:
raise OSError(errno.ESRCH, os.strerror(errno.ESRCH))

@staticmethod
def _close_fds(fds: Union[int, Iterable]) -> None:
if isinstance(fds, Iterable):
for fd in fds:
Namespace._close_fds(fd)
else:
def _close_fds(*fds):
for fd in fds:
try:
os.close(fds)
os.close(fd)
except OSError:
pass
6 changes: 5 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@ def get_file_content(rel_path):
return open(os.path.join(_cur_dir, rel_path)).read()


ext_opts = {}
if sys.version_info >= (3, 2):
ext_opts['py_limited_api'] = True

ext = Extension(
'pysetns.ext',
sources=['src/ext.c'],
py_limited_api=True,
**ext_opts
)

add_opts = {'setup_requires': []}
Expand Down
2 changes: 1 addition & 1 deletion src/ext.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#define _GNU_SOURCE
#define Py_LIMITED_API 0x03080000f0
#define Py_LIMITED_API 3
#include <Python.h>
#include <fcntl.h>
#include <linux/sched.h>
Expand Down

0 comments on commit d04beeb

Please sign in to comment.