In [None]:
#| default_exp core

# safepyrun

In [None]:
#| export
from fastcore.utils import *
from fastcore.xtras import asdict
from fastcore.xdg import xdg_config_home
from inspect import currentframe,Parameter,signature
from contextvars import ContextVar

import json,importlib,linecache,re,inspect,uuid,ast,warnings,collections,time,asyncio,urllib.parse,dataclasses,shlex,urllib
import zlib,unicodedata,binascii,enum,secrets,pickle,contextlib,types,keyword,httpx
import heapq, bisect, html, struct, decimal, fractions, pprint, fnmatch, base64
import random, statistics, difflib, csv, string, textwrap, hashlib, copy, datetime as dt_mod
import xml.etree.ElementTree as ET,ipaddress,colorsys,cmath,traceback,sys,shutil
from datetime import datetime
from urllib.parse import quote,unquote,urlencode
from io import StringIO,BytesIO
from collections import Counter,deque

In [None]:
#| export
from fastcore.imports import __llmtools__
from RestrictedPython import utility_builtins, safe_builtins,limited_builtins
from RestrictedPython.transformer import RestrictingNodeTransformer, INSPECT_ATTRIBUTES, copy_locations
from restrictedpython_async import *

## Helpers and setup

In [None]:
#| export
# ContextVar fallback for when stack walking fails (e.g. inside asyncio.gather)
_rp_globals = ContextVar('_rp_globals', default=None)

def _find_frame_dict(sentinel:str):
    "Find the globals dict containing sentinel, or calling frame's globals if no sentinel"
    frame = currentframe().f_back.f_back
    if not sentinel: return frame.f_globals
    while frame:
        if sentinel in frame.f_globals: return frame.f_globals
        frame = frame.f_back
    # Fall back to RunPython globals stored in ContextVar (e.g. from asyncio.gather)
    rpg = _rp_globals.get()
    if rpg and sentinel in rpg: return rpg
    return globals()


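`_find_frame_dict` walks the call stack looking for a frame whose globals contain `sentinel`. This lets `RunPython` find the caller's namespace without requiring an explicit globals dict. If no frame matches, it tries the globals most recently stored in the `_rp_globals` ContextVar (set by `_run_python`, which helps when stack walking fails, e.g. inside `asyncio.gather`), and finally falls back to its own module globals. When `sentinel` is empty or `None`, it simply returns the globals of its caller's caller.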

In [None]:
_test_sentinel = True
d = _find_frame_dict('_test_sentinel')
assert '_test_sentinel' in d
d2 = _find_frame_dict('nonexistent_sentinel_xyz')
assert d2 is not None

In [None]:
#| export
def find_var(var:str):
    "Return the value of `var` from the first namespace `_find_frame_dict` locates for it"
    # Keep this a direct call: `_find_frame_dict` skips exactly one intermediate frame (this one).
    ns = _find_frame_dict(var)
    return ns[var]

In [None]:
find_var('_test_sentinel')

True

In [None]:
#| export
__pytools__ = {'pyrun'}

def allow(*c):
    "Add to `__pytools__`: non-dict args as-is; for dict args each key's `__name__` plus `name.member` per listed member"
    for entry in c:
        if not isinstance(entry, dict):
            __pytools__.add(entry)
            continue
        # The owner (class/module) itself is registered under its bare `__name__`...
        owner_names = {owner.__name__ for owner in entry}
        __pytools__.update(owner_names)
        # ...and each listed member under `owner.member`
        member_names = {f'{owner.__name__}.{member}' for owner,members in entry.items() for member in members}
        __pytools__.update(member_names)

`__pytools__` is the set of callable names that the sandbox allows. `allow` registers new entries — it accepts bare strings (for module-qualified names like `'numpy.array'`) or dicts mapping a class/module to a list of method names. For a dict, each key's own `__name__` is registered too, along with a `'ClassName.method'` key for every listed method.

In [None]:
assert 'pyrun' in __pytools__
allow('my_test_func')
assert 'my_test_func' in __pytools__
allow({str: ['zfill']})
assert 'str.zfill' in __pytools__
__pytools__.discard('my_test_func')
assert 'str.zfill' in __pytools__

## Write policies

In [None]:
#| export
def chk_dest(p, ok_dests):
    """Raise `PermissionError` unless `p` resolves to one of `ok_dests` or to a path inside one.

    `p` and every entry of `ok_dests` are passed through `Path.resolve()` before comparing, so
    `..` segments and symlinks are collapsed first. A single str/`Path` given as `ok_dests` is
    treated as one destination instead of being iterated element by element.
    """
    dests = [ok_dests] if isinstance(ok_dests, (str, Path)) else ok_dests
    target = Path(p).resolve()
    for d in dests:
        root = Path(d).resolve()
        # Compare whole path components rather than string prefixes: `/tmp2` stays outside `/tmp`,
        # and an allowed root of `/` works (a `root + '/'` prefix of `'//'` never matches anything).
        if target == root or root in target.parents: return
    raise PermissionError(f"Write to '{p}' not allowed; permitted: {ok_dests}")

`chk_dest` resolves a path and verifies it falls under one of the allowed destination prefixes. Raises `PermissionError` if not. Used by all `WritePolicy` subclasses.

In [None]:
chk_dest('/tmp/foo.txt', ['/tmp'])
try: chk_dest('/etc/passwd', ['/tmp'])
except PermissionError: print("Correctly blocked /etc/passwd")

Correctly blocked /etc/passwd


In [None]:
#| export
class WritePolicy:
    "Abstract base: subclasses decide which parts of a call name a write destination"
    def check(self, obj, args, kwargs, ok_dests):
        "Validate a pending call (receiver `obj`, `args`, `kwargs`) against `ok_dests`; must be overridden"
        raise NotImplementedError

class PosWritePolicy(WritePolicy):
    "Validate one argument as a write destination: keyword `kw` if passed, else positional index `pos`"
    def __init__(self, pos=0, kw=None): store_attr()
    def check(self, obj, args, kwargs, ok_dests):
        "Run `chk_dest` on the destination argument; do nothing when the call does not supply it"
        if self.kw and self.kw in kwargs: dest = kwargs.get(self.kw)
        elif self.pos < len(args): dest = args[self.pos]
        else: dest = None
        if dest is None: return
        chk_dest(dest, ok_dests)

class PathWritePolicy(WritePolicy):
    "Validate the receiving path object itself, plus an optional target argument"
    def __init__(self, target_pos=None, target_kw=None): store_attr()
    def check(self, obj, args, kwargs, ok_dests):
        "Check `obj` first, then the positional target (if configured and passed), then the keyword target"
        chk_dest(obj, ok_dests)
        pos,kw = self.target_pos,self.target_kw
        has_pos = pos is not None and pos < len(args)
        if has_pos: chk_dest(args[pos], ok_dests)
        has_kw = bool(kw) and kw in kwargs
        if has_kw: chk_dest(kwargs[kw], ok_dests)

class OpenWritePolicy(WritePolicy):
    "Validate the file argument of an `open()`-style call, but only for modes that can write"
    def check(self, obj, args, kwargs, ok_dests):
        "Read `mode` (keyword, else 2nd positional, else 'r'); if it contains w/a/x/+, check the file path"
        if 'mode' in kwargs: mode = kwargs['mode']
        elif len(args) > 1: mode = args[1]
        else: mode = 'r'
        # Read-only modes are never restricted
        if not any(flag in mode for flag in 'wax+'): return
        target = args[0] if args else kwargs.get('file')
        chk_dest(target, ok_dests)

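Three `WritePolicy` subclasses handle different write-checking patterns: `PosWritePolicy` checks a single destination argument (by position or keyword), `PathWritePolicy` checks the path object the method is called on (plus an optional target argument), and `OpenWritePolicy` checks the file passed to `open()` only when the mode can write.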

In [None]:
pp = PosWritePolicy(1, 'dst')
pp.check(None, ['src', '/tmp/ok'], {}, ['/tmp'])
try: pp.check(None, ['src', '/root/bad'], {}, ['/tmp'])
except PermissionError: print("PosWritePolicy blocked /root/bad")

pwp = PathWritePolicy()
pwp.check(Path('/tmp/f.txt'), [], {}, ['/tmp'])
try: pwp.check(Path('/etc/f.txt'), [], {}, ['/tmp'])
except PermissionError: print("PathWritePolicy blocked /etc/f.txt")

owp = OpenWritePolicy()
owp.check(None, ['/tmp/f.txt', 'w'], {}, ['/tmp'])
owp.check(None, ['/etc/passwd', 'r'], {}, ['/tmp'])
try: owp.check(None, ['/root/f.txt', 'w'], {}, ['/tmp'])
except PermissionError: print("OpenWritePolicy blocked write to /root/f.txt")

PosWritePolicy blocked /root/bad
PathWritePolicy blocked /etc/f.txt
OpenWritePolicy blocked write to /root/f.txt


In [None]:
#| export
__pytools_write__ = {}

def allow_write(policies):
    "Merge `policies` (qualified callable name -> `WritePolicy`) into `__pytools_write__`, replacing existing names"
    for name,policy in dict(policies).items(): __pytools_write__[name] = policy

`__pytools_write__` maps qualified callable names (like `'Path.write_text'`) to `WritePolicy` objects. `allow_write` registers these policies. When `ok_dests` is set, `_safe_getattr` checks this registry and wraps matching callables with `_WriteChecked` to enforce destination validation before the call.

In [None]:
allow_write({'test.Method': WritePolicy()})
assert 'test.Method' in __pytools_write__
del __pytools_write__['test.Method']

In [None]:
#| export
class _WriteChecked:
    "Wrap a method to enforce its WritePolicy before calling"
    def __init__(self, obj, method, policy, ok_dests): self.obj,self.method,self.policy,self.ok_dests = obj,method,policy,ok_dests
    def __call__(self, *args, **kwargs):
        self.policy.check(self.obj, args, kwargs, self.ok_dests)
        return self.method(*args, **kwargs)

`_WriteChecked` wraps a method so that its `WritePolicy` is enforced before the actual call. Returned by `_safe_getattr` when a callable matches a `__pytools_write__` entry and `ok_dests` is set.

In [None]:
wc = _WriteChecked(Path('/tmp'), Path.exists, PathWritePolicy(), ['/tmp'])
assert callable(wc)
wc2 = _WriteChecked(Path('/etc'), Path('/etc').exists, PathWritePolicy(), ['/tmp'])
try: wc2()
except PermissionError: print("WriteChecked correctly blocked /etc")

WriteChecked correctly blocked /etc


In [None]:
#| export
_open_policy = OpenWritePolicy()

def _safe_open(ok_dests):
    "Build an `open` replacement whose writable-mode calls are limited to `ok_dests`"
    def _open(*args, **kwargs):
        "Check the call with `_open_policy` (raises on a disallowed destination), then open for real"
        _open_policy.check(None, args, kwargs, ok_dests)
        handle = open(*args, **kwargs)
        return handle
    return _open

`_safe_open` returns a closure that checks `OpenWritePolicy` before delegating to the real `open`. It is only injected into the sandbox when `ok_dests` is set; otherwise no `open` is available at all, because `open` is not part of `all_builtins`.

In [None]:
so = _safe_open(['/tmp'])
f = so('/tmp/test_safe_open.txt', 'w')
f.write('test'); f.close()
so('/etc/passwd', 'r').close()
try: so('/etc/bad.txt', 'w')
except PermissionError: print("_safe_open correctly blocked write to /etc")

_safe_open correctly blocked write to /etc


## Builtins and wrappers

In [None]:
#| export
# Builtins namespace handed to sandboxed code: the RestrictedPython builtin dicts merged left to
# right (later operands win on duplicate keys), followed by the plain container types.
# `async_builtins` is not defined in this file — presumably it comes from the
# `restrictedpython_async` star-import; TODO confirm.
# NOTE(review): the real `__import__` is exposed unwrapped, so sandboxed code can import any
# module; what it can then *call* is limited only by the `_getattr_` callable checks.
all_builtins = safe_builtins | utility_builtins | limited_builtins | async_builtins | dict(
    dict=dict, list=list, set=set, tuple=tuple, frozenset=frozenset,
    __import__=__import__
)

`all_builtins` merges RestrictedPython's `safe_builtins`, `utility_builtins`, `limited_builtins`, and async support, then adds the core container types (`dict`, `list`, `set`, `tuple`, `frozenset`) and `__import__`. This is the builtins dict passed to the sandbox — anything not in here is inaccessible as a builtin.

In [None]:
assert all_builtins['dict'] is dict
assert all_builtins['list'] is list
assert '__import__' in all_builtins
assert 'eval' not in all_builtins
assert 'exec' not in all_builtins
list(all_builtins.keys())[:5]

['__build_class__', 'None', 'False', 'True', 'abs']

In [None]:
#| export
def _make_safe_getattr(ok_dests=None):
    def _safe_getattr(obj, name):
        val = getattr(obj, name)
        if callable(val):
            keys = [f"{cls.__name__}.{name}" for cls in type(obj).__mro__]
            keys += [f"{cls.__module__}.{cls.__qualname__}.{name}" for cls in type(obj).__mro__ if hasattr(cls, '__module__')]
            obj_name = getattr(obj, '__name__', None)
            if obj_name: keys.append(f"{obj_name}.{name}")
            if ok_dests is not None:
                for k in keys:
                    if k in __pytools_write__: return _WriteChecked(obj, val, __pytools_write__[k], ok_dests)
            if not any(k in (__llmtools__|__pytools__) for k in keys): raise AttributeError(f"Cannot access callable: {name}")
        return val
    return _safe_getattr

`_make_safe_getattr` returns a closure over `ok_dests` that intercepts every attribute access. For callables, it checks `__pytools_write__` first (wrapping with `_WriteChecked` if matched), then falls back to the `__llmtools__|__pytools__` allow-set. Non-callables pass through unchecked.

In [None]:
ga = _make_safe_getattr(ok_dests=['/tmp'])
assert ga('hello', 'zfill')(10) == '00000hello'

In [None]:
#| export
class _DirectPrint:
    def __init__(self, *a, **kw): pass
    def _call_print(self, *a, **kw): print(*a, **kw)
    def __call__(self, *a, **kw): print(*a, **kw)

`_DirectPrint` is a thin pass-through that stands in for RestrictedPython's `_print_` and `_print` hooks. It simply calls the real `print`, bypassing RestrictedPython's default print interception.

In [None]:
#| export
class _Uncallable:
    def __init__(self, o, name):
        functools.update_wrapper(self, o)
        self._o,self._name = o,name
    def __call__(self, *a, **kw): raise PermissionError(f"Calling `{self._name}` is not permitted")
    def __getattr__(self, name): return getattr(self._o, name)
    def __repr__(self): return repr(self._o)

def _callable_ok(k, v, _ok):
    if k.endswith('_') or k in _ok: return True
    mod,qn = getattr(v, '__module__', None), getattr(v, '__qualname__', None)
    return bool(mod and qn and f"{mod}.{qn}" in _ok)

`_Uncallable` wraps a callable to raise `PermissionError` on call, while forwarding attribute lookups (like `__name__`) and `repr` to the wrapped object. This lets the sandbox expose objects for inspection without letting users invoke them.

`_callable_ok` checks whether a callable should be allowed — it's ok if its name ends with `_` (user-exported), is in the allow-set directly, or its `module.qualname` is registered.

In [None]:
uc = _Uncallable(len, 'len')
assert repr(uc) == repr(len)
try: uc([1,2,3])
except PermissionError: print("_Uncallable correctly blocked call to len")

_ok = {'test_func_'}
assert _callable_ok('test_func_', lambda: None, _ok)
assert not _callable_ok('secret', lambda: None, _ok)
assert not _callable_ok('_private', lambda: None, _ok)

_Uncallable correctly blocked call to len


In [None]:
#| export
# Underscore-prefixed attribute names exempt from the leading-underscore check in `SafeTransformer`
ALLOWED_DUNDERS = {'__name__', '__module__', '__doc__', '__qualname__', '__file__'}

class SafeTransformer(RestrictingNodeTransformer):
    "Compile policy that vets attribute names and rewrites attribute access into `_getattr_`/`_write_` calls"
    def visit_Attribute(self, node):
        "Report forbidden attribute names via `self.error`, then rewrite `node` according to its load/store/del context"
        # A leading underscore is rejected unless the name is exactly `_` or is in `ALLOWED_DUNDERS`
        if node.attr.startswith('_') and node.attr != '_' and node.attr not in ALLOWED_DUNDERS:
            self.error(node, f'"{node.attr}" is an invalid attribute name because it starts with "_".')
        if node.attr.endswith('__roles__'):
            self.error(node, f'"{node.attr}" is an invalid attribute name because it ends with "__roles__".')
        if node.attr in INSPECT_ATTRIBUTES:
            self.error(node, f'"{node.attr}" is a restricted name, that is forbidden to access in RestrictedPython.')
        # NOTE(review): none of the checks above returns early, so the rewrite below still runs after
        # an error was reported; presumably `self.error` records the problem and compilation fails
        # afterwards — confirm against RestrictedPython.
        if isinstance(node.ctx, ast.Load):
            # Read access: visit children first, then turn `value.attr` into `_getattr_(value, 'attr')`
            node = self.node_contents_visit(node)
            new_node = ast.Call(func=ast.Name('_getattr_', ast.Load()), args=[node.value, ast.Constant(node.attr)], keywords=[])
            copy_locations(new_node, node)
            return new_node
        elif isinstance(node.ctx, (ast.Store, ast.Del)):
            # Assignment/deletion: keep the attribute node but wrap its target as `_write_(value).attr`
            node = self.node_contents_visit(node)
            new_value = ast.Call(func=ast.Name('_write_', ast.Load()), args=[node.value], keywords=[])
            copy_locations(new_value, node.value)
            node.value = new_value
            return node
        else: raise NotImplementedError(f"Unknown ctx type: {type(node.ctx)}")

`SafeTransformer` extends RestrictedPython's `RestrictingNodeTransformer` to rewrite attribute access. Loads become `_getattr_(obj, name)` calls (enabling callable checks), stores/deletes become `_write_(obj).attr = val` (enabling mutation control). Private attrs (starting with `_`) are blocked except for a curated `ALLOWED_DUNDERS` set.

## Main implementation

In [None]:
#| export
async def _run_python(code:str, g=None, ok_dests=None, concise=True):
    """Run `code` under the restricted compiler and return its captured output and/or last-expression value.

    - `code`: source to run; if its last statement is an expression, that expression's value is the result.
    - `g`: namespace supplying tools (names not starting with `_`); it also receives exported names afterwards.
    - `ok_dests`: when not None, a write-checked `open` is made available, limited to these destinations.
    - `concise`: when true and only `stdout` or only `result` was produced, return that value bare.

    Returns a dict holding any of `stdout`, `stderr`, `errors`, `result`; the bare value in the
    concise case; or None when nothing was produced.
    """
    # NOTE(review): `g` defaults to None, but `g.items()` below needs a mapping — callers must pass one.
    # Stash `g` so `_find_frame_dict` can fall back to it when stack walking finds nothing
    _rp_globals.set(g)
    _ok = __llmtools__|__pytools__
    # Expose every non-underscore name from `g`; callables that fail `_callable_ok` stay visible
    # but are wrapped so that calling them raises
    tools = {k:(v if not callable(v) or _callable_ok(k,v,_ok) else _Uncallable(v,k))
        for k,v in g.items() if not k.startswith('_')}
    # Unpacking hook: materialise the iterable as a list, ignoring any extra arguments
    def unpack(a,*args): return list(a)
    # Copy so the per-call `open` injection does not leak into the shared `all_builtins`
    builtins = dict(all_builtins)
    if ok_dests is not None:
        safe_open = _safe_open(ok_dests)
        builtins['open'] = safe_open
    # Globals for the restricted code: guard hooks, a few extra builtins, then the tools
    rg = dict(__builtins__=builtins, _getattr_=_make_safe_getattr(ok_dests),
              _getitem_=lambda o,k: o[k], _getiter_=iter, _apply_ = lambda f, *a, **kw: f(*a, **kw),
              _print_=_DirectPrint, _print=_DirectPrint(),
              _unpack_sequence_=unpack, _iter_unpack_sequence_=unpack,
              enumerate=enumerate, sorted=sorted, reversed=reversed, max=max, min=min, **tools)
    # Also placed directly in the globals (in addition to `builtins` above); overrides any tool named `open`
    if ok_dests is not None: rg['open'] = safe_open
    loc,errs = {},[]
    sout, serr = StringIO(), StringIO()
    async def run(src, is_exec=True):
        "Compile `src` with the sandbox policy and evaluate it; on SyntaxError/NameError record a message and return None"
        try:
            comp = compile_restricted(src, '<tool>', 'exec' if is_exec else 'eval', policy=SafeTransformer)
            # Separate globals (`rg`) and locals (`loc`): names the code assigns at top level land in `loc`.
            # NOTE(review): functions defined by the code resolve free names in `rg`, not `loc`, so they
            # presumably cannot see other top-level names from the same snippet (e.g. recursion) — confirm.
            res = eval(comp, rg, loc)
            if inspect.iscoroutine(res): res = await res
            return res
        except SyntaxError as e: errs.append(f'SyntaxError: {e}')
        # `NameError.name` is only populated on Python 3.10+
        except NameError as e: errs.append(f'`{e.name}` is not available in this sandbox; ask the user to add it to the available tools')
    # Copy locals whose names end with `_` (and do not start with `_`) back into `g`
    def _export(): g.update({k:v for k,v in loc.items() if k.endswith('_') and not k.startswith('_')})
    def _result(res=None):
        "Export `_`-suffixed names, then assemble the return value from captured output, errors and `res`"
        _export()
        d = {}
        if (out := sout.getvalue()): d['stdout'] = out
        if (err := serr.getvalue()): d['stderr'] = err
        if errs: d['errors'] = '\n'.join(errs)
        if res is not None: d['result'] = res
        if concise and len(d)==1: # only one part
            if 'stdout' in d: return d['stdout']
            if 'result' in d: return d['result']
        return d or None
    # NOTE(review): this parse happens outside `run`'s try/except, so a SyntaxError raised here
    # propagates to the caller instead of being reported under `errors`.
    tree = ast.parse(code)
    with contextlib.redirect_stdout(sout), contextlib.redirect_stderr(serr), warnings.catch_warnings():
        warnings.filterwarnings('ignore', category=SyntaxWarning)
        # Trailing expression: run everything before it as statements, then evaluate it for its value
        if tree.body and isinstance(tree.body[-1], ast.Expr):
            last = tree.body.pop()
            if tree.body:
                await run(ast.unparse(ast.Module(tree.body, [])))
                # Do not evaluate the final expression if the preceding statements failed
                if errs: return _result()
            res = await run(ast.unparse(ast.Expression(last.value)), False)
            return _result(res)
        # No trailing expression: run the whole source as statements
        await run(code)
        return _result()

`_run_python` is the core sandbox executor. It compiles code with `SafeTransformer`, sets up the restricted globals (builtins, getattr hook, tools), handles the last-expression-as-return-value pattern, captures stdout/stderr, and exports `_`-suffixed locals back to the caller's namespace.

In [None]:
#| export
class RunPython:
    # Async callable wrapper around `_run_python`. The user-facing description is the dynamic
    # `__doc__` property below, so no static class docstring is defined here.
    def __init__(self, g=None, sentinel=None, ok_dests=None):
        # g: namespace for tools/exports (looked up lazily on first call when None)
        # sentinel: name used to locate the caller's globals on the stack when `g` is None
        # ok_dests: allowed write destinations, or None to leave write methods unavailable
        self.g, self.ok_dests, self.sentinel = g, ok_dests, sentinel

    @property
    def __doc__(self):
        # Rebuilt on every access so the tool list reflects the current allow-sets
        tools = ', '.join(sorted(__llmtools__|__pytools__))
        return f"""Execute restricted Python with access to LLM tools, returning dict of last expression, stdout, stderr, and errors.
            If `concise`, then if just 'stdout' or 'result' returned, return only that without creating a dict.
            `import` works in the usual way. All non-callable globals and non-callable attrs are usable.
            Callable globals are also usable if their name ends with `_` (but not `_`-prefixed).
            - This is an easy way for users to expose extra functions: `def my_helper_(...)`
            Callable object attrs are only accessible if `ClassName.method` is registered as a tool.
            Multiline code blocks can be used, including defining functions and variables, for use within the call.
            In addition most builtins are available, plus these symbols: {tools}

            **NB**: If `code` creates symbols that end with `_`, they will be exported by to the calling namespace.
            - This is how you can use symbols that either human or AI can use again later.
            Examples: `len([1,2,3])` (builtin); `add_msg(content="hi")` (tool); `df.shape` (non-callable attr);
            `[x**2 for x in range(5)]` (last expression returned); `sorted(my_dict.items())` (builtin + non-callable attr)"""

    async def __call__(self, code:str, concise:bool=True):
        # Resolve the namespace once, and only when none was supplied. Test `is None` rather than
        # truthiness so an explicitly passed empty dict is used as-is instead of being silently
        # replaced by the caller's (or this module's) globals.
        # Must stay a direct call: `_find_frame_dict` skips exactly one intermediate frame.
        if self.g is None: self.g = _find_frame_dict(self.sentinel)
        return await _run_python(code, g=self.g, ok_dests=self.ok_dests, concise=concise)

`RunPython` is the public API. It captures the caller's globals via `_find_frame_dict`, optionally takes `ok_dests` for write-checking, and generates its docstring dynamically from the current `__llmtools__|__pytools__` set so the LLM always sees an up-to-date tool list.

In [None]:
pyrun = RunPython()

In [None]:
await pyrun('[]')

[]

In [None]:
await pyrun("print('tt')")

'tt\n'

In [None]:
await pyrun("print('tt')", concise=False)

{'stdout': 'tt\n'}

In [None]:
# Unpacking is allowed
await pyrun("""
a = [1,2,3]
print(*a)
""")

'1 2 3\n'

In [None]:
def f(): warnings.warn('a warning')
allow('f')
await pyrun('print("asdf"); f(); 1+1')

{'stdout': 'asdf\n',
 'result': 2}

## Standard allows

In [None]:
#| export
def safe_type(o:object):
    "Return the class of `o`, exactly as the builtin `type(o)` does"
    cls = type(o)
    return cls

In [None]:
#| export
# Method names that the standard allow-list registers for the in-memory streams `StringIO` and `BytesIO`
_io_meths = ['getvalue', 'read', 'write', 'seek']

In [None]:
#| export
from io import TextIOWrapper,BufferedWriter,BufferedRandom,FileIO

In [None]:
#| export
# Method names permitted on file objects; registered for each of the `io` classes imported above
_file_meths = ['read','readline','readlines','write','writelines','seek','tell','close','flush']
allow({TextIOWrapper: _file_meths, BufferedWriter: _file_meths, BufferedRandom: _file_meths, FileIO: _file_meths})

In [None]:
#| export
# Default allow-list, part 1: builtin types, core stdlib modules and `httpx`. Each dict key is
# registered under its `__name__`, and each listed member as `<name>.<member>`; the trailing
# bare strings are added verbatim.
# `math`, `itertools`, `functools`, `operator` and `Path` are not imported explicitly in this
# file — presumably they come from the `fastcore.utils` star-import; TODO confirm.
allow({
    re: ['search', 'findall', 'sub', 'match', 'compile', 'split', 'escape', 'fullmatch', 'subn'],
    json: ['loads', 'dumps', 'load'],
    math: ['sqrt', 'floor', 'ceil', 'log', 'log2', 'log10', 'gcd', 'isnan', 'isinf',
        'exp', 'sin', 'cos', 'tan', 'atan2', 'radians', 'degrees', 'factorial', 'comb', 'perm', 'prod', 'isclose',
        'fsum', 'hypot', 'isfinite', 'copysign'],
    collections: ['Counter', 'defaultdict', 'deque', 'namedtuple', 'OrderedDict', 'ChainMap'],
    tuple: ['index', 'count'],
    float: ['is_integer', 'fromhex'],
    Counter: ['most_common'],
    dict: ['keys', 'values', 'items', 'get', 'update', 'pop', 'setdefault', 'copy'],
    list: ['append', 'copy', 'extend', 'index', 'insert', 'pop', 'remove', 'reverse', 'sort', 'count'],
    set: ['add', 'discard', 'intersection', 'union', 'difference', 'update',
        'symmetric_difference', 'issubset', 'issuperset', 'copy', 'pop', 'remove'],
    str: ['split', 'join', 'replace', 'strip', 'lstrip', 'rstrip', 'startswith', 'endswith', 'lower', 'upper',
        'find', 'count', 'format', 'isdigit', 'isalpha', 'title', 'encode', 'splitlines', 'removeprefix', 'removesuffix',
        'zfill', 'center', 'ljust', 'rjust', 'maketrans', 'translate', 'casefold', 'partition', 'rpartition'],
    bytes: ['decode', 'fromhex', 'hex'],
    int: ['to_bytes', 'from_bytes', 'bit_length'],
    # Read-only `Path` methods; the mutating ones are handled through write policies instead
    Path: ['read_text', 'glob', 'iterdir', 'exists', 'read_bytes', 'is_file', 'is_dir', 'stat', 'resolve',
        'with_suffix', 'with_name', 'relative_to', 'match', 'joinpath'],
    asyncio: ['gather'], copy: ['deepcopy'], httpx: ['get', 'options'],
    itertools: ['chain', 'islice', 'groupby', 'product', 'permutations', 'combinations', 'accumulate', 'starmap', 'zip_longest',
        'pairwise', 'takewhile', 'dropwhile', 'filterfalse', 'compress', 'count', 'repeat', 'cycle', 'tee', 'batched'],
    functools: ['reduce', 'partial', 'lru_cache', 'cache', 'wraps', 'cmp_to_key', 'total_ordering'],
    textwrap: ['dedent', 'indent', 'wrap', 'shorten', 'fill'],
    # `datetime` is the class, `dt_mod` the module; both have `__name__ == 'datetime'`, so their keys share a prefix
    datetime: ['now', 'fromisoformat', 'strftime', 'strptime', 'isoformat'],
    dt_mod: ['timedelta', 'date', 'time', 'timezone'],
    # NOTE(review): `attrgetter` takes the attribute name as a runtime string, so names fetched
    # through it do not pass through `SafeTransformer.visit_Attribute` or `_getattr_` — confirm
    # this is acceptable for the sandbox.
    operator: ['itemgetter', 'attrgetter', 'add', 'mul', 'sub', 'truediv', 'neg', 'contains',
        'getitem', 'mod', 'eq', 'ne', 'lt', 'gt', 'or_', 'and_', 'not_', 'pow', 'floordiv', 'xor'],
    frozenset: ['intersection', 'union', 'difference', 'symmetric_difference', 'issubset', 'issuperset', 'copy'],
    StringIO: _io_meths, BytesIO: _io_meths,
    }, 'urlencode', 'quote', 'unquote', 'string', 'safe_type'
)

In [None]:
#| export
# Default allow-list, part 2: further stdlib modules. Keys are registered under their `__name__`
# (e.g. `os.path` registers under its platform module name, `ET` as `xml.etree.ElementTree`).
# `os` is not imported explicitly in this file — presumably it comes from the `fastcore.utils`
# star-import; TODO confirm.
allow({
    os.path: ['join', 'basename', 'dirname', 'splitext', 'exists', 'isfile', 'isdir', 'abspath',
        'relpath', 'expanduser', 'normpath'],
    base64: ['b64encode', 'b64decode', 'urlsafe_b64encode', 'urlsafe_b64decode'],
    hashlib: ['md5', 'sha256'],
    random: ['choice', 'randint', 'sample', 'shuffle', 'uniform', 'random'],
    statistics: ['mean', 'median', 'stdev'],
    difflib: ['unified_diff', 'ndiff'],
    csv: ['reader', 'DictReader'],
    heapq: ['nlargest', 'nsmallest', 'heappush', 'heappop'],
    bisect: ['bisect_left', 'bisect_right', 'insort'],
    html: ['escape', 'unescape'],
    struct: ['pack', 'unpack'],
    fnmatch: ['fnmatch', 'filter'],
    time: ['time', 'perf_counter'],
    urllib.parse: ['urlparse', 'parse_qs', 'parse_qsl', 'urlunparse', 'urljoin', 'quote_plus', 'unquote_plus'],
    dataclasses: ['dataclass', 'field', 'asdict', 'fields', 'replace', 'is_dataclass'],
    shlex: ['split', 'quote'],
    zlib: ['compress', 'decompress', 'crc32'],
    unicodedata: ['name', 'lookup', 'category', 'normalize'],
    binascii: ['hexlify', 'unhexlify'],
    enum: ['Enum', 'IntEnum'],
    secrets: ['token_hex', 'token_urlsafe'],
    deque: ['appendleft', 'popleft', 'rotate', 'extendleft'],
    ast: ['literal_eval', 'parse', 'dump', 'walk', 'unparse'],
    # NOTE(review): `pickle.loads` can execute arbitrary code while unpickling (see the warning in
    # the pickle docs); confirm that exposing it to sandboxed code is intended.
    pickle: ['loads', 'dumps'],
    contextlib: ['suppress', 'contextmanager'],
    inspect: ['getsource', 'getsourcefile', 'getsourcelines', 'getmodule', 'getdoc', 'getmembers',
        'signature', 'isclass', 'isfunction', 'ismethod', 'ismodule', 'getfile'],
    keyword: ['iskeyword', 'kwlist'],
    ET: ['fromstring', 'tostring'],
    ET.Element: ['findall', 'find', 'get', 'iter'],
    ipaddress: ['ip_address', 'ip_network'],
    colorsys: ['rgb_to_hsv', 'hsv_to_rgb', 'rgb_to_hls'],
    cmath: ['phase', 'polar', 'rect', 'sqrt'],
    decimal: ['Decimal'], fractions: ['Fraction'],
    uuid: ['uuid4'], pprint: ['pformat'], types: ['SimpleNamespace'],
    traceback: ['format_exc'], sys: ['getsizeof'], warnings: ['warn'],
})

In [None]:
#| export
# Shared policy instances: `_path_wp` checks only the path the method is called on;
# `_dst1` checks the destination given as 2nd positional argument or as keyword `dst`.
_path_wp = PathWritePolicy()
_dst1 = PosWritePolicy(1, 'dst')

# Default write policies. These names are registered only here, not via `allow(...)`, so the
# callables are reachable solely through the write-policy branch of `_make_safe_getattr`
# (i.e. when `ok_dests` is set).
allow_write({
    'Path.write_text': _path_wp, 'Path.write_bytes': _path_wp, 'Path.mkdir': _path_wp, 'Path.touch': _path_wp,
    'Path.unlink': _path_wp, 'Path.rmdir': _path_wp, 'Path.chmod': _path_wp, 'Path.symlink_to': _path_wp, 'Path.hardlink_to': _path_wp,
    # rename/replace must validate both the source path and the target (1st positional or `target=`)
    'Path.rename': PathWritePolicy(target_pos=0, target_kw='target'),
    'Path.replace': PathWritePolicy(target_pos=0, target_kw='target'),
    'shutil.copy': _dst1, 'shutil.copy2': _dst1, 'shutil.copytree': _dst1, 'shutil.move': _dst1,
    'shutil.rmtree': PosWritePolicy(0, 'path'),
})

## Config

`safepyrun` loads an optional user config from `{xdg_config_home}/safepyrun/config.py` at import time, after all defaults are registered. This lets users permanently extend the sandbox allowlists without modifying the package. The config file is executed with every public (non-underscore) `safepyrun.core` global already available, so those names need no imports. This includes `allow`, `allow_write`, `WritePolicy`, `PathWritePolicy`, `PosWritePolicy`, `OpenWritePolicy`, and all standard library modules already imported by the module. Anything else (for example a third-party package) must be imported in the config file itself.

Example `~/.config/safepyrun/config.py` (Linux) or `~/Library/Application Support/safepyrun/config.py` (macOS):

```python
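# Add pandas tools (pandas is not imported by safepyrun.core, so import it here)
import pandas
allow({pandas.DataFrame: ['head', 'describe', 'info', 'shape']})

# Register a write policy for a method; the directories that may actually be written to
# are still set per sandbox via `RunPython(ok_dests=[...])`
allow_write({'Path.write_text': PathWritePolicy()})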
```

If the config file has errors, a warning is emitted and the defaults remain intact.

In [None]:
#| export
# Optional user config, executed once when this module is imported
_cfg_py = xdg_config_home() / 'safepyrun' / 'config.py'
if _cfg_py.exists():
    # The config runs in a fresh dict holding this module's non-underscore globals. `allow` and
    # `allow_write` still update the real registries, since functions resolve names in the
    # module where they were defined.
    # Any exception from reading or running the file is downgraded to a warning.
    try: exec(_cfg_py.read_text(), {k:v for k,v in globals().items() if not k.startswith('_')})
    except Exception as e: warnings.warn(f"Failed to load {_cfg_py}: {e}")

## Examples

In [None]:
await pyrun('''
a = {"b":1}
list(a.items())
''')

[('b', 1)]

In [None]:
await pyrun('Path().exists()')

True

In [None]:
await pyrun("os.path.join('/foo', 'bar', 'baz.py')")

'/foo/bar/baz.py'

In [None]:
await pyrun('a_=3')
a_

3

In [None]:
await pyrun('''aa_='33' ''')
await pyrun('''len(aa_) ''')

2

In [None]:
def g(): ...

In [None]:
await pyrun('inspect.getsource(g)')

'def g(): ...\n'

In [None]:
try: await pyrun('g()')
except PermissionError: print("Correct exception raised")
else: raise Exception("No exception")

Correct exception raised


In [None]:
await pyrun('re.compile("a")')

re.compile(r'a', re.UNICODE)

In [None]:
from re import compile

In [None]:
await pyrun('compile("a")')

re.compile(r'a', re.UNICODE)

In [None]:
await pyrun('''
dict(a=safe_type(1))
''')

{'a': int}

In [None]:
await pyrun("""
async def agen():
    for x in [1,2]: yield x
res = []
async for x in agen(): res.append(x)
res
""")

[1, 2]

In [None]:
await pyrun('''
import asyncio
async def fetch(n): return n * 10
print(string.ascii_letters)
await asyncio.gather(fetch(1), fetch(2), fetch(3))
''')

{'stdout': 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ\n',
 'result': [10, 20, 30]}

In [None]:
import numpy as np

In [None]:
allow('numpy.array', 'numpy.ndarray.sum')
await pyrun('import numpy as np; np.array([1,2,3]).sum()')

6

### Write policy examples

In [None]:
pyrun2 = RunPython(ok_dests=['/tmp'])

In [None]:
await pyrun2("Path('/tmp/test_write.txt').write_text('hello')")

5

In [None]:
try: await pyrun2("Path('/etc/evil.txt').write_text('bad')")
except PermissionError as e: print(f'Blocked: {e}')

Blocked: Write to '/etc/evil.txt' not allowed; permitted: ['/tmp']


In [None]:
await pyrun2("open('/tmp/test_open.txt', 'w').write('hi')")

2

In [None]:
try: await pyrun2("open('/root/bad.txt', 'w')")
except PermissionError as e: print(f'Blocked: {e}')

Blocked: Write to '/root/bad.txt' not allowed; permitted: ['/tmp']


In [None]:
await pyrun2("open('/etc/passwd', 'r').read(10)")

'##\n# User '

In [None]:
await pyrun2("import shutil; shutil.copy('/tmp/test_write.txt', '/tmp/test_copy.txt')")

'/tmp/test_copy.txt'

In [None]:
try: await pyrun2("import shutil; shutil.copy('/tmp/test_write.txt', '/root/bad.txt')")
except PermissionError as e: print(f'Blocked: {e}')

Blocked: Write to '/root/bad.txt' not allowed; permitted: ['/tmp']


In [None]:
try: await pyrun("Path('/tmp/test.txt').write_text('nope')")
except AttributeError as e: print(f'No ok_dests: {e}')

No ok_dests: Cannot access callable: write_text


In [None]:
pyrun_cwd = RunPython(ok_dests=['.'])

# Writing to cwd should work
await pyrun_cwd("Path('test_cwd_ok.txt').write_text('hello')")

5

In [None]:
Path('test_cwd_ok.txt').unlink(missing_ok=True)

In [None]:
# Writing to /tmp should be blocked (not in ok_dests)
try: await pyrun_cwd("Path('/tmp/nope.txt').write_text('bad')")
except PermissionError: print("Blocked /tmp as expected")

Blocked /tmp as expected


In [None]:
# Parent traversal should be blocked
try: await pyrun_cwd("Path('../escape.txt').write_text('bad')")
except PermissionError: print("Blocked ../ as expected")

Blocked ../ as expected


In [None]:
# Sneaky traversal via subdir/../../ should also be blocked
try: await pyrun_cwd("Path('subdir/../../escape.txt').write_text('bad')")
except PermissionError: print("Blocked subdir/../../ as expected")

Blocked subdir/../../ as expected


In [None]:
try: await pyrun_cwd("open('../bad_open.txt', 'w')")
except PermissionError: print("Blocked open ../ as expected")

Blocked open ../ as expected


## export -

In [None]:
#| hide
from nbdev import nbdev_export
nbdev_export()