#### Reducing Code
Milan Conrad, 2569325

In [None]:
import sys

In [None]:
from types import FunctionType

In [None]:
import inspect

In [None]:
import bookutils

In [None]:
from Debugger import Debugger

#### Collecting a Call

We start by creating an infrastructure that collects a call. The `CallCollector` class saves the first call observed in `_function`, `_args`, and `_exception` attributes, respectively; it then turns tracing off.

In [None]:
class CallCollector(object):
    """Collect an exception-raising function call f().

    Use as `with CallCollector() as collector: f()`. The first non-dunder
    call observed inside the block is recorded (function object and its
    arguments); an exception raised by the block is stored and suppressed.
    """

    def __init__(self):
        """Initialize collector"""
        self.init()

    def init(self):
        """Reset for new collection."""
        self._function = None
        self._args = {}
        self._exception = None

    def traceit(self, frame, event, arg):
        """Tracing function. Collect first call, then turn tracing off."""
        if event != 'call':
            return

        name = frame.f_code.co_name
        if name.startswith('__'):
            # Internal function
            return
        if self._function is not None:
            # Already set
            return

        if name in frame.f_globals:
            # Access exactly this function
            self._function = frame.f_globals[name]
        elif name in frame.f_locals:
            self._function = frame.f_locals[name]
        else:
            # Create new function from given code
            self._function = FunctionType(frame.f_code,
                                          globals=frame.f_globals,
                                          name=name)

        # At the 'call' event the frame's locals are exactly the arguments;
        # keep a copy so later changes to the frame do not affect us.
        self._args = dict(frame.f_locals)

        # Turn tracing off
        sys.settrace(self.original_trace_function)

    def call(self, new_args=None):
        """Call collected function. If new_args is given,
        override arguments from its {var: value} entries.

        `new_args` defaults to None rather than a shared mutable `{}`.
        Returns whatever the collected function returns and propagates
        any exception it raises.
        """
        args = dict(self.args())  # Create local copy
        if new_args:
            args.update(new_args)

        return self.function()(**args)

    def format_call(self, args=None):
        """Return a string representing a call of the function with given args.
        If args is None, the collected arguments are used."""
        if args is None:
            args = self.args()
        formatted_args = ", ".join(f"{arg}={value!r}"
                                   for arg, value in args.items())
        return f"{self.function().__name__}({formatted_args})"

    def format_exception(self, exc=None):
        """Return a string representing the given exception
        (default: the collected one) as `Type` or `Type: message`."""
        if exc is None:
            exc = self.exception()
        s = type(exc).__name__
        if str(exc):
            s += ": " + str(exc)
        return s

    def after_collection(self):
        """Called after collection. To be defined in subclasses."""
        pass

    def args(self):
        """Return the dictionary of collected arguments."""
        return self._args

    def function(self):
        """Return the function called."""
        return self._function

    def exception(self):
        """Return the exception produced."""
        return self._exception

    def __enter__(self):
        """Called at begin of `with` block. Turn tracing on."""
        self.init()
        self.original_trace_function = sys.gettrace()
        sys.settrace(self.traceit)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Called at end of `with` block. Turn tracing off."""
        sys.settrace(self.original_trace_function)
        if self._function is None:
            return False  # Re-raise exception, if any

        self._exception = exc_value
        self.after_collection()
        return True  # Ignore exception

In [None]:
def mystery(inp):
    """Raise ValueError("Invalid input") if the first occurrence of chr(40)
    in `inp` comes before the first occurrence of chr(41); otherwise do
    nothing and return None."""
    opening = chr(0o17 + 0o31)
    closing = chr(0o27 + 0o22)
    first_opening = inp.find(opening)
    first_closing = inp.find(closing)

    # Either character missing, or not in the offending order: input is fine
    if first_opening < 0 or first_closing < 0 or first_opening >= first_closing:
        return

    raise ValueError("Invalid input")

In [None]:
# Sample input on which mystery() raises ValueError: it contains '(' before ')'.
failing_input = 'V"/+!aF-(V4EOz*+s/Q,7)2@0_'
failing_input

In [None]:
from ExpectError import ExpectError

In [None]:
# Run mystery() on the failing input inside ExpectError(ValueError).
# NOTE(review): ExpectError is imported from outside this file; presumably it
# reports the expected ValueError instead of aborting the notebook - confirm.
with ExpectError(ValueError):
    mystery(failing_input)

In [None]:
def generic_test(inp, fun, expected_exc=None):
    """Run `fun(inp)`, print a one-line report, and return the outcome.

    * PASS if `fun(inp)` raises no exception
    * FAIL if it raises and `expected_exc` is None, or if the exception has
      the same type and the same message as `expected_exc`
    * UNRESOLVED if it raises any other exception
    """
    detail = ""
    try:
        fun(inp)
    except Exception as exc:
        detail = f" ({type(exc).__name__}: {exc})"
        is_expected_failure = (
            expected_exc is None or
            (type(exc) == type(expected_exc) and
             str(exc) == str(expected_exc)))
        outcome = FAIL if is_expected_failure else UNRESOLVED
    else:
        outcome = PASS

    print(f"{fun.__name__}({inp!r}): {outcome}{detail}")
    return outcome

In [None]:
# Test outcomes returned by generic_test() and CallReducer.run()
# and compared against by ddmin().
PASS = 'PASS'  # no exception occurred
FAIL = 'FAIL'  # the expected failure occurred
UNRESOLVED = 'UNRESOLVED'  # some other exception occurred

In [None]:
# Record the first traced call (function and arguments) and the exception it
# raises; CallCollector.__exit__ suppresses the exception once a call was collected.
with CallCollector() as call_collector:
    mystery(failing_input)

In [None]:
call_collector.function()

In [None]:
call_collector.args()

In [None]:
call_collector.exception()

We first introduce a `CallReducer` class as an abstract superclass for all kinds of reducers.
Its `run()` method tests the function and returns PASS, FAIL, or UNRESOLVED. As with `generic_test()`, above, we check for exception type and exact error message.

In [None]:
class CallReducer(CallCollector):
    """Abstract superclass for reducers working on a collected call.

    `run()` executes the collected function with given arguments and
    classifies the outcome; `test()` wraps `run()` with statistics and
    optional logging and is what subclasses (e.g. caching reducers) extend.
    """

    def __init__(self, log=False):
        """Initialize. If log is True, enable logging."""
        super().__init__()
        self.log = log
        self.reset()

    def reset(self):
        """Reset the number of tests."""
        self.tests = 0

    def run(self, args):
        """Run collected function with args. Return
        * PASS if no exception occurred
        * FAIL if the collected exception occurred
        * UNRESOLVED if some other exception occurred.
        Not to be used directly; can be overloaded in subclasses.
        """
        try:
            result = self.call(args)
        except Exception as exc:
            self.last_exception = exc
            # Same failure = same exception type AND same message
            if (type(exc) == type(self.exception()) and
                    str(exc) == str(self.exception())):
                return FAIL
            else:
                return UNRESOLVED  # Some other failure

        self.last_result = result
        return PASS

    def test(self, args):
        """Like run(), but also log detail and keep statistics.

        Increments `self.tests`; if logging is enabled, prints one line
        with the test number, the call, the outcome and (unless the
        outcome is PASS) the exception raised. Returns the outcome of run().
        """
        outcome = self.run(args)
        self.tests += 1

        if self.log:
            if outcome != PASS:
                # run() stores the exception whenever the outcome is not PASS
                detail = f" ({self.format_exception(self.last_exception)})"
            else:
                detail = ""
            print(f"Test #{self.tests} {self.format_call(args)}: "
                  f"{outcome}{detail}")

        return outcome

In [None]:
class CachingCallReducer(CallReducer):
    """Like CallReducer, but cache test outcomes."""

    def init(self):
        """Reset for a new collection and start with an empty cache."""
        super().init()
        self._cache = {}

    def test(self, args):
        """Return the outcome of testing `args`, reusing a cached outcome
        if the same {var: value} combination was tested before.
        Arguments with unhashable values bypass the cache."""
        try:
            # Hashable key made of all (name, value) pairs
            key = frozenset(args.items())
        except TypeError:
            # Non-hashable value – do not use cache
            key = None

        if key is not None and key in self._cache:
            return self._cache[key]

        outcome = super().test(args)
        if key is not None:
            self._cache[key] = outcome

        return outcome

In [None]:
def compile_and_run(lines):
    """Concatenate `lines` into one program and execute it with fresh,
    empty global and local namespaces. Exceptions raised while compiling
    or running the program propagate to the caller."""
    program = "".join(lines)
    exec(program, {}, {})

In [None]:
import ast
import astor
from bookutils import show_ast
import copy
from Assertions import remove_html_markup
from bookutils import print_content

In [None]:
# Source code of remove_html_markup() as a string
fun_source = inspect.getsource(remove_html_markup)

In [None]:
print_content(fun_source, '.py')

In [None]:
# Parse the source into an AST (a Module node containing the function definition)
fun_tree = ast.parse(fun_source)

In [None]:
show_ast(fun_tree)

#### Traversing Syntax Trees

Our goal is now to reduce this tree (or at least the subtree with the function definition) to a minimum. 
To this end, we manipulate the AST through the Python modules `ast` and `astor`. The [official Python `ast` reference](http://docs.python.org/3/library/ast) is complete, but a bit brief; the documentation ["Green Tree Snakes - the missing Python AST docs"](https://greentreesnakes.readthedocs.io/en/latest/) provides an excellent introduction.

The two means for exploring and changing ASTs are the classes `NodeVisitor` and `NodeTransformer`, respectively. We start with creating a list of all nodes in the tree, using a `NodeVisitor` subclass.

Its `generic_visit()` method is called for every node in the tree. It saves the visited node in the `_all_nodes` attribute and then returns `super().generic_visit()` for the current node, so that the node's children are visited as well.

In [None]:
from ast import NodeTransformer, NodeVisitor, fix_missing_locations

In [None]:
class NodeCollector(NodeVisitor):
    """Collect all nodes in an AST."""

    def __init__(self):
        super().__init__()
        self._all_nodes = []

    def generic_visit(self, node):
        """Record `node`, then continue the traversal with its children."""
        self._all_nodes.append(node)
        return super().generic_visit(node)

    def collect(self, tree):
        """Return a list of all nodes in tree.
        Each call starts with a fresh list."""
        collected = self._all_nodes = []
        self.visit(tree)
        return collected

In [None]:
# List of all nodes of fun_tree, in the order the collector visits them
fun_nodes = NodeCollector().collect(fun_tree)

#### Deleting Nodes

In our next step, we write some code that, given such a list of nodes, _prunes_ the tree such that _only_ elements in the list are still contained. To this end, we proceed in four steps:

1. We traverse the original AST, _marking_ all nodes as "to be deleted".
2. We traverse the given list of nodes, clearing their markers.
3. We copy the original tree (including the markers) into a new tree – the one to be reduced.
4. We traverse the new tree, now deleting all marked nodes.

Why do we go through such an extra effort? The reason is that our list of nodes contains references into the _original_ tree – a tree that needs to stay unchanged such that we can reuse it for later. The new tree (the copy) has the same nodes, but at different addresses, so our original references cannot be used anymore. Markers, however, just like any other attributes, are safely copied from the original into the new tree.

The `NodeMarker()` visitor marks all nodes in a tree:

In [None]:
class NodeMarker(NodeVisitor):
    """Mark all nodes in an AST by setting their `marked` attribute to True."""

    def visit(self, node):
        """Mark `node`, then visit (and thereby mark) all its children."""
        setattr(node, 'marked', True)
        return super().generic_visit(node)


The `NodeReducer()` transformer reduces all marked nodes. If a method `visit_<node class>()` is defined, it will be invoked; otherwise, `visit_Node()` is invoked, which deletes the node (and its subtree) by returning `None`.

In [None]:
class NodeReducer(NodeTransformer):
    """Reduce all nodes whose `marked` attribute is true.

    Every node visited must carry a `marked` attribute. If a method
    `visit_<node class>()` is defined, it handles the node; otherwise
    `visit_Node()` deletes a marked node together with its subtree.
    """

    # Replacement nodes. The same instances are inserted wherever needed.
    PASS_TREE = ast.parse("pass").body[0]
    FALSE_TREE = ast.parse("False").body[0].value

    def __init__(self):
        super().__init__()
        # Unused by the reduction itself; kept so that code reading this
        # attribute keeps working.
        self.left_expansion = False

    def visit(self, node):
        """Dispatch to `visit_<node class>()`, defaulting to `visit_Node()`."""
        method = 'visit_' + node.__class__.__name__
        visitor = getattr(self, method, self.visit_Node)
        return visitor(node)

    def visit_Module(self, node):
        """Keep the module itself (regardless of its marker); reduce its children."""
        # Can't remove modules
        return super().generic_visit(node)

    def visit_FunctionDef(self, node):
        """Keep the function definition (regardless of its marker); reduce its children."""
        return super().generic_visit(node)

    def visit_Node(self, node):
        """Default visitor for all nodes"""
        if node.marked:
            return None  # delete it
        return super().generic_visit(node)

    def visit_Assign(self, node):
        """Replace a marked assignment by `pass`."""
        if node.marked:
            # Replace by pass
            return self.PASS_TREE
        return super().generic_visit(node)

    def visit_Compare(self, node):
        """Replace a marked comparison by `False`."""
        if node.marked:
            # Replace by False
            return self.FALSE_TREE
        return super().generic_visit(node)

    def visit_BoolOp(self, node):
        """Replace a marked boolean operation by its left operand.
        The operand is inserted as is, without being reduced itself."""
        if node.marked:
            # Replace by left operand
            return node.values[0]
        return super().generic_visit(node)

    def visit_If(self, node):
        """Replace a marked `if` by the statements of its body.
        The body is inserted as is, without being reduced itself."""
        if node.marked:
            # Replace by body
            return node.body
        return super().generic_visit(node)

Our function `copy_and_reduce()` puts these two pieces together: it marks all nodes and then deletes all nodes except for those in the keep list.

In [None]:
def copy_and_reduce(tree, keep_list):
    """Copy tree, reducing all nodes that are not in keep_list.

    `keep_list` holds nodes of `tree` itself. The markers are set on
    `tree`; the reduction is applied to a deep copy, which is returned.
    """
    # Step 1: mark every node of the original tree ...
    NodeMarker().visit(tree)

    # Step 2: ... and clear the marker of each node that is to be kept
    for kept_node in keep_list:
        kept_node.marked = False

    # Steps 3 and 4: the copy carries the markers along; reduce it
    reduced_tree = copy.deepcopy(tree)
    NodeReducer().visit(reduced_tree)
    return reduced_tree

#### Reducing Trees

We can put all these steps together in a single function. `compile_and_test_ast()` takes a tree and a list of nodes, reduces the tree to those nodes in the list, and then compiles and runs the reduced AST.

In [None]:
def compile_and_test_ast(tree, keep_list, test_tree=None):
    """Reduce `tree` to the nodes in `keep_list`, then compile and run it.

    If `test_tree` is given, its statements are appended to the reduced
    module before compiling. Raises SyntaxError("Cannot compile") if the
    result does not compile; exceptions raised while running propagate.
    """
    reduced_tree = copy_and_reduce(tree, keep_list)

    if test_tree is not None:
        reduced_tree.body.extend(test_tree.body)

    try:
        program = compile(reduced_tree, '<string>', 'exec')
    except Exception:
        raise SyntaxError("Cannot compile")

    # Run in fresh, empty namespaces
    exec(program, {}, {})

In [None]:
#fun_nodes

In [None]:
astor.to_source(fun_nodes[4])

In [None]:
# Keep every collected node except the one at index 4,
# then build a reduced copy of the tree.
keep_list = fun_nodes.copy()
del keep_list[4]
new_fun_tree = copy_and_reduce(fun_tree, keep_list)

In [None]:
show_ast(new_fun_tree)

In [None]:
# Test code for remove_html_markup(): raises RuntimeError if tags are not
# stripped, and fails an assertion if a quoted string is not left unchanged.
test_source = (
    '''if remove_html_markup('<foo>bar</foo>') != 'bar':\n''' +
    '''    raise RuntimeError("Missing functionality")\n''' +
    '''assert remove_html_markup('"foo"') == '"foo"', "My Test"''')

In [None]:
# AST of the test code
test_tree = ast.parse(test_source)

In [None]:
#show_ast(test_tree)

In [None]:
print_content(astor.to_source(test_tree), '.py')

## Parsers

In [None]:
class ParserException(Exception):
    """Raised by a parser that fails on its input program."""

In [None]:
class Parser(ast.NodeVisitor):
    """The base class for a parser"""

    def parse_tree(self, tree):
        """Visit all nodes of the given AST."""
        self.visit(tree)

    def parse(self, source):
        """Parse `source` into an AST, visit it, and return a success
        message. Exceptions raised while parsing or visiting propagate."""
        self.parse_tree(ast.parse(source=source))
        return "The input was successfully parsed."

In [None]:
class Parser1(Parser):
    """
    Fails if an input program contains a boolean operation
    """

    def visit_BoolOp(self, node):
        """Reject the program as soon as a BoolOp node is visited."""
        raise ParserException("Something went wrong")

In [None]:
class Parser2(Parser):
    """
    Fails if an input program contains `if` statement
    """

    def visit_If(self, node):
        """Reject the program as soon as an If node is visited."""
        raise ParserException("Something went wrong")

In [None]:
class Parser3(Parser):
    """
    Fails if an input program contains a special unicode character

    More precisely: fails if a string literal equal to that character is
    visited exactly three generic visits after the most recent assignment
    started (e.g. the right-hand side of `name = <literal>`).
    """
    def __init__(self) -> None:
        super().__init__()
        self.assignment = False  # True once any assignment has been visited
        self.steps = 0  # generic visits since the last assignment started

    def check_unicode(self, string):
        """Return True if `string` is exactly the special character."""
        return string == u'\u0426'

    def generic_visit(self, node):
        """Count the visit, then continue with the children of `node`."""
        self.steps += 1
        super().generic_visit(node)

    def visit_Assign(self, node):
        """Restart the step counter at each assignment."""
        self.assignment = True
        self.steps = 0
        self.generic_visit(node)

    def visit_Constant(self, node):
        """Route string constants to visit_Str(); count all other constants.

        String literals are `ast.Constant` nodes. Dispatching explicitly
        keeps the check working on Python versions where NodeVisitor no
        longer calls the deprecated visit_Str() hook.
        """
        if isinstance(node.value, str):
            self.visit_Str(node)
        else:
            self.generic_visit(node)

    def visit_Str(self, node):
        """Raise ParserException for the special character in the position
        described in the class docstring. Does not count as a step."""
        if self.assignment and self.steps == 3:
            # `.s` is only read for legacy ast.Str nodes
            text = node.value if isinstance(node, ast.Constant) else node.s
            if self.check_unicode(text):
                raise ParserException("Something went wrong")

In [None]:
class Parser4(Parser):
    """
    Fails if an input program contains a variable which is not defined
    """
    def __init__(self) -> None:
        self.assignment = False
        self.steps = 0
        self.variables = set()

    def generic_visit(self, node):
        """Count the visit, then continue with the children of `node`."""
        self.steps += 1
        super().generic_visit(node)

    def visit_Name(self, node):
        """Register the first name visited right after an assignment
        started as defined; reject any other name that is not registered."""
        is_definition = self.assignment and self.steps == 1
        if is_definition:
            self.variables.add(node.id)
            self.assignment = False
        elif node.id not in self.variables:
            raise ParserException("Something went wrong")

        self.generic_visit(node)

    def visit_Assign(self, node):
        """Flag the start of an assignment and restart the step counter."""
        self.assignment = True
        self.steps = 0
        self.generic_visit(node)

In [None]:
class Parser5(Parser):
    """
    Fails if an input program contains a list
    """

    def visit_List(self, node):
        """Reject the program as soon as a List node is visited."""
        raise ParserException("Something went wrong")

## Tests

In [None]:
class Test1:
    """Test case for Parser1 (program containing a boolean operation)."""
    parser = Parser1()

    # Program to be reduced
    _ORIGINAL = '''
def original():
    a = True
    b = not False
    c = 30
    for i in range(c):
        if i == 15:
            if a and b:
                return 1
    return 0
'''

    # Reference for a fully reduced program
    _MINIMIZED = '''
True and True'''

    def get_original(self):
        """Return the source of the program to be reduced."""
        return self._ORIGINAL

    def get_minimized(self):
        """Return the source of the reference reduction."""
        return self._MINIMIZED

In [None]:
class Test2:
    """Test case for Parser2 (program containing an `if` statement)."""
    parser = Parser2()

    # Program to be reduced
    _ORIGINAL = '''
def original():
    a = True
    b = not False
    c = 30
    for i in range(c):
        if i == 15:
            if a and b:
                return 1
    return 0
    '''

    # Reference for a fully reduced program
    _MINIMIZED = '''
if True:
    return
'''

    def get_original(self):
        """Return the source of the program to be reduced."""
        return self._ORIGINAL

    def get_minimized(self):
        """Return the source of the reference reduction."""
        return self._MINIMIZED

In [None]:
class Test3:
    """Test case for Parser3 (program assigning the special unicode character)."""
    parser = Parser3()

    # Program to be reduced
    _ORIGINAL = '''
def original():
    a = 1
    b = a
    c = a - b
    if c < a:
        d = ''
        while a == b:
            d = u'\u0426'
            a += 1
        return d
    return ''
'''

    # Reference for a fully reduced program
    _MINIMIZED = '''
d = u'\u0426'
'''

    def get_original(self):
        """Return the source of the program to be reduced."""
        return self._ORIGINAL

    def get_minimized(self):
        """Return the source of the reference reduction."""
        return self._MINIMIZED

In [None]:
class Test4:
    """Test case for Parser4 (program using an undefined variable)."""
    parser = Parser4()

    # Program to be reduced
    _ORIGINAL = '''
def original():
    a = 1
    b = a
    c = a - b
    if c < a:
        while a == b:
            a += 1
        return d
    return ''
'''

    # Reference for a fully reduced program
    _MINIMIZED = '''
d
'''

    def get_original(self):
        """Return the source of the program to be reduced."""
        return self._ORIGINAL

    def get_minimized(self):
        """Return the source of the reference reduction."""
        return self._MINIMIZED

In [None]:
class Test5:
    """Test case for Parser5 (program containing a list)."""
    parser = Parser5()

    # Program to be reduced
    _ORIGINAL = '''
def original():
    a = 1
    b = 0
    while True:
        if a < b:
            return [1, 2, 3]
        else:
            return []
'''

    # Reference for a fully reduced program
    _MINIMIZED = '''
[]
'''

    def get_original(self):
        """Return the source of the program to be reduced."""
        return self._ORIGINAL

    def get_minimized(self):
        """Return the source of the reference reduction."""
        return self._MINIMIZED

## Testing Framework

In [None]:
class NodeCounter(ast.NodeVisitor):
    """
    Counts the number of nodes in an AST. Used to assess the amount of
    reduction performed by a reducer.
    """

    def __init__(self) -> None:
        self.num_nodes = 0

    def visit(self, node):
        """Count `node`, then visit all its children."""
        self.num_nodes = self.num_nodes + 1
        self.generic_visit(node)

    def count(self, source):
        """Parse `source` and return the running node count.
        The counter is not reset between calls on the same instance."""
        self.visit(ast.parse(source=source))
        return self.num_nodes

In [None]:
class TestingFramework:
    """Run a reducer class against the public test cases.

    A test passes if the reduced program still makes the test's parser
    fail and is at most THRESHOLD nodes larger than the reference reduction.
    """
    THRESHOLD = 3
    test_cases = {
        'test1': Test1(),
        'test2': Test2(),
        'test3': Test3(),
        'test4': Test4(),
        'test5': Test5()
    }

    def __init__(self, reducer):
        # `reducer` is called with a parser and must return an object
        # offering minimize(source)
        self.reducer = reducer

    def count_nodes(self, source):
        """Return the number of AST nodes of `source`; 100000 if source is None."""
        return 100000 if source is None else NodeCounter().count(source)

    def run_test(self, test):
        """
        run a single test
        """
        print(f'Running test {test.__class__.__name__}')
        reduced_code = self.reducer(test.parser).minimize(test.get_original())
        if not self.has_property(reduced_code, test.parser):
            return False
        return self.is_minimized(reduced_code, test.get_minimized())

    def run_tests(self):
        """
        run all public tests
        """
        outcomes = [self.run_test(test) for test in self.test_cases.values()]
        passed_tests = sum(1 for success in outcomes if success)
        print(f"In total {passed_tests} tests passed")

    def has_property(self, source, parser):
        """returns True if the parser fails to parse the source"""
        try:
            parser.parse(source)
        except ParserException:
            print('HAS PROPERTY: OK')
            return True
        except Exception as e:
            # Any other error does not count as the parser failure
            print(f'HAS PROPERTY: FAIL {e}')
            return False
        else:
            print('HAS PROPERTY: FAIL')
            return False

    def is_minimized(self, reduced, reference):
        """returns True if the AST of the reduced code contains no more than the number of nodes in the reference + a THRESHOLD"""
        small_enough = (self.count_nodes(reduced) <=
                        self.count_nodes(reference) + self.THRESHOLD)
        print('IS MINIMIZED: OK' if small_enough else 'IS MINIMIZED: FAIL')
        return small_enough

### Delta Debugging Algorithm

In [None]:
def ddmin(test, inp, *test_args):
    """Reduce the input inp, using the outcome of test(inp, *test_args).

    `inp` must support len(), slicing and concatenation. Returns a reduced
    input on which `test` still returns FAIL. Asserts that the initial
    input does not PASS.
    """
    assert test(inp, *test_args) != PASS

    granularity = 2     # Initial number of subsets
    while len(inp) >= 2:
        chunk_length = len(inp) / granularity
        offset = 0
        reduced = False

        # Try each complement: the input without one chunk
        while offset < len(inp):
            candidate = inp[:int(offset)] + inp[int(offset + chunk_length):]

            if test(candidate, *test_args) == FAIL:
                inp = candidate
                granularity = max(granularity - 1, 2)
                reduced = True
                break

            offset += chunk_length

        if reduced:
            continue

        if granularity == len(inp):
            # Already at single-element granularity: done
            break
        granularity = min(granularity * 2, len(inp))

    return inp

To see how `ddmin()` works, let us run it on our failing input. We need to define a `test` function that returns PASS, FAIL, or UNRESOLVED, depending on the test outcome. This `generic_test()` assumes that the function fails if it raises an exception (such as an `AssertionError`), and passes otherwise. The optional argument `expected_exc` specifies an exception to be checked for: only an exception of the same type and with the same message counts as FAIL, while any other exception yields UNRESOLVED. This ensures we reduce only for the kind of error raised in the original failure.

In [None]:
def generic_test(inp, fun, expected_exc=None):
    """Run `fun(inp)`, print a one-line report, and return the outcome.

    * PASS if `fun(inp)` raises no exception
    * FAIL if it raises and `expected_exc` is None, or if the exception has
      the same type and the same message as `expected_exc`
    * UNRESOLVED if it raises any other exception
    """
    detail = ""
    try:
        fun(inp)
    except Exception as exc:
        detail = f" ({type(exc).__name__}: {exc})"
        is_expected_failure = (
            expected_exc is None or
            (type(exc) == type(expected_exc) and
             str(exc) == str(expected_exc)))
        outcome = FAIL if is_expected_failure else UNRESOLVED
    else:
        outcome = PASS

    print(f"{fun.__name__}({inp!r}): {outcome}{detail}")
    return outcome

# Beginning of My Code
Implementing the HDD approach, which is described in a paper referenced in the Background section.

In [None]:
class MyHDDDebugger(CachingCallReducer):
    """Reduce Python source code while a given parser keeps failing on it.

    `minimize()` is the entry point: it greedily removes second-level nodes
    of the AST as long as the parser still raises ParserException.
    The GTR methods (`generalized_tree_reduction()`, `apply_template()`,
    `reduce_level_nodes()`, `GTR_rec()`) are scaffolding for a tree-reduction
    algorithm that is not implemented yet; they leave trees unchanged.
    """

    def __init__(self, parser, log=False):
        """
        We initialize the DebuggingReducer with a parser, as this is needed to verify whether the failure is still triggered after the code transformation.
        If log is True, the AST and the reduction steps are shown.
        """
        # Sets up the inherited state (collected call, cache, test counter, log)
        super().__init__(log=log)
        self.parser = parser
        self.node_reducer = NodeReducer()
        self.node_marker = NodeMarker()
        self.node_collector = NodeCollector()

    def compute_transformations(self, tree):
        """
        Realizes Deletion Template of Algorithm 1

        Returns the list of removal candidates: the children of the
        children of `tree`.
        """
        return [grandchild
                for child in ast.iter_child_nodes(tree)
                for grandchild in ast.iter_child_nodes(child)]

    def generalized_tree_reduction(self, tree, oracle, templates):
        """
        GTR Algorithm, Algorithm 2

        Applies each template in turn and returns the resulting tree.
        """
        for template in templates:
            tree = self.apply_template(tree, oracle, template)

        return tree

    def apply_template(self, tree, oracle, template):
        """
        Part of Algorithm 2

        Not implemented yet: returns `tree` unchanged.
        """
        # TODO: call algorithm 3
        return tree

    def reduce_level_nodes(self, level_nodes, oracle, templates):
        """
        Algorithm 3

        Not implemented yet: does nothing and returns None.
        """
        pass

    def get_tree_depth(self, tree):
        """Not implemented yet: always returns 0."""
        # TODO: compute the actual depth of `tree`
        return 0

    def get_tree_size(self, tree):
        """
        In the notion of the GTR Paper, the size of a tree is the number of its nodes
        """
        return len(self.node_collector.collect(tree))

    def GTR_rec(self, tree, oracle, templates):
        """
        Recursive Application of Generalized Tree Reduction (GTR) Algorithm, Algorithm 4

        Repeats generalized_tree_reduction() on a copy of `tree` until the
        number of nodes no longer changes; returns the final tree.
        """
        current_tree = copy.deepcopy(tree)
        current_tree_size = self.get_tree_size(current_tree)

        while True:
            next_tree = self.generalized_tree_reduction(
                current_tree, oracle, templates)
            next_tree_size = self.get_tree_size(next_tree)
            if next_tree_size == current_tree_size:
                # Fixed point reached
                return next_tree
            current_tree, current_tree_size = next_tree, next_tree_size

    def copy_and_reduce(self, tree, keep_list):
        """Copy tree, reducing all nodes that are not in keep_list."""

        # Mark all nodes except those in keep_list
        self.node_marker.visit(tree)
        for node in keep_list:
            node.marked = False

        # Copy tree and delete marked nodes
        new_tree = copy.deepcopy(tree)
        self.node_reducer.visit(new_tree)
        ast.fix_missing_locations(new_tree)
        return new_tree

    def _reduce_if_failing(self, tree, keep_list):
        """Reduce `tree` to `keep_list` and return the resulting source if
        the parser raises ParserException on it; return None otherwise."""
        try:
            new_code = astor.to_source(self.copy_and_reduce(tree, keep_list))
        except Exception:
            # Removing a node can leave a tree that cannot be turned back
            # into source code; such a reduction is simply not usable.
            return None

        try:
            self.parser.parse(new_code)
        except ParserException:
            # The error is still triggered
            return new_code
        except Exception:
            # E.g. the reduced code is not valid Python:
            # this is not the failure we are looking for.
            return None

        # No exception: the reduced code does not trigger the error anymore
        return None

    def minimize(self, code):
        """
        This function takes some Python code as string, reduces it using the NodeReducer defined earlier and returns the reduced program.

        Each candidate of compute_transformations() is removed in turn; a
        removal is kept if the parser still fails on the resulting code.
        Returns `code` unchanged if the parser does not fail on the code
        regenerated from the unreduced tree.
        """
        # Parse the code to a tree
        code_ast = ast.parse(source=code)
        if self.log:
            show_ast(code_ast)

        # Start out keeping all nodes of the tree
        keep_list = self.node_collector.collect(code_ast)

        reduced_code = self._reduce_if_failing(code_ast, keep_list)
        if reduced_code is None:
            # Even the unreduced tree does not trigger the error:
            # nothing to reduce, return the initial code.
            return code

        for candidate in self.compute_transformations(code_ast):
            # AST nodes are compared by identity
            trial_keep_list = [node for node in keep_list
                               if node is not candidate]
            trial_code = self._reduce_if_failing(code_ast, trial_keep_list)
            removed = trial_code is not None
            if self.log:
                print(f"Candidate {candidate}: "
                      f"{'removed' if removed else 'kept'}")

            if removed:
                keep_list = trial_keep_list
                reduced_code = trial_code

        return reduced_code


In [None]:
# Run all public test cases against MyHDDDebugger; the framework creates
# one reducer per test, passing that test's parser.
tf = TestingFramework(MyHDDDebugger)
tf.run_tests()