In [84]:
import ast

llm_generated_code = """
y = "hi";
x = 1 + y
aasd
"""
syntax = ast.parse(llm_generated_code)
syntax.body[1].value.right.id

'y'

In [83]:
# 'append',
#  'clear',
#  'copy',
#  'count',
#  'extend',
#  'index',
#  'insert',
#  'pop',
#  'remove',
#  'reverse',
#  'sort']
[attr for attr in dir(syntax.body[1].value.right) if not attr.startswith("_")]


['col_offset', 'ctx', 'end_col_offset', 'end_lineno', 'id', 'lineno']

In [99]:
# Look at the contexts in your assignment
assign = syntax.body[1]  # x = 1 + y

print("Target context:", assign.targets[0].ctx.__class__.__name__)
# print("Left operand context:", assign.value.left.ctx.__class__.__name__) 
print("Right operand context:", assign.value.right.ctx.__class__.__name__)

Target context: Store
Right operand context: Load


In [96]:
assign.targets[0].ctx

<ast.Store at 0x104e20580>

In [298]:
test_code = """
a = -x
b = x < y < z
c = not b
c = -z
print("hello")
result = f(x,y=a)
if a > b:
    print("a")
    prinasdft("b")
"""
test_syntax = ast.parse(test_code)

plus_z = test_syntax.body[6]
plus_z.body[1].value.func.id

'prinasdft'

In [296]:
[attr for attr in dir(plus_z.body[1].value.func) if not attr.startswith("_")]

['col_offset', 'ctx', 'end_col_offset', 'end_lineno', 'id', 'lineno']

In [13]:
chained_code = "result = a and a"
parse = ast.parse(chained_code)
woah = ast.parse(chained_code).body[0].value
print(ast.dump(ast.parse(parse).body[0].value, indent=2))

BoolOp(
  op=And(),
  values=[
    Name(id='a', ctx=Load()),
    Name(id='a', ctx=Load())])


In [188]:
chained_code = "result = a + b + c"
parse = ast.parse(chained_code)
woah = ast.parse(chained_code).body[0].value
print(ast.dump(ast.parse(parse).body[0].value, indent=2))

BinOp(
  left=BinOp(
    left=Name(id='a', ctx=Load()),
    op=Add(),
    right=Name(id='b', ctx=Load())),
  op=Add(),
  right=Name(id='c', ctx=Load()))


In [150]:
# Let's make sure we're working with the right AST
print("test_syntax is:", test_syntax)
print("test_syntax type:", type(test_syntax).__name__)
print("test_syntax body length:", len(test_syntax.body))

# Let's also check if there are multiple ASTs in your notebook
print("All variables in current namespace:")
import sys
current_frame = sys._getframe(0)
for name, value in current_frame.f_locals.items():
    if isinstance(value, ast.Module):
        print(f"  {name}: {value}")

test_syntax is: <ast.Module object at 0x10d9afa30>
test_syntax type: Module
test_syntax body length: 2
All variables in current namespace:


RuntimeError: dictionary changed size during iteration

In [40]:
import ast
from multiprocessing import Value 
GLOBAL_MAX = 10_000


CNT = 0

def _tick():
    global CNT
    CNT += 1
    if CNT >= GLOBAL_MAX:
        raise ValueError
        
class OperationCounter(ast.NodeTransformer):
    def __init__(self):
        self.cnt = 0

    def wrap_tick(self, node):
        # wraps node with a tick wrapper
        return ast.Subscript(
            value = ast.Tuple(
                elts = [
                    ast.Call(func = ast.Name("_tick", ctx = ast.Load()), args = [], keywords=[]),
                    node
                ],
                ctx = ast.Load()
            ),
            slice = ast.Constant(value = 1),
            ctx = ast.Load()
        )
        
    def visit_BinOp(self, node):
        new_node =  self.generic_visit(node)
        # since this is a subclass of nodeTransformer (write)
        # we must return a node, ideally the one modified by generic visit
        
        return self.wrap_tick(new_node)


code = "x = a + b * c + 5"
vars = {
    "a": 5,
    "b": 2,
    "c": 3,
    "_tick": _tick
}
tree = ast.parse(code)
counter = OperationCounter()
# call the generic visit method since it will lookup any node
# specific functions and call them

new_tree = counter.visit(tree)

print(ast.unparse(new_tree))
# x = (_tick(), (_tick(), a + (_tick(), b * c)[1])[1] + 5)[1]

# add missing elements

ast.fix_missing_locations(new_tree)
# compile to bytecode

compiled_code = compile(new_tree,"my_ast", mode="exec")
exec_code = exec(compiled_code, vars)
print(vars["x"])

x = (_tick(), (_tick(), a + (_tick(), b * c)[1])[1] + 5)[1]
16


In [112]:
tree.body, ast.Expr()

([<ast.Assign at 0x10a89aa40>], <ast.Expr at 0x109ad5810>)

In [199]:
from ast import NodeTransformer


def show_types(tree):
    for node in ast.walk(tree):
        print(type(node).__name__)
    print(ast.dump(tree, indent = 2))


loop_cnt = 0
def _tick_loops():
    global loop_cnt
    loop_cnt  += 1
    if loop_cnt > 10_000:
        raise ValueError("Please dont my family ")


src = """
while True:
    print("hi")
"""

class LoopInstruments(ast.NodeTransformer):
    def visit_While(self, node):
        new_node = self.generic_visit(node)

        # node is of while type

        instr_expr = ast.Expr(value = ast.Call(
            ast.Name(id = "_tick_loops", ctx = ast.Load()), args = [], keywords = []
        ))

        new_node.body = [instr_expr] + new_node.body

        ast.fix_missing_locations(new_node)

        return new_node


tree = ast.parse(src)

# instrument it

instrumenter = LoopInstruments()
new_tree = instrumenter.visit(tree)
# we want to insert a _tick 
show_types(new_tree)
ns = {
    "_tick_loops": _tick_loops,
}
exec(compile(new_tree, "src", 'exec'), ns)

Module
While
Constant
Expr
Expr
Call
Call
Name
Name
Constant
Load
Load
Module(
  body=[
    While(
      test=Constant(value=True),
      body=[
        Expr(
          value=Call(
            func=Name(id='_tick_loops', ctx=Load()),
            args=[],
            keywords=[])),
        Expr(
          value=Call(
            func=Name(id='print', ctx=Load()),
            args=[
              Constant(value='hi')],
            keywords=[]))],
      orelse=[])],
  type_ignores=[])
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi


ValueError: Please dont my family 

In [192]:
print(dir(globals()["__builtins__"]))



In [204]:
import time
perf = time.perf_counter()
i = 0
while i < 1000:
    i += 1
float(perf) // 1_000_000

1.0

In [167]:
src = """
def square(x):
    return x * x
"""
entry_func = "neehao"
args = (5,)


tree = ast.parse(src)

final = ast.Assign(
    targets = [
        ast.Name(id="result", ctx = ast.Store())
    ],
    value = ast.Call(
        # this needs to be a named reference to the variable 
        # that is loaded from the string contained in entry func
        func = ast.Name(id = entry_func, ctx = ast.Load()),
        # this needs to be a list ie ast.Constant
        # but we don't know if it is a constant a priori...
        # hmmm
        args = [ast.Constant(value = arg) for arg in args],
        keywords = [],
    )
)

#need to wrap final and tree together so they both run
# tree is a module
# it can contain expressions, assigns, functiondefs, etc in body
# lets combine the bodies?
new_body = tree.body + [final]
# new_body
# make a module from it
new_tree = ast.Module(new_body)
new_tree = ast.fix_missing_locations(new_tree)
new_tree.type_ignores = []
state = {}
exec(compile(new_tree, "<ast>", "exec"), state)
state['result']

NameError: name 'neehao' is not defined

In [None]:
# CNT = 0


def run_one(code):
    results = {}
    MAX = 1_000
    def _tick():
        nonlocal CNT
        CNT += 1
        if CNT > MAX:
            raise ValueError("Too many")
    CNT = 0
    state = {"_tick": _tick}
    try:
        exec(code, state)
    except Exception as e:
        print(e)
        print("hi")
        results["status"] = str(e)
    return results

src = """
def spin():2
    for i in range(10_000):
        _tick()
spin()
"""
run_one(src)


Too many
hi


{'status': 'Too many'}

In [None]:
import threading
from threading import Timer
Timer.interval()
[attr for attr in dir(Timer) if not attr.startswith("__")]

['_bootstrap',
 '_bootstrap_inner',
 '_delete',
 '_initialized',
 '_reset_internal_locks',
 '_set_ident',
 '_set_native_id',
 '_set_tstate_lock',
 '_stop',
 '_wait_for_tstate_lock',
 'cancel',
 'daemon',
 'getName',
 'ident',
 'isDaemon',
 'is_alive',
 'join',
 'name',
 'native_id',
 'run',
 'setDaemon',
 'setName',
 'start']

In [224]:
import ast

ast.dump(ast.parse("globals.__builtins__"))

"Module(body=[Expr(value=Attribute(value=Name(id='globals', ctx=Load()), attr='__builtins__', ctx=Load()))], type_ignores=[])"

In [83]:
g = {}
exec("x = 42", g, g)
g.get("x"), g.get("__builtins__"), g.keys()

(42,
 {'__name__': 'builtins',
  '__doc__': "Built-in functions, types, exceptions, and other objects.\n\nThis module provides direct access to all 'built-in'\nidentifiers of Python; for example, builtins.len is\nthe full name for the built-in function len().\n\nThis module is not normally accessed explicitly by most\napplications, but can be useful in modules that provide\nobjects with the same name as a built-in value, but in\nwhich the built-in of that name is also needed.",
  '__package__': '',
  '__loader__': _frozen_importlib.BuiltinImporter,
  '__spec__': ModuleSpec(name='builtins', loader=<class '_frozen_importlib.BuiltinImporter'>, origin='built-in'),
  '__build_class__': <function __build_class__>,
  '__import__': <function __import__(name, globals=None, locals=None, fromlist=(), level=0)>,
  'abs': <function abs(x, /)>,
  'all': <function all(iterable, /)>,
  'any': <function any(iterable, /)>,
  'ascii': <function ascii(obj, /)>,
  'bin': <function bin(number, /)>,
  'break

In [109]:
len(dir(__builtins__))

160

In [100]:
print("len(dir({})) =", len(dir({})))                    # A: attributes of a *dict* object
import builtins
print("len(dir(builtins)) =", len(dir(builtins)))        # B: attributes in the *builtins module*

# now inside exec, print the type and a count that adapts to dict vs module
g = {}
exec("import builtins\nimport types\n"
     "t = type(__builtins__)\n"
     "print('inside exec: type(__builtins__)=', t.__name__)\n"
     "print('count=', len(__builtins__) if isinstance(__builtins__, dict) else len(dir(__builtins__)))",
     g, g)


len(dir({})) = 46
len(dir(builtins)) = 160
inside exec: type(__builtins__)= dict
count= 160


In [None]:
import ast
from multiprocessing import Value 
GLOBAL_MAX = 10_000

# closure approach

def increment_tick():
    cnt = 0
    def _tick():
        cnt += 1
        if cnt >= GLOBAL_MAX:
            raise ValueError
    return _tick
        
class OperationCounter(ast.NodeTransformer):
    def __init__(self):
        self.cnt = 0

    def wrap_tick(self, node):
        # wraps node with a tick wrapper
        return ast.Subscript(
            value = ast.Tuple(
                elts = [
                    ast.Call(func = ast.Name("_tick", ctx = ast.Load()), args = [], keywords=[]),
                    node
                ],
                ctx = ast.Load()
            ),
            slice = ast.Constant(value = 1, ctx = ast.Load()),
            ctx = ast.Load()
        )
        
    def visit_BinOp(self, node):
        new_node =  self.generic_visit(node)
        # since this is a subclass of nodeTransformer (write)
        # we must return a node, ideally the one modified by generic visit
        
        return self.wrap_tick(new_node)


code = "x = a + b * c + 5"
tree = ast.parse(code)
counter = OperationCounter()
# call the generic visit method since it will lookup any node
# specific functions and call them


_tick = increment_tick()

new_module = counter.visit(tree)

print(ast.dump(ast.parse(new_module), indent = 2))

Module(
  body=[
    If(
      test=BinOp(
        left=Name(id='a', ctx=Load()),
        op=Add(),
        right=Name(id='b', ctx=Load())),
      body=[
        Expr(
          value=Call(
            func=Name(id='print', ctx=Load()),
            args=[
              Name(id='a', ctx=Load())],
            keywords=[]))],
      orelse=[])],
  type_ignores=[])


In [None]:
ast.Expr(
    value = ast.Subscript(
        # load the tuple as a value
        value=ast.Tuple(
            elts = [
                # load the function for calling
                ast.Call(func=ast.Name(id="_tick", ctx = ast.Load())),
                node
            ],
            ctx = ast.Load()
        ),
        slice = ast.Constant(value = 1),
        ctx = ast.Load()
    )
)


In [None]:
from multiprocessing import Process, Queue
import multiprocessing
Process


p = Process()
q = Queue()
q.put("a")

print([attr for attr in dir(p) if not attr.startswith("__")])

TypeError: Queue.put() missing 1 required positional argument: 'obj'

In [233]:
import inspect
print(inspect.getsource(q.put))

    def put(self, obj, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()



In [235]:
q.put("a")

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getstate__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__setstate__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_after_fork',
 '_buffer',
 '_close',
 '_closed',
 '_feed',
 '_finalize_close',
 '_finalize_join',
 '_ignore_epipe',
 '_joincancelled',
 '_jointhread',
 '_maxsize',
 '_notempty',
 '_on_queue_feeder_error',
 '_opid',
 '_poll',
 '_reader',
 '_recv_bytes',
 '_reset',
 '_rlock',
 '_sem',
 '_send_bytes',
 '_start_thread',
 '_thread',
 '_wlock',
 '_writer',
 'cancel_join_thread',
 'close',
 'empty',
 'full',
 'get',
 'get_nowait',
 'join_thread',
 'put',
 'put_nowait',
 'qsize']

In [18]:

from concurrent.futures import ProcessPoolExecutor, as_completed
from pdb import run

global def run_a_func():
    print("hi")
    return "a"

with ProcessPoolExecutor(max_workers = 5) as pool:
    fut_to_index = {}
    for i in range(5):
        fut_to_index[pool.submit(run_a_func)] = i

res = [{}] * 5 

# this will keep iterating / hanging until they are done!
for fut in as_completed(fut_to_index.keys()):
    result = fut.result()
    idx = fut_to_index[fut]
    res[idx] = {"result": result}

print({"results": res})


SyntaxError: invalid syntax (1455220334.py, line 4)

In [15]:
res = [{}] * 5 
res

[{}, {}, {}, {}, {}]