In [726]:
import ast
def print_ast(src):
    print(ast.dump(src, indent=4))
def print_code(src):
    print(ast.unparse(ast.fix_missing_locations(src)))

In [727]:
def simple_function(x):
    y = 3 * x
    print(y)

In [728]:
import inspect

In [729]:
tree = ast.parse(inspect.getsource(simple_function))

In [730]:
print_ast(tree)

Module(
    body=[
        FunctionDef(
            name='simple_function',
            args=arguments(
                posonlyargs=[],
                args=[
                    arg(arg='x')],
                kwonlyargs=[],
                kw_defaults=[],
                defaults=[]),
            body=[
                Assign(
                    targets=[
                        Name(id='y', ctx=Store())],
                    value=BinOp(
                        left=Constant(value=3),
                        op=Mult(),
                        right=Name(id='x', ctx=Load()))),
                Expr(
                    value=Call(
                        func=Name(id='print', ctx=Load()),
                        args=[
                            Name(id='y', ctx=Load())],
                        keywords=[]))],
            decorator_list=[])],
    type_ignores=[])


In [731]:
print_ast(tree.body[0])

FunctionDef(
    name='simple_function',
    args=arguments(
        posonlyargs=[],
        args=[
            arg(arg='x')],
        kwonlyargs=[],
        kw_defaults=[],
        defaults=[]),
    body=[
        Assign(
            targets=[
                Name(id='y', ctx=Store())],
            value=BinOp(
                left=Constant(value=3),
                op=Mult(),
                right=Name(id='x', ctx=Load()))),
        Expr(
            value=Call(
                func=Name(id='print', ctx=Load()),
                args=[
                    Name(id='y', ctx=Load())],
                keywords=[]))],
    decorator_list=[])


In [732]:
import random

We create a PythonMutator class that links to other Mutator classes, the PythonMutator class tying all of them together with helper functions to call those class' methods. We can later use it to add probabilities etc. to each mutation.

In [733]:
class PythonMutator:
    def visit_Module(self, src):
        return self.generic_visit(src)
    
    def visit_FunctionDef(self, src):
        return self.generic_visit(src)
    
    def visit_BinOp(self, src):
        return self.generic_visit(src)
    
    def visit_Assign(self, src):
        return self.generic_visit(src)
    
    def visit_Call(self, src):
        return self.generic_visit(src)
    
    def visit_Name(self, src):
        return self.generic_visit(src)
    
    def visit_Constant(self, src):
        return self.generic_visit(src)

    def expand_constants(self, src, trials=3):
        return ExprMutator().modify_value(src, trials)
    
    def swap_numbers(self, src):
        return ExprMutator().commute_value(src)

Now we need to define modify_value that can replace a given constant with an equivalent arithmetic expression, and swap_numbers that will swap the children of a + or * node.

In [734]:
op_map = [("+", ast.Add()), ("*", ast.Mult()), ("/", ast.Div()), ("-", ast.Sub())]

In [735]:
class ExprMutator(ast.NodeTransformer):
    EXPAND = 1
    COMMUTE = 2

    def __init__(self):
        self.transform = False
        self.trials = 0
        self.mode = self.EXPAND

    def modify_value(self, src, trials):
        self.mode = self.EXPAND
        self.trials = trials
        return self._modify_value(src)
    
    def _modify_value(self, src):
        if self.trials == 0: return src
        self.transform = True
        self.visit(random.choice(src.body))
        return self._modify_value(src)

We need the mode so we can swap between traversing a path and swapping children. Depth allows us to control how many numbers we want to go and replace with expressions.

In [736]:
class ExprMutator(ExprMutator):    
    def commute_value(self, n):
        self.mode = self.COMMUTE
        return self.visit(n)

Now come the real functions. The visits to Constant or BinOp nodes are what will truly handle the functionality.

In [737]:
class ExprMutator(ExprMutator):
    def visit_Constant(self, src):
        if isinstance(src.value, int) and self.transform and self.mode == self.EXPAND:
            while True:
                try:
                    op = random.randint(0, 3)
                    other = random.randint(-10000, 10000)
                    assert eval("(" + str(src.value) + op_map[3-op][0] + str(other) + ")" + op_map[op][0] + str(other)) == src.value
                    break
                except ZeroDivisionError: continue
                except AssertionError: continue
            self.trials -= 1
            self.transform = False
            return ast.BinOp(left = ast.Constant(value=eval("(" + str(src.value) + op_map[3-op][0] + str(other) + ")")), op = op_map[op][1], right = ast.Constant(value=other))
            
        return src

    def visit_BinOp(self, src):
        if self.mode == self.EXPAND:
            if random.randint(1, 2) == 1:
                src.left = self.visit(src.left)
            else:
                src.right = self.visit(src.right)
            return src
        
        if self.mode == self.COMMUTE:
            if isinstance(src.op, ast.Add) or isinstance(src.op, ast.Mult):
                src.left, src.right = src.right, src.left
                
            return self.generic_visit(src)

In [738]:
print_ast(ExprMutator().modify_value(ast.Module(body=[ast.Expr(value=ast.Constant(value=1))]), trials=2))

Module(
    body=[
        Expr(
            value=BinOp(
                left=Constant(value=-2537),
                op=Div(),
                right=BinOp(
                    left=Constant(value=-6693),
                    op=Sub(),
                    right=Constant(value=-4156))))])


In [739]:
print_code(PythonMutator().expand_constants(ast.parse("x+1")))

x + (-12581 - -6335 - -0.9402468392534618 * 6644)


In [740]:
from copy import deepcopy

In [741]:
print_code(PythonMutator().swap_numbers(ast.parse("x + 0.00042426813746287653 * (-5.193317422434368 * 1257 + 8885)")))

(8885 + 1257 * -5.193317422434368) * 0.00042426813746287653 + x


In [742]:
tree_two = deepcopy(tree)
for i in range(5):
    if random.randint(1, 5) == 1: PythonMutator().swap_numbers(tree_two)
    else: PythonMutator().expand_constants(tree_two)
new_code = ast.unparse(tree_two)
print(new_code)

def simple_function(x):
    y = (0.7118055555555556 * ((-82486999 - 5705) / -9877) + (-48422259 - -901) / (20836993 / (-61945929 / -8139 * 0.3359611089212981))) * x
    print(y)


In [743]:
simple_function(456)

1368


In [744]:
exec(new_code)

In [745]:
simple_function(456)

1368.0


Clearly the output remains the same inspite of our changes. Next, we look into transforming range-based for loops into while loops.

In [746]:
for_tree = ast.parse('''for i in range(10, 1, -2):
                        print(i)''')
print_ast(for_tree)

Module(
    body=[
        For(
            target=Name(id='i', ctx=Store()),
            iter=Call(
                func=Name(id='range', ctx=Load()),
                args=[
                    Constant(value=10),
                    Constant(value=1),
                    UnaryOp(
                        op=USub(),
                        operand=Constant(value=2))],
                keywords=[]),
            body=[
                Expr(
                    value=Call(
                        func=Name(id='print', ctx=Load()),
                        args=[
                            Name(id='i', ctx=Load())],
                        keywords=[]))],
            orelse=[])],
    type_ignores=[])


In [747]:
while_tree = ast.parse('''
i = 10
while i > 1:
    print(i)
    i += -2''')
print_ast(while_tree)

Module(
    body=[
        Assign(
            targets=[
                Name(id='i', ctx=Store())],
            value=Constant(value=10)),
        While(
            test=Compare(
                left=Name(id='i', ctx=Load()),
                ops=[
                    Gt()],
                comparators=[
                    Constant(value=1)]),
            body=[
                Expr(
                    value=Call(
                        func=Name(id='print', ctx=Load()),
                        args=[
                            Name(id='i', ctx=Load())],
                        keywords=[])),
                AugAssign(
                    target=Name(id='i', ctx=Store()),
                    op=Add(),
                    value=UnaryOp(
                        op=USub(),
                        operand=Constant(value=2)))],
            orelse=[])],
    type_ignores=[])


In [748]:
src = for_tree.body[0]
def analyze_for(node):
    args = node.iter.args
    if len(args) == 1:
        return [ast.Constant(value=0), ast.Lt(), args[0], ast.Constant(value=1)]
    elif len(args) == 2:
        return [args[0], ast.Lt(), args[1], ast.Constant(value=1)]
    else:
        step = eval(ast.unparse(args[2]))
        if step < 0:
            return [args[0], ast.Gt(), args[1], args[2]]
        else:
            return [args[0], ast.Lt(), args[1], args[2]]
        
while_args = analyze_for(src)
print_ast(
    ast.Assign(targets=[src.target], value=while_args[0])
    ) 
print_ast(
    ast.While(test=ast.Compare(left=ast.Name(id=src.target.id, ctx=ast.Load()), ops=[while_args[1]], comparators=[while_args[2]]), \
              body=src.body + [ast.AugAssign(target=src.target, op=ast.Add(), value=while_args[3])], orelse=src.orelse)
)

Assign(
    targets=[
        Name(id='i', ctx=Store())],
    value=Constant(value=10))
While(
    test=Compare(
        left=Name(id='i', ctx=Load()),
        ops=[
            Gt()],
        comparators=[
            Constant(value=1)]),
    body=[
        Expr(
            value=Call(
                func=Name(id='print', ctx=Load()),
                args=[
                    Name(id='i', ctx=Load())],
                keywords=[])),
        AugAssign(
            target=Name(id='i', ctx=Store()),
            op=Add(),
            value=UnaryOp(
                op=USub(),
                operand=Constant(value=2)))],
    orelse=[])


In [749]:
class PythonMutator(PythonMutator):
    def transform_for(self, src):
        return ForMutator().visit(src)

In [750]:
print_ast(ast.parse("for i in delays: print(i)"))
print_ast(ast.parse("for i in range(2): print(i)"))

Module(
    body=[
        For(
            target=Name(id='i', ctx=Store()),
            iter=Name(id='delays', ctx=Load()),
            body=[
                Expr(
                    value=Call(
                        func=Name(id='print', ctx=Load()),
                        args=[
                            Name(id='i', ctx=Load())],
                        keywords=[]))],
            orelse=[])],
    type_ignores=[])
Module(
    body=[
        For(
            target=Name(id='i', ctx=Store()),
            iter=Call(
                func=Name(id='range', ctx=Load()),
                args=[
                    Constant(value=2)],
                keywords=[]),
            body=[
                Expr(
                    value=Call(
                        func=Name(id='print', ctx=Load()),
                        args=[
                            Name(id='i', ctx=Load())],
                        keywords=[]))],
            orelse=[])],
    type_ignores=[])


In [751]:
class ForMutator(ast.NodeTransformer):
    def visit_For(self, src):  
        try: while_args = analyze_for(src)
        except: return src

        return [ast.Assign(targets=[src.target], value=while_args[0]), \
                ast.While(test=ast.Compare(left=ast.Name(id=src.target.id, ctx=ast.Load()), ops=[while_args[1]], comparators=[while_args[2]]), \
                          body=src.body + [ast.AugAssign(target=src.target, op=ast.Add(), value=while_args[3])], orelse=src.orelse)]

In [752]:
for_tree_two = deepcopy(for_tree)
print(ast.unparse(for_tree))
print("====")
print_code(PythonMutator().transform_for(for_tree_two))

for i in range(10, 1, -2):
    print(i)
====
i = 10
while i > 1:
    print(i)
    i += -2


That takes care of for-loops based on ranges. <b>What about iterators?</b>

In [753]:
print_ast(ast.parse(
'''
L = [1, 4, "hello"]
for i in [len(str(x)) for x in L]:
    print(i)
'''
))

Module(
    body=[
        Assign(
            targets=[
                Name(id='L', ctx=Store())],
            value=List(
                elts=[
                    Constant(value=1),
                    Constant(value=4),
                    Constant(value='hello')],
                ctx=Load())),
        For(
            target=Name(id='i', ctx=Store()),
            iter=ListComp(
                elt=Call(
                    func=Name(id='len', ctx=Load()),
                    args=[
                        Call(
                            func=Name(id='str', ctx=Load()),
                            args=[
                                Name(id='x', ctx=Load())],
                            keywords=[])],
                    keywords=[]),
                generators=[
                    comprehension(
                        target=Name(id='x', ctx=Store()),
                        iter=Name(id='L', ctx=Load()),
                        ifs=[],
                        is_async=0)

In [754]:
print_ast(ast.parse('x,y = 3 + 5, 3 + 5'))

Module(
    body=[
        Assign(
            targets=[
                Tuple(
                    elts=[
                        Name(id='x', ctx=Store()),
                        Name(id='y', ctx=Store())],
                    ctx=Store())],
            value=Tuple(
                elts=[
                    BinOp(
                        left=Constant(value=3),
                        op=Add(),
                        right=Constant(value=5)),
                    BinOp(
                        left=Constant(value=3),
                        op=Add(),
                        right=Constant(value=5))],
                ctx=Load()))],
    type_ignores=[])


In [755]:
print_ast(ast.parse('''
tmp1, tmp2 = 3 + 5, 3 + 5
x, y = tmp1, tmp2'''))

Module(
    body=[
        Assign(
            targets=[
                Tuple(
                    elts=[
                        Name(id='tmp1', ctx=Store()),
                        Name(id='tmp2', ctx=Store())],
                    ctx=Store())],
            value=Tuple(
                elts=[
                    BinOp(
                        left=Constant(value=3),
                        op=Add(),
                        right=Constant(value=5)),
                    BinOp(
                        left=Constant(value=3),
                        op=Add(),
                        right=Constant(value=5))],
                ctx=Load())),
        Assign(
            targets=[
                Tuple(
                    elts=[
                        Name(id='x', ctx=Store()),
                        Name(id='y', ctx=Store())],
                    ctx=Store())],
            value=Tuple(
                elts=[
                    Name(id='tmp1', ctx=Load()),
                    Name(id='

In [756]:
class AssignMutator(ast.NodeTransformer):
    def __init__(self):
        self.in_assign = False

    def visit_Name(self, src):
        if self.in_assign: return ast.Name(id = '_' + str(random.randint(1087345, 196871238674)), ctx = src.ctx)
        return src
     
    def visit_Assign(self, src):
        self.in_assign = True
        new_target = self.visit(deepcopy(src.targets[0]))
        self.in_assign = False
        return [ast.Assign(targets = [new_target], value=src.value), ast.Assign(targets=src.targets, value=NameHandler().get_name(deepcopy(new_target)))]

The naive method is to copy all targets and rename them on a line above. 

List and Dict subscripts are an issue for this. For example the following AST:

Assign(
    targets=[
        Subscript(
            value=Name(id='gates', ctx=Load()),
            slice=Subscript(
                value=Name(id='inps', ctx=Load()),
                slice=Constant(value=0),
                ctx=Load()),
            ctx=Store())],
    value=Call(
        func=Name(id='float', ctx=Load()),
        args=[
            Subscript(
                value=Name(id='inps', ctx=Load()),
                slice=Constant(value=1),
                ctx=Load())],
        keywords=[]
        )
)

representing 

gates[inps[0]] = float(inps[1])

gets transformed to

xxx[yyy[0]] = float(inps[1])
gates[inps[0]] = xxx[yyy[0]]

but this is problematic because xxx and yyy aren't declared as lists, which they need to be.

To get around this, we need to transform each target to a single variable node.

x, y = 5, 3

must be transformed to

tmp = 5, 3
x, y = tmp

instead.

In [757]:
class AssignMutator(ast.NodeTransformer):
    def visit_Assign(self, src):
        new_target = ast.Name(id = '_' + str(random.randint(1087345, 196871238674)), ctx = ast.Store())
        return [ast.Assign(targets = [new_target], value=src.value), ast.Assign(targets=src.targets, value=NameHandler().get_name(deepcopy(new_target)))]

In [758]:
class NameHandler(ast.NodeTransformer):
    def get_name(self, src, mode="LOAD"):
        self.mode = mode
        return self.visit(src)

    def visit_Name(self, src):
        if self.mode == "LOAD":
            return ast.Name(id = src.id, ctx = ast.Load())

In [759]:
print_ast(AssignMutator().visit(ast.parse('x,y = 3 + 5, 5 + 3')))

Module(
    body=[
        Assign(
            targets=[
                Name(id='_74411287236', ctx=Store())],
            value=Tuple(
                elts=[
                    BinOp(
                        left=Constant(value=3),
                        op=Add(),
                        right=Constant(value=5)),
                    BinOp(
                        left=Constant(value=5),
                        op=Add(),
                        right=Constant(value=3))],
                ctx=Load())),
        Assign(
            targets=[
                Tuple(
                    elts=[
                        Name(id='x', ctx=Store()),
                        Name(id='y', ctx=Store())],
                    ctx=Store())],
            value=Name(id='_74411287236', ctx=Load()))],
    type_ignores=[])


In [760]:
print_ast(ast.parse('x=y=5'))

Module(
    body=[
        Assign(
            targets=[
                Name(id='x', ctx=Store()),
                Name(id='y', ctx=Store())],
            value=Constant(value=5))],
    type_ignores=[])


In [761]:
print_ast(AssignMutator().visit(ast.parse('x=y=5')))

Module(
    body=[
        Assign(
            targets=[
                Name(id='_29192084756', ctx=Store())],
            value=Constant(value=5)),
        Assign(
            targets=[
                Name(id='x', ctx=Store()),
                Name(id='y', ctx=Store())],
            value=Name(id='_29192084756', ctx=Load()))],
    type_ignores=[])


In [762]:
print_code(AssignMutator().visit(ast.parse('x,y = 3 + 5, 5 + 3')))

_61941644309 = (3 + 5, 5 + 3)
x, y = _61941644309


In [763]:
print_code(AssignMutator().visit(ast.parse('x=y=5')))

_111371291445 = 5
x = y = _111371291445


In [764]:
class PythonMutator(PythonMutator):
    def transform_assign(self, src):
        return AssignMutator().visit(src)

In [765]:
tree = ast.parse(r'''
with open("circuit.txt", "r") as F:
    circuit = F.readlines() # read circuit file into a list
with open("gate_delays.txt", "r") as F:
    delays = F.readlines() # read gate delays into a list

gates = {-1: 0} # prepare dictionary to allow simpler access of gate delays
nodes = {} # prepare dictionary to store node data
out_nodes = [] # prepare list to store names of output nodes
flag1 = flag2 = flag3 = False # prep for processing circuit later

# loop to assign delay value to each kind of gate
for i in delays:
    x = i.strip() # ignore trailing whitespace
    if x[:2] == "//": continue # ignoring whitespace followed by //
    if len(x) == 0: continue # ignoring blank lines or whitespace-only lines
    inps = x.split() # separate line into words
    gates[inps[0]] = float(inps[1]) # assign corresponding delay values with key as gate name

for i in circuit:
    x = i.strip() # ignore trailing whitespace
    if x[:2] == "//": continue # ignoring whitespace followed by //
    if len(x) == 0: continue # ignoring blank lines or whitespace-only lines
    inps = x.split() # separate line into words
    if inps[0] == "PRIMARY_INPUTS": # handling input signal data
        for j in inps[1:]:
            nodes[j] = [0, [], -1] # initializing data with 0 value of delay, no nodes feeding in, associated with no gate  
        flag1 = True # flag to say input signals have been read
        continue
    if inps[0] == "INTERNAL_SIGNALS": # handling internal signal data
        for j in inps[1:]:
            nodes[j] = [0, [], -1] # initializing data with 0 value of delay, no nodes feeding in, associated with no gate
        flag2 = True # flag to say internal signals have been read
        continue
    if inps[0] == "PRIMARY_OUTPUTS": # handling output signal data
        for j in inps[1:]:
            nodes[j] = [0, [], -1] # initializing data with 0 value of delay, no nodes feeding in, associated with no gate
        out_nodes.extend(inps[1:]) # list of output nodes
        flag3 = True # flag to say output signals have been read
        continue
    if flag1 and flag2 and flag3: break # break the loop if all 3 conditions are met before loop termination

for i in circuit: # processing the input and setting up input nodes and gates for each node
    x = i.strip() # ignore trailing whitespace
    if x[:2] == "//": continue # ignoring whitespace followed by //
    if len(x) == 0: continue # ignoring blank lines or whitespace-only lines
    inps = x.split() # separate line into words
    if ((inps[0]=="PRIMARY_INPUTS") or (inps[0]=="INTERNAL_SIGNALS") or (inps[0]=="PRIMARY_OUTPUTS")): 
        continue # ignore signal lines
    out = inps[-1]
    nodes[out][1].extend(inps[1:-1]) # set up input nodes for each node
    nodes[out][2] = inps[0] # set gate delay for relevant nodes

def calcVal_A(x): # recursive function to calculate the delay at each node
    # print(x, nodes) # debug line
    if nodes[x][1] == []: return nodes[x][0] # skip recursive step if node already processed
    s = 0
    for i in nodes[x][1]: # find max delay time of each input node
        nodes[i][0] = calcVal_A(i) # recursive call to function
        s = max(nodes[i][0], s) # node delay that controls delay time of output
    nodes[x][1] = [] # clear input nodes to indicate node delay is already calculated
    return s + gates[nodes[x][2]] # gate delay compensation

to_write = [] # initialize array of lines to be written to output

for i in out_nodes:
    nodes[i][0] = calcVal_A(i) # calculate delay for each output node using the recursive function
    if nodes[i][0] == round(nodes[i][0]): nodes[i][0] = round(nodes[i][0])
    to_write.append(i + " " + str(nodes[i][0]) + "\n") # write delay at each output node to array

with open("output_delays.txt", "w") as F:
    F.writelines(to_write) # write output array to file
''')

In [766]:
print_code(PythonMutator().expand_constants(PythonMutator().transform_assign(tree)))

with open('circuit.txt', 'r') as F:
    _37083758921 = F.readlines()
    circuit = _37083758921
with open('gate_delays.txt', 'r') as F:
    _134919776003 = F.readlines()
    delays = _134919776003
_148725109577 = {-1: 0}
gates = _148725109577
_73099917544 = {}
nodes = _73099917544
_85124048316 = []
out_nodes = _85124048316
_191936408691 = False
flag1 = flag2 = flag3 = _191936408691
for i in delays:
    _55972117923 = i.strip()
    x = _55972117923
    if x[:6828 / 3414] == '//':
        continue
    if len(x) == 0:
        continue
    _142109322958 = x.split()
    inps = _142109322958
    _73282818627 = float(inps[1])
    gates[inps[0]] = _73282818627
for i in circuit:
    _15238694815 = i.strip()
    x = _15238694815
    if x[:2] == '//':
        continue
    if len(x) == 0:
        continue
    _135783952074 = x.split()
    inps = _135783952074
    if inps[0] == 'PRIMARY_INPUTS':
        for j in inps[1:]:
            _169629850831 = [0, [], -1]
            nodes[j] = _169629850831


In [767]:
import trace
import sys

In [768]:
def traceit(frame, event, arg):
    """Trace program execution. To be passed to sys.settrace()."""
    if event == 'line':
        global coverage
        function_name = frame.f_code.co_name
        lineno = frame.f_lineno
        vars = dict(frame.f_locals)
        coverage.append([function_name, lineno, vars])
    return traceit

def tracer(f):
    global coverage
    coverage = []
    sys.settrace(traceit)  # Turn on
    f()
    sys.settrace(None)    # Turn off

In [769]:
def g():
    def simple_function(x):
        z = int(2)
        y = int(3) * x
        return y
    
    a = simple_function(2)
    b = int(0)
    for _ in range(int(6)):
        b += int(2) * a

    print("The answer is", b)

In [770]:
tracer(g)

The answer is 72


In [771]:
for i in coverage:
    print(f"{i[0]} {i[1]} {i[2]}")

g 2 {}
g 7 {'simple_function': <function g.<locals>.simple_function at 0x00000158ACBEDBC0>}
simple_function 3 {'x': 2}
simple_function 4 {'x': 2, 'z': 2}
simple_function 5 {'x': 2, 'z': 2, 'y': 6}
g 8 {'simple_function': <function g.<locals>.simple_function at 0x00000158ACBEDBC0>, 'a': 6}
g 9 {'simple_function': <function g.<locals>.simple_function at 0x00000158ACBEDBC0>, 'a': 6, 'b': 0}
g 10 {'simple_function': <function g.<locals>.simple_function at 0x00000158ACBEDBC0>, 'a': 6, 'b': 0, '_': 0}
g 9 {'simple_function': <function g.<locals>.simple_function at 0x00000158ACBEDBC0>, 'a': 6, 'b': 12, '_': 0}
g 10 {'simple_function': <function g.<locals>.simple_function at 0x00000158ACBEDBC0>, 'a': 6, 'b': 12, '_': 1}
g 9 {'simple_function': <function g.<locals>.simple_function at 0x00000158ACBEDBC0>, 'a': 6, 'b': 24, '_': 1}
g 10 {'simple_function': <function g.<locals>.simple_function at 0x00000158ACBEDBC0>, 'a': 6, 'b': 24, '_': 2}
g 9 {'simple_function': <function g.<locals>.simple_funct

In [772]:
g_tree = ast.parse(inspect.getsource(g)).body[0]
print_ast(g_tree)

FunctionDef(
    name='g',
    args=arguments(
        posonlyargs=[],
        args=[],
        kwonlyargs=[],
        kw_defaults=[],
        defaults=[]),
    body=[
        FunctionDef(
            name='simple_function',
            args=arguments(
                posonlyargs=[],
                args=[
                    arg(arg='x')],
                kwonlyargs=[],
                kw_defaults=[],
                defaults=[]),
            body=[
                Assign(
                    targets=[
                        Name(id='z', ctx=Store())],
                    value=Call(
                        func=Name(id='int', ctx=Load()),
                        args=[
                            Constant(value=2)],
                        keywords=[])),
                Assign(
                    targets=[
                        Name(id='y', ctx=Store())],
                    value=BinOp(
                        left=Call(
                            func=Name(id='int', ctx=Load()

In [773]:
print_ast(ast.parse('def f(x, y, *, z=3): print(x)'))

Module(
    body=[
        FunctionDef(
            name='f',
            args=arguments(
                posonlyargs=[],
                args=[
                    arg(arg='x'),
                    arg(arg='y')],
                kwonlyargs=[
                    arg(arg='z')],
                kw_defaults=[
                    Constant(value=3)],
                defaults=[]),
            body=[
                Expr(
                    value=Call(
                        func=Name(id='print', ctx=Load()),
                        args=[
                            Name(id='x', ctx=Load())],
                        keywords=[]))],
            decorator_list=[])],
    type_ignores=[])


In [774]:
for node in g_tree.body:
    print(node.lineno)

2
7
8
9
12


In [775]:
def get_trace(src):
    for node in src.body:
        data = src.name, node.lineno
        print(data)
        if isinstance(node, ast.FunctionDef):
            get_trace(node)

In [776]:
get_trace(g_tree)

('g', 2)
('simple_function', 3)
('simple_function', 4)
('simple_function', 5)
('g', 7)
('g', 8)
('g', 9)
('g', 12)


Now that we can get the line data for each node in the AST, we can get the data of the local variables at a particular AST node and use it for substitutions.

In [777]:
class VariableInjector(ast.NodeTransformer):        
    def traceit(self, frame, event, arg):
        if event == 'line':
            function_name = frame.f_code.co_name
            lineno = frame.f_lineno
            vars = dict(frame.f_locals)
            self.coverage.append([function_name, lineno, vars])
        return self.traceit

    def tracer(self, f):
        self.coverage = []
        sys.settrace(self.traceit)  # Turn on
        f()
        sys.settrace(None)    # Turn off

    def profile_function(self, f, fn_tree = None):
        if fn_tree is None:
            fn_tree = ast.parse(inspect.getsource(f)).body[0]
        self.tracer(f)

        self.seen = set()
        self.unstable = set()
        self.local_vars = set()
        self.browsing = True        
        self.visit(fn_tree)
        
        self.browsing = False
        self.visit(fn_tree)
            
        return fn_tree
    
                 

Currenly our class simply combines our existing methods, and then visits the AST. Now what we have to do is, while visiting the AST, we need to find the in-scope variables and their values at every line of execution. Then, we need to look for constants and check if they can be replaced by some variable or some simple arithmetic expression involving a variable.

In [778]:
import numpy as np

In [779]:
class VariableInjector(VariableInjector):
    def visit_Assign(self, src):
        if self.browsing:
            for v in src.targets:
                self.check_seens(v)

        return self.generic_visit(src)
    
    def visit_AugAssign(self, src):
        if self.browsing:
            v = src.target
            self.check_seens(v)

        return self.generic_visit(src)
    
    def visit_For(self, src):
        if self.browsing:
            v = src.target
            self.check_seens(v, True)

        return self.generic_visit(src)
            
    def check_seens(self, v, seen=False):
        if isinstance(v, ast.Tuple):
            for var in v.elts: self.check_seens(var, seen)
        elif isinstance(v, ast.Subscript): self.check_seens(v.value, seen)
        else: 
            if seen: self.seen.add(v.id)
            if v.id in self.seen: self.unstable.add(v.id)
            else: self.seen.add(v.id)
    
    def visit_FunctionDef(self, src):
        self.args = [x.arg for x in src.args.args + src.args.kwonlyargs]
        for node in src.body:
            if not self.browsing: self.get_locals(src.name, node.lineno)
            self.visit(node)
        

Moreover, we want that variables that are assigned lists to be usable by indexing the list. For this, we need to flatten the lists/dicts assigned in our code. This is much of the reason for the visit_Constant function's complexity; if we have a singular value we try substituting that, else we flatten the list and insert tuples of (node, node_value) that will be used for the substitution. 

In [780]:
print_ast(ast.parse("L[1]"))
print_ast(ast.parse("[1,2,3]"))
print_ast(ast.parse("D['x']"))

Module(
    body=[
        Expr(
            value=Subscript(
                value=Name(id='L', ctx=Load()),
                slice=Constant(value=1),
                ctx=Load()))],
    type_ignores=[])
Module(
    body=[
        Expr(
            value=List(
                elts=[
                    Constant(value=1),
                    Constant(value=2),
                    Constant(value=3)],
                ctx=Load()))],
    type_ignores=[])
Module(
    body=[
        Expr(
            value=Subscript(
                value=Name(id='D', ctx=Load()),
                slice=Constant(value='x'),
                ctx=Load()))],
    type_ignores=[])


In [781]:
class VariableInjector(VariableInjector):
    def visit_Constant(self, src):
        if len(self.local_vars) == 0 or self.browsing: return src

        queue = list(self.local_vars.keys()).copy()
        random.shuffle(queue)
        while len(queue) != 0:
            n = len(queue) - 1
            if isinstance(queue[len(queue)-1], tuple):
                val = queue[n][1]
                node = queue[n][0]
            else:
                val = self.local_vars[queue[n]]
                node = queue[n]

            queue.pop()
            new_node = None

            if isinstance(val, list) or isinstance(val, tuple):
                rand_val = list(enumerate(val)).copy()
                random.shuffle(rand_val)
                for i in range(len(rand_val)):
                    queue.append((ast.Subscript(value=node, slice=ast.Constant(rand_val[i][0])), rand_val[i][1]))
                
            elif isinstance(val, dict):
                rand_val = list(val.keys()).copy()
                random.shuffle(rand_val)
                for i in rand_val:
                    queue.append((ast.Subscript(value=node, slice=ast.Constant(i)), val[i]))

            else: new_node = self.unify_value(src, node, val)
            
            if new_node is not None: return new_node
            
        return src

Finally, we write the functions to set local_vars and replace basic types with appropriate variable calls.

In [782]:
class VariableInjector(VariableInjector):
    def get_locals(self, fn, ln):
        self.local_vars = {}
        for i in self.coverage:
            if i[0] == fn and i[1] == ln:
                self.local_vars = {ast.parse(k).body[0].value: v for k, v in i[2].items() if k not in self.args and k not in self.unstable}
                return
        
    def unify_value(self, src, var, val):
        if src.value == val:
            return var
        elif (isinstance(src.value, int) or isinstance(src.value, float)) and (isinstance(val, int) or isinstance(val, float)):
            try:
                op = random.randint(0, 3)
                assert eval("(" + str(src.value) + op_map[3-op][0] + str(val) + ")" + op_map[op][0] + str(val)) == src.value
                return ast.BinOp(left = ast.Constant(value=eval("(" + str(src.value) + op_map[3-op][0] + str(val) + ")")), op = op_map[op][1], right = var) 
            except ZeroDivisionError: return None
            except AssertionError: return None
        elif isinstance(src.value, str) and isinstance(val, str):
            if src.value in val:
                ind = val.find(src.value)
                return ast.Subscript(value = var, slice = ast.Slice(lower=ast.Constant(value=ind), upper=ast.Constant(value=ind+len(src.value))))
            elif val in src.value:
                ind = src.value.find(val)
                return ast.BinOp(left = ast.BinOp(left = ast.Constant(value = src.value[:ind]), op = ast.Add(), right = var), op = ast.Add(), right = ast.Constant(value = src.value[ind + len(val):]))
        

We have written the functions that traverse the tree and make appropriate calls to functions to get our local variables. Since we are running this entire thing on a function, the outermost scope will always be handled, and then similiarly inner scopes will get handled. One thing we should note is, when using get_locals, we should avoid substituting constants with arguments to the function, because it won't be consistent across function calls.

In [783]:
print_code(ast.parse(inspect.getsource(g)))
g()

def g():

    def simple_function(x):
        z = int(2)
        y = int(3) * x
        return y
    a = simple_function(2)
    b = int(0)
    for _ in range(int(6)):
        b += int(2) * a
    print('The answer is', b)
The answer is 72


In [784]:
new_g_code = ast.unparse(VariableInjector().profile_function(g))
print(new_g_code)

The answer is 72
def g():

    def simple_function(x):
        z = int(2)
        y = int(6 / z) * x
        return y
    a = simple_function(2)
    b = int(0 / a)
    for _ in range(int(a)):
        b += int(0.3333333333333333 * a) * a
    print('The answer is', b)


In [785]:
exec(new_g_code)
g()

The answer is 72


Note that using exec to set the value of g currently breaks the VariableInjector because it is unable to find the source code of the function through inspect. I'm working on it creating a temporary function inside the PythonMutator class and instead modifying that so any code can have variables injected.

In [786]:
from typing import cast

In [787]:
class PythonMutator(PythonMutator):
    sample_tree = ast.parse('''
def pymutator_profile_function():
    pass
''')

    def inject_variables(self, src):
        node = deepcopy(self.sample_tree)
        node.body[0].body = src.body
        node = ast.fix_missing_locations(node)

        current_module = sys.modules[__name__]
        code = compile(node, filename="<ast>", mode="exec")
        exec(code, current_module.__dict__)

        VariableInjector().profile_function(pymutator_profile_function, node.body[0])
        return ast.Module(body=src.body, type_ignores=src.type_ignores)

In [788]:
test_str = ast.parse('''
a = "hello world"
print("hello")                     
''')
print_code(PythonMutator().inject_variables(test_str))

hello
a = 'hello world'
print(a[0:5])


In [789]:
test_list = ast.parse('''
L = [1,2,3]
a=[int(2),int(4),int(6)]
print(a)
print(L)                    
''')

PythonMutator().inject_variables(test_list)
print_code(test_list)

[2, 4, 6]
[1, 2, 3]
L = [1, 2, 3]
a = [int(1 + L[0]), int(2 + L[1]), int(8 - L[1])]
print(a)
print(L)


In [790]:
g_code = ast.parse('''
def simple_function(x):
    z = int(2)
    y = int(3) * x
    return y

a = simple_function(2)
b = int(0)
for _ in range(int(6)):
    b += int(2) * a
''')

print_code(g_code)
print("=============")
PythonMutator().expand_constants(g_code)
PythonMutator().expand_constants(g_code)
PythonMutator().inject_variables(g_code)
PythonMutator().expand_constants(g_code)
new_g_code = ast.parse(ast.unparse(g_code))
PythonMutator().inject_variables(new_g_code)
PythonMutator().swap_numbers(new_g_code)
PythonMutator().transform_for(new_g_code)
print_code(g_code)
print("=============")
print_code(new_g_code)

def simple_function(x):
    z = int(2)
    y = int(3) * x
    return y
a = simple_function(2)
b = int(0)
for _ in range(int(6)):
    b += int(2) * a
def simple_function(x):
    z = int(0.0003365303718660609 * (-0.10050251256281408 * 9154 - -6863))
    y = int(6 / z) * x
    return y
a = simple_function(8182 + 1.4601927882898964 * (-0.9006430868167202 * 6220))
b = int(9399.0 - a - (9387.0 + a))
for _ in range(int(585.6666666666666 * a + -0.46163968943282013 * (7605.0 - a))):
    b += int(8.0 - a) * a
def simple_function(x):
    z = int((9154 * -0.10050251256281408 - -6863) * 0.0003365303718660609)
    y = x * int((8 - z) / z)
    return y
a = simple_function(6220 * -0.9006430868167202 * 1.4601927882898964 + 8182)
b = int(a * 1566.5 - a - (a + (a + 9381.0)))
_ = 0
while _ < int((a * 1267.5 - a) * -(2.769838136596921 / a) + a * (3514.0 / a)):
    b += a * int(a * 1.3333333333333333 - a)
    _ += 1


In [792]:
g_code = ast.parse(r'''
with open("circuit.txt", "r") as F:
    circuit = F.readlines() # read circuit file into a list
with open("gate_delays.txt", "r") as F:
    delays = F.readlines() # read gate delays into a list

gates = {-1: 0} # prepare dictionary to allow simpler access of gate delays
nodes = {} # prepare dictionary to store node data
out_nodes = [] # prepare list to store names of output nodes
flag1 = flag2 = flag3 = False # prep for processing circuit later

# loop to assign delay value to each kind of gate
for i in delays:
    x = i.strip() # ignore trailing whitespace
    if x[:2] == "//": continue # ignoring whitespace followed by //
    if len(x) == 0: continue # ignoring blank lines or whitespace-only lines
    inps = x.split() # separate line into words
    gates[inps[0]] = float(inps[1]) # assign corresponding delay values with key as gate name

for i in circuit:
    x = i.strip() # ignore trailing whitespace
    if x[:2] == "//": continue # ignoring whitespace followed by //
    if len(x) == 0: continue # ignoring blank lines or whitespace-only lines
    inps = x.split() # separate line into words
    if inps[0] == "PRIMARY_INPUTS": # handling input signal data
        for j in inps[1:]:
            nodes[j] = [0, [], -1] # initializing data with 0 value of delay, no nodes feeding in, associated with no gate  
        flag1 = True # flag to say input signals have been read
        continue
    if inps[0] == "INTERNAL_SIGNALS": # handling internal signal data
        for j in inps[1:]:
            nodes[j] = [0, [], -1] # initializing data with 0 value of delay, no nodes feeding in, associated with no gate
        flag2 = True # flag to say internal signals have been read
        continue
    if inps[0] == "PRIMARY_OUTPUTS": # handling output signal data
        for j in inps[1:]:
            nodes[j] = [0, [], -1] # initializing data with 0 value of delay, no nodes feeding in, associated with no gate
        out_nodes.extend(inps[1:]) # list of output nodes
        flag3 = True # flag to say output signals have been read
        continue
    if flag1 and flag2 and flag3: break # break the loop if all 3 conditions are met before loop termination

for i in circuit: # processing the input and setting up input nodes and gates for each node
    x = i.strip() # ignore trailing whitespace
    if x[:2] == "//": continue # ignoring whitespace followed by //
    if len(x) == 0: continue # ignoring blank lines or whitespace-only lines
    inps = x.split() # separate line into words
    if ((inps[0]=="PRIMARY_INPUTS") or (inps[0]=="INTERNAL_SIGNALS") or (inps[0]=="PRIMARY_OUTPUTS")): 
        continue # ignore signal lines
    out = inps[-1]
    nodes[out][1].extend(inps[1:-1]) # set up input nodes for each node
    nodes[out][2] = inps[0] # set gate delay for relevant nodes

def calcVal_A(x): # recursive function to calculate the delay at each node
    # print(x, nodes) # debug line
    if nodes[x][1] == []: return nodes[x][0] # skip recursive step if node already processed
    s = 0
    for i in nodes[x][1]: # find max delay time of each input node
        nodes[i][0] = calcVal_A(i) # recursive call to function
        s = max(nodes[i][0], s) # node delay that controls delay time of output
    nodes[x][1] = [] # clear input nodes to indicate node delay is already calculated
    return s + gates[nodes[x][2]] # gate delay compensation

to_write = [] # initialize array of lines to be written to output

for i in out_nodes:
    nodes[i][0] = calcVal_A(i) # calculate delay for each output node using the recursive function
    if nodes[i][0] == round(nodes[i][0]): nodes[i][0] = round(nodes[i][0])
    to_write.append(i + " " + str(nodes[i][0]) + "\n") # write delay at each output node to array

with open("output_delays.txt", "w") as F:
    F.writelines(to_write) # write output array to file

''')

PythonMutator().transform_for(g_code)
PythonMutator().expand_constants(g_code, 5)
PythonMutator().inject_variables(g_code)
PythonMutator().expand_constants(g_code, 10)
print_code(g_code)

with open('circuit.txt', 'r') as F:
    circuit = F.readlines()
with open('gate_delays.txt', circuit[0.00043252595155709344 * 6936][7:8]) as F:
    delays = F.readlines()
gates = {-(15173 + -5709 - (15885 + -6422)): 0}
nodes = {}
out_nodes = []
flag1 = flag2 = flag3 = False
for i in delays:
    x = i.strip()
    if x[:348 + 0.05891367273965605 * -5873] == delays[2][0:2]:
        continue
    if len(x) == 0:
        continue
    inps = x.split()
    gates[inps[0]] = float(inps[1])
for i in circuit:
    x = i.strip()
    if x[:14660 / 7330] == delays[1][0:2]:
        continue
    if len(x) == 0:
        continue
    inps = x.split()
    if inps[0] == circuit[5][0:14]:
        for j in inps[1:]:
            nodes[j] = [0, [], -1]
        flag1 = True
        continue
    if inps[0] == circuit[7][0:16]:
        for j in inps[1:]:
            nodes[j] = [0, [], -1]
        flag2 = True
        continue
    if inps[0] == circuit[6][0:15]:
        for j in inps[1:]:
            nodes[j] = [0,