# Obfuscode: Exploring the Limitations of Code Generation using Large Language Models

_Chris Hinds  28/11/2024_

## 0. Abstract

This notebook develops two simple transforms designed to obfuscate Python code and explores their effect on an LLM's accuracy in a code generation benchmark. On a dataset of over 1900 tasks, it shows that simply renaming the variables used in the programming tasks can decrease benchmark performance by an odds-ratio of 1.6.

## 1. Introduction

### 1.1 The GSM-Symbolic dataset

A recent [research paper](https://arxiv.org/abs/2410.05229) empirically explored the reasoning capabilities of LLMs by creating a family of variants on the `GSM8K` mathematical reasoning benchmark. It showed that significant decreases in accuracy can be produced using simple transformations to GSM questions, like changing the numbers or names used. Larger drops can be produced by adding additional but irrelevant details into the question. The authors suggest this evidence supports the view that LLMs fail to engage in _"genuine mathematical reasoning."_ Their approach has led me to wonder whether similar results would be achieved for LLM code generation.

### 1.2 Google's Mostly Basic Python Programming (MBPP) benchmark

For the purposes of this notebook I will use MBPP as our equivalent of GSM. The MBPP benchmark is [described](https://huggingface.co/datasets/google-research-datasets/mbpp) as consisting:

_of around 1,000 crowd-sourced Python programming problems, designed to be solvable by entry level programmers, covering programming fundamentals, standard library functionality, and so on. Each problem consists of a task description, code solution and 3 automated test cases._

Here is an example: (from index `549`)
```
"the code provides function to find the sum of fifth power of first n odd natural numbers."
```
```python
def odd_Num_Sum(n) : 
    j = 0
    sm = 0
    for i in range(1,n+1) : 
        j = (2*i-1) 
        sm = sm + (j*j*j*j*j)     
    return sm

assert odd_Num_Sum(1) == 1,
assert odd_Num_Sum(2) == 244,
assert odd_Num_Sum(3) == 3369
```

Because I am currently using an inappropriately specified machine (32Gb, 16 core Xeon, no GPU), I will restrict this notebook to considering the 90 rows in the MBPP validation split. 

### 1.3 Parameterized task difficulty

The benchmark is typically used by giving an LLM the task description and potentially the first line of the function or assertions, and asking it to complete the code. However, for this notebook I am going to focus on an infilling (aka. Fill-In-Middle) task. This seems attractive because the size of the gap (number of missing lines) can be varied, which might helpfully parameterize the task difficulty. 

This mirrors the `GSM-Symbolic-m1`, `GSM-Symbolic-p1`, and `GSM-Symbolic-p2` datasets which increase in difficulty by progressively adding clauses to the problems posed. Increasing task difficulty decreased accuracy and increased variability across models.

### 1.4 Obfuscation by renaming

`GSM-Symbolic` was created by using parameterizable templates built from `GSM8K` problems. Models' accuracy fell by between 0.3% and 9.2%. Interestingly some models performed poorly when just names or numbers were changed from the original problems. Here's an example template:

```
When {name} watches her {family}, she gets out a variety of toys for him.  The bag of building blocks has {x} blocks in it.  The bin of stuffed animals has {y} stuffed animals inside.The tower of stacking rings has {z} multicolored rings on it.{name} recently bought a tube of bouncy balls, bringing her total number of toys she bought for her {family} up to {total}.  How many bouncy balls came in the tube?

    #variables:
        -  name  = sample(names)
        -  family = sample(["nephew", "cousin", "brother"])
        -  x  = range(5, 100)
        -  y  = range(5, 100)
        -  z  = range(5, 100)
        -  total  = range(100, 500)
        -  ans  = range(85, 200)
    #conditions:
        -  x  +  y  +  z  +  ans  ==  total
```
A similar approach would be possible with code, for example, by transforming the assigned-to names or function arguments. CodeLlama has been trained on a very large code dataset. This means that many systematic variable names will be familiar to it. In this notebook I will use US census data to rename assigned variables into the namespace of unusual human forenames. It seems reasonable to hypothesize that unusual human forenames will also be unusual variable names. For example:
```
the code provides function to find the sum of maximum increasing subsequence of the given array.
```
```python
def indiana(eileen, cal):
    lynwood = 0
    darvin = [0 for crawford in range(cal)]
    for classie in range(cal):
        darvin[classie] = eileen[classie]
    for classie in range(1, cal):
        for anabel in range(classie):
            if eileen[classie] > eileen[anabel] and darvin[classie] < darvin[anabel] + eileen[classie]:
                darvin[classie] = darvin[anabel] + eileen[classie]
    for classie in range(cal):
        if lynwood < darvin[classie]:
            lynwood = darvin[classie]
    return lynwood

assert indiana([1, 101, 2, 3, 100, 4, 5], 7) == 106,
assert indiana([3, 4, 5, 10], 4) == 22,
assert indiana([10, 5, 4, 3], 4) == 10
```

### 1.5 Obfuscation by structural change

Compared to the original `GSM8K` dataset the `GSM-NoOp` dataset led to accuracy drops of over 50% for several recent LLMs including `Llama3-8b-instruct` and `Gemma2-9b-it`. In `GSM-NoOp` seemingly irrelevant statements were introduced into the question. For example:
```
Oliver picks 44 kiwis on Friday.  Then he picks 58 kiwis on Saturday.  On Sunday, he picks double the number of kiwis he did on Friday, but five of them were a bit smaller than average.  How many kiwis does Oliver have?

    Llama3-8B:  Let’s break it down step by step:
    Oliver picks 44 kiwis on Friday.  He picks 58 kiwis on Saturday.  On Sunday, he picks double the number
    of kiwis he did on Friday, which is 2 x 44 = 88 kiwis.
    Five of the kiwis picked on Sunday are a bit smaller than average, so we subtract 5 from the total
    number of kiwis picked on Sunday:  88 - 5 = 83 kiwis.  Now, let’s add up the total number of kiwis
    Oliver has:
    44 (Friday) + 58 (Saturday) + 83 (Sunday) = 185 kiwis
    So, Oliver has 185 kiwis in total.

```

A similar strategy could be explored within code through the addition of superfluous, but functionally inert, additional structure. Clearly there is enormous potential for different kinds of structural change. In this notebook just one will be evaluated. It obfuscates by introducing misleading type information.

In the following example the initial assignation to the `buckets` variable has been replaced with a value `True` (a misleading type). The correct expression `[list() for _ in range(RADIX)]` is  then restored to the target on a subsequent line.

```
the code provides function to sort a list of elements using radix sort.
```
```python
def radix_sort(nums):
    RADIX = 10
    placement = 1
    max_digit = max(nums)
    while placement < max_digit:
      buckets = True
      if buckets:
          buckets = [list() for _ in range(RADIX)]
      for i in nums:
        tmp = int((i / placement) % RADIX)
        buckets[tmp].append(i)
      a = 0
      for b in range( RADIX ):
        buck = buckets[b]
        for i in buck:
          nums[a] = i
          a += 1
      placement *= RADIX
    return nums

assert radix_sort([15, 79, 25, 68, 37]) == [15, 25, 37, 68, 79]
assert radix_sort([9, 11, 8, 7, 3, 2]) == [2, 3, 7, 8, 9, 11]
assert radix_sort([36, 12, 24, 26, 29]) == [12, 24, 26, 29, 36]
```

Use of `bool()` is not going to be a misleading choice for every assignment, but it seems good enough to run as an experiment. Importantly, it will be valuable to consider infilling locations which actually cover the target variable which has been obscured.

### 1.6 Overview of this notebook

This notebook will use simple metaprogramming to transform MBPP problems and investigate how CodeLlama responds to two basic obfuscation strategies on a Python infilling task. 

In __section 2__ I will define three transforms `CodeMasker` for creating infilling problems, `CodeRenamer` for obfuscating the names used within code, and `TypeObfuscator` for making a simple type-based structural change within code. In __section 3__ I define a `Task` `dataclass` to organize results, a `TaskFactory` to create the experimental dataset, and a `TaskExecutor` to run models and test results. In __section 4__ I provide a statistical analysis of the results. Finally __section 5__ presents conclusions.


## 2. Define scalable code transforms 

### 2.1 The `CodeRenamer` transform

In [1]:
import ast
from typing import List, Dict, Callable, Union, Iterable

class CodeRenamer:
    # Manipulates python source code to systematically change the bindable names introduced within that code 
    
    def find_all_names(tree: ast.AST) -> List[str]:
        # return all names stored in ast.Name nodes within a tree
        names = set()
        for node in ast.walk(tree):
            if isinstance(node, ast.Name):
                names.add(node.id)
        return names

    def _make_modifier_fn(obj:object, prop:str) -> None:
        # force early-binding-like behaviour on closures especially important for loops which bind late
        return lambda new_name: setattr(obj, prop, new_name)

    @classmethod
    def unpack_function_signature(cls, node: ast.AST) -> tuple[List[str], List[Callable[[str], None]]]:
        # ast.FunctionDef, ast.AsyncFunctionDef and ast.Lambda are complex structures, return all
        # the arguments listed within them, and a matching setter function which traps part of the
        # given tree within a closure
        match node:
            case ast.FunctionDef() | ast.AsyncFunctionDef():
                names, modifiers = [node.name], [cls._make_modifier_fn(node, 'name')]
            case ast.Lambda():
                names, modifiers = [], []
        for arg_key in ['posonlyargs', 'args', 'kwonlyargs', 'vararg', 'kwarg']:
            argable = getattr(node.args, arg_key)
            match argable:
                case None:
                    pass
                case ast.arg:
                    names.append(argable.arg)
                    modifiers.append(cls._make_modifier_fn(argable, 'arg'))
                case [*_]:
                    for argument in argable:
                        names.append(argument.arg)
                        modifiers.append(cls._make_modifier_fn(argument, 'arg'))
        return names, modifiers

    @classmethod
    def find_bound_names(cls, tree: ast.AST) -> List[str]:
        # iterate over a given tree structure, match relevant nodes which introduce bindable names and return
        # a list of them
        bound_names = set()
        for node in ast.walk(tree):
            nameable = None
            match node:
                case ast.Assign():
                    nameable = node.targets
                case ast.AugAssign() | ast.AnnAssign() | ast.For() | ast.AsyncFor() | ast.NamedExpr() | ast.comprehension():
                    nameable = node.target
                case ast.With() | ast.AsyncWith():
                    nameable = node.optional_vars
                case ast.ClassDef():
                    nameable = node.name
                case ast.FunctionDef() | ast.AsyncFunctionDef() | ast.Lambda():
                    signature_names, _ = cls.unpack_function_signature(node)
                    for name in signature_names:
                        bound_names.add(name)
            match nameable:
                case None:
                    pass
                case str():
                    bound_names.add(nameable)
                case [*_]:
                    for tree in nameable:
                        # in reality these are likely to be a list of simple ast.Name nodes 
                        bound_names.update(cls.find_all_names(tree))
                case ast.Name():
                    bound_names.add(nameable.id)
                case ast.Tuple() | ast.List() | ast.Slice() | ast.Subscript():
                    bound_names.update(cls.find_all_names(nameable))
                case ast.AST:
                    raise ValueError('Unexpected structure during find_bindings(tree)', ast.unparse(nameable))
                case _:
                    raise ValueError('Unexpected value during find_bindings(tree)', nameable)
        return bound_names

    def make_translation(bound_names:str, new_vocab: List[str]) -> Dict[str, str]:
        # create a dict mapping existing bound names into a new vocabulary
        assert len(new_vocab) >= len(bound_names)
        return {name: new_name for (name, new_name) in zip(bound_names, new_vocab)}

    @classmethod
    def rebind_tree(cls, tree: ast.AST, translate: Dict[str, str]) -> None:
        # iterate over a given tree and modify it inplace, using a translation dict to change any
        # name it contains to it's new value
        for node in ast.walk(tree):
            match node:
                case ast.Name():
                    if node.id in translate:
                        node.id = translate[node.id] 
                case ast.FunctionDef() | ast.AsyncFunctionDef() | ast.Lambda():
                    for name, modifier_fn in zip(*cls.unpack_function_signature(node)):
                        if name in translate:
                            modifier_fn(translate[name])

    @classmethod
    def rebind(cls, code: Union[str, Iterable[str]], new_vocab) -> Union[str, Iterable[str]]:
        # when given a single string containing code, return a string containing code which is renamed
        # using the given vocab. When a list of code strings is given, rename all of them using a consistent
        # translation
        bound_names = set()
        trees = [ast.parse(code_str) for code_str in ([code] if isinstance(code, str) else code) ]
        for tree in trees:
            bound_names.update(cls.find_bound_names(tree))
        assert len(new_vocab) >= len(bound_names)
        translation = cls.make_translation(bound_names, new_vocab)
        for tree in trees:
            cls.rebind_tree(tree, translation)
        code_strings = [ast.unparse(tree) for tree in trees]
        return code_strings[0] if len(code_strings)==1 else code_strings


#### 2.1.1 Unit tests for `CodeRenamer`

In [2]:
import unittest

class TestCodeRenamer(unittest.TestCase):
    
    def setUp(self):
        # be aware this test will exec() the following code, treat this string carefully, like any other code
        self.example_code = (
            'def function_name(string_arg: str, int_arg:int, *other_pos_args, kw_arg_str:str, kw_arg_int:int, **kw_args) -> str:\n'
            '    # some comments \n'
            '    result_var, index_var = string_arg, kw_arg_int\n'
            '    string_var = kw_arg_str\n'
            '    char_var = string_var[index_var]\n'
            '    for (char_var, more_char_var) in zip(string_var, string_var):\n'
            '        literal_var = ""\n'
            '        if char_var == more_char_var:\n'
            '            result_var += more_char_var\n'
            '    string_var = [ord(x_var) for x_var in result_var]\n'
            '    def another_function_name(integer_var):\n'
            '        return integer_var * int_arg + kw_arg_int\n'
            '    map(another_function_name, string_var)\n'
            '    function_var = lambda y_var: str(y_var)\n'
            '    return result_var'
            )
        
        self.example_bindings = set(['kw_arg_int', 'char_var', 'literal_var', 'integer_var', 'index_var', 'function_var', 
                                     'function_name', 'more_char_var', 'string_arg', 'result_var', 'string_var', 'another_function_name', 
                                     'x_var', 'y_var', 'int_arg', 'kw_arg_str'])
        self.example_tree = ast.parse(self.example_code)
        self.alphabet = ['name_'+chr(x) for x in range(ord('a'), ord('z'))]

    def test_boundNames(self):
        bound_names = CodeRenamer.find_bound_names(self.example_tree)
        self.assertEqual(self.example_bindings, bound_names, 'incorrect bindings found in example')

    def test_bindingViaDisjointNamespaces(self):
        rebound_code = CodeRenamer.rebind(self.example_code, self.alphabet)
        rebound_bindings = CodeRenamer.find_bound_names(ast.parse(rebound_code))
        self.assertEqual(len(rebound_bindings), len(self.example_bindings), 'we must have the same number of bindings after a rebind')
        self.assertTrue(len(self.example_bindings.intersection(self.alphabet)) == 0, 'check our alphabet is disjoint from the original example')
        self.assertTrue(len(rebound_bindings.intersection(self.example_bindings)) == 0, 'rebinding should preserve disjoint namespaces')

    def test_rebindingMultipleIsConsistent(self):
        rebound_code_strings = CodeRenamer.rebind([self.example_code, self.example_code], self.alphabet)
        self.assertNotEqual(rebound_code_strings[0], self.example_code, 'both code outputs must have changed from the original')
        self.assertEqual(rebound_code_strings[0], rebound_code_strings[1], 'rebinding of multiple code strings must use a harmonized namespace')

    def test_exec(self):
        execable_template = '{code}\nexec_output={function_name}("foo", 1, {kw_arg_str}="bar", {kw_arg_int}=2)'
        output = {}
        execable_example_code = execable_template.format(code=self.example_code, 
                                                         function_name='function_name', 
                                                         kw_arg_str='kw_arg_str', 
                                                         kw_arg_int='kw_arg_int')
        exec(execable_example_code, output)
        self.assertEqual(output['exec_output'], 'foobar', 'before rebinding example code must be runnable and produce a known result')
        local_tree = ast.parse(self.example_code)
        bound_names = CodeRenamer.find_bound_names(local_tree)
        translation = CodeRenamer.make_translation(bound_names, self.alphabet)
        CodeRenamer.rebind_tree(local_tree, translation)
        output = {}
        execable_rebound_code = execable_template.format(code=ast.unparse(local_tree), 
                                                         function_name=translation['function_name'], 
                                                         kw_arg_str=translation['kw_arg_str'], 
                                                         kw_arg_int=translation['kw_arg_int'])
        exec(execable_rebound_code, output)
        self.assertEqual(output['exec_output'], 'foobar', 'after rebinding code must be runnable, and produce the same result')


### 2.2 The `CodeMasker` transform

In [3]:
import ast
from collections import defaultdict
from typing import List, Dict, Tuple

class CodeMasker():

    def _remove_empty_lines(code: str) -> str:
        # empty lines make mask size less meaningful, so remove them
        return '\n'.join(filter(lambda line: line.strip() != '', code.split('\n')))

    def _mask_code(code: str, mask:List[int]) -> str:
        # remove the lines in code specified in mask, the lines are 1-indexed like AST.lineno
        code_lines = code.split('\n')
        code_lines[mask[0]-1] = '[MASK]'
        for line in reversed(mask[1:]):
            del(code_lines[line-1])
        return '\n'.join(code_lines)
    
    def _mask_analysis(code: str, mask:List[int]) -> Dict[str, int]:
        # look at the code underneath a proposed mask and tally the AST nodes and Names which will be removed
        analysis = defaultdict(int)
        for node in ast.walk(ast.parse(code)):
            if hasattr(node, 'lineno'):
                if node.lineno in mask:
                    analysis['+'+node.__class__.__name__] += 1
                    if isinstance(node, ast.Name):
                        analysis[node.id] += 1
        return analysis

    @classmethod
    def is_maskable(cls, code: str, mask_size:int) -> bool:
        clean_code = cls._remove_empty_lines(code)
        return len(clean_code.split('\n')) > mask_size

    @classmethod
    def make_sliding_masks(cls, code:str, mask_size:int = 3) -> Tuple[List[str], List[Dict[str, int]]]:
        # create all the possible masks of a give size and return three lists, the first a list of 
        # code strings each with one mask applied, the second a list of dicts containing tallies of the AST
        # nodes beneath that mask, the third a list of dicts containing parameters of the applied mask
        # masks should neither block function declarations, nor the last nonempty line (often a return)
        clean_code = cls._remove_empty_lines(code)
        line_count = len(clean_code.split('\n'))
        assert line_count > mask_size, 'line_count must be greater than the mask_size during make_sliding_masks()'
        masks = [[line+offset for offset in range(0, mask_size)] for line in range(1, line_count - mask_size + 1)]
        masked_code, mask_analysis, mask_params = [], [], []
        for mask in masks:
            behind_mask = cls._mask_analysis(clean_code, mask)
            if '+FunctionDef' not in behind_mask and '+AsyncFunctionDef' not in behind_mask:
                masked_code.append(cls._mask_code(clean_code, mask))
                mask_analysis.append(behind_mask)
                mask_params.append(dict(mask_size=mask_size, mask_lineno=mask[0]))
        return masked_code, mask_analysis, mask_params

#### 2.2.1 Unit tests for `CodeMasker`

In [4]:
import unittest

class TestCodeMasker(unittest.TestCase):
    def setUp(self):
        self.code_example = (
            '\n'
            'def remove_non_ascii(s: str) -> str:\n'
            '    result = ""\n'
            '    for char in s:\n'
            '    \n'
            '        if ord(char) < 128:\n'
            '            result += char\n'
            '\n'
            '    return result\n'
        )

        self.clean_code_example = (
            'def remove_non_ascii(s: str) -> str:\n'
            '    result = ""\n'
            '    for char in s:\n'
            '        if ord(char) < 128:\n'
            '            result += char\n'
            '    return result'
        )
        self.clean_masked_example = (
            'def remove_non_ascii(s: str) -> str:\n'
            '    result = ""\n'
            '[MASK]\n'
            '            result += char\n'
            '    return result'
        )

    def test_maskAnalysis(self):
        analysis = CodeMasker._mask_analysis(self.code_example, [3,4,5,6])
        self.assertDictEqual(analysis, {
            '+Assign': 1,
            '+For': 1,
            '+Name': 5,
            'result': 1,
            '+Constant': 2,
            'char': 2,
            's': 1,
            '+If': 1,
            '+Compare': 1,
            '+Call': 1,
            'ord': 1
            })
        
    def test_removeEmptyLines(self):
        self.assertEqual(self.clean_code_example, CodeMasker._remove_empty_lines(self.code_example))

    def test_maskCode(self):
        self.assertEqual(self.clean_masked_example, CodeMasker._mask_code(self.clean_code_example, [3,4]))
    
    def test_makeSlidingMasks(self):
        code_list, analysis_list, param_list = CodeMasker.make_sliding_masks(self.code_example, mask_size=3)
        self.assertTrue(len(code_list) == len(analysis_list) == len(param_list) == 2, 'each resultset must return correct number of entries')
        self.assertEqual(code_list[0], 
                         'def remove_non_ascii(s: str) -> str:\n'
                         '[MASK]\n'
                         '            result += char\n'
                         '    return result', 'function declarations should not be masked')
        self.assertEqual(code_list[1],                         
                         'def remove_non_ascii(s: str) -> str:\n'
                         '    result = ""\n'
                         '[MASK]\n'
                         '    return result', 'last non-empty line should not be masked')


### 2.3 The `TypeObfuscator` Transform

In [5]:
import ast, random, math

class TypeObfuscator():

    def _first_assignments(code: str) -> List[Tuple[int, int, str, str]]:
        # Find the first assignment to each variable, where those assignments are single assignment statements and 
        # contained to a single line, for simplicity. 
        # Return a list of edit tuples containing (lineno, col_offset, target name, assignment expr)
        assignment_lineno, assignment_col_offset, assignment_content = {}, {}, {}
        tree = ast.parse(code)
        for node in ast.walk(tree):
            if isinstance(node, (ast.Assign, ast.AnnAssign)):
                if len(node.targets) == 1 and isinstance(node.targets[0], ast.Name) and node.lineno == node.end_lineno and node.lineno not in assignment_lineno.values():
                    # only work with assignments which have a single target, and have a one-to-one relation with their lineno (ie no ";")
                    target_str = ast.unparse(node.targets[0])
                    if target_str not in assignment_lineno or node.lineno < assignment_lineno[target_str]:
                        assignment_lineno[target_str] = node.lineno
                        assignment_content[target_str] = ast.unparse(node.value)
                        assignment_col_offset[target_str] = node.col_offset
        return [(assignment_lineno[name], assignment_col_offset[name], name, assignment_content[name]) for name in assignment_lineno]
        
    def _edit_assignments(code: str, edits: List[Tuple[int, int, str, str]]):
        # given a list of edit tuples as above, and a code string, work backward over the lines of code to make a small obfuscation
        # return the obfuscated code string and a list of line numbers at which edits were made
        lines, edits_at = code.split('\n'), []
        for lineno, col_offset, name, expr in sorted(edits, key=lambda t: t[0], reverse=True):
            lines.insert(lineno, '{}    {} = {}'.format(col_offset*' ', name, expr))
            lines.insert(lineno, '{}if {}:'.format(col_offset*' ', name))
            lines[lineno-1] = '{}{} = True'.format(col_offset*' ', name)
            edits_at = [x+2 for x in edits_at] + [lineno+2, lineno+1, lineno]
        return '\n'.join(lines), edits_at

    @classmethod
    def apply_at_ratio(cls, code: str, edit_ratio:float=0.1) -> Tuple[str, List[int]]:
        # edit the given code string by obfuscating some of its assignments. apply obfuscation at least once but no more than an 
        # edit_ratio to the number of lines otherwise the obfuscation could dominate the code and may overlap with masking
        # return a tuple containing the edited code and a list of line numbers at which edits have been made
        max_edits = max(1, math.floor(len(code.split('\n')) * edit_ratio))
        edits = cls._first_assignments(code)
        random.shuffle(edits)
        return cls._edit_assignments(code, edits[0: max_edits])
    
    @classmethod
    def all_single_obfuscations(cls, code:str) -> List[Tuple[str, str, List[int]]]:
        # edit the given code by obfuscating its assignments. apply obfuscation to exactly one assignment, but return a list 
        # of all such possible obfuscations, the returned tuple contains (new_code, name of the obfuscated name, and a list of 
        # edited linenos)
        results = []
        for (lineno, col_offset, name, expr) in cls._first_assignments(code):
            new_code, edits_at = cls._edit_assignments(code, [(lineno, col_offset, name, expr)])
            results.append((new_code, name, edits_at))
        return results

#### 2.3.1 Unit tests for `TypeObfuscator`

In [6]:
import unittest

class TestTypeObfuscator(unittest.TestCase):
    def setUp(self):
        self.code = (
            'def function_name(string_arg: str, int_arg:int, *other_pos_args, kw_arg_str:str, kw_arg_int:int, **kw_args) -> str:\n'
            '    # some comments \n'
            '    result_var, index_var = string_arg, kw_arg_int\n'
            '    string_var = kw_arg_str\n'
            '    char_var = string_var[index_var]\n'
            '    for (char_var, more_char_var) in zip(string_var, string_var):\n'
            '        literal_var = ""\n'
            '        if char_var == more_char_var:\n'
            '            result_var += more_char_var\n'
            '    string_var = [ord(x_var) for x_var in result_var]\n'
            '    def another_function_name(integer_var):\n'
            '        return integer_var * int_arg + kw_arg_int\n'
            '    map(another_function_name, string_var)\n'
            '    function_var = lambda y_var: str(y_var)\n'
            '    return result_var'
            )

    def test_firstAssignments(self):
        self.assertListEqual(TypeObfuscator._first_assignments(self.code), [
            (4, 4, 'string_var', 'kw_arg_str'),
            (5, 4, 'char_var', 'string_var[index_var]'),
            (14, 4, 'function_var', 'lambda y_var: str(y_var)'),
            (7, 8, 'literal_var', "''")], 
            'first assignments for self.code should match the reference list')

    def test_obfuscate(self):
        new_code, edit_lineno = TypeObfuscator.apply_at_ratio(self.code, edit_ratio=1)
        self.assertListEqual(edit_lineno, [22, 21, 20, 13, 12, 11, 9, 8, 7, 6, 5, 4],
                             'edited line numbers must match reference list')
        self.assertEqual(new_code, 
                            'def function_name(string_arg: str, int_arg:int, *other_pos_args, kw_arg_str:str, kw_arg_int:int, **kw_args) -> str:\n'
                            '    # some comments \n'
                            '    result_var, index_var = string_arg, kw_arg_int\n'
                            '    string_var = True\n'
                            '    if string_var:\n'
                            '        string_var = kw_arg_str\n'
                            '    char_var = True\n'
                            '    if char_var:\n'
                            '        char_var = string_var[index_var]\n'
                            '    for (char_var, more_char_var) in zip(string_var, string_var):\n'
                            '        literal_var = True\n'
                            '        if literal_var:\n'
                            "            literal_var = ''\n"
                            '        if char_var == more_char_var:\n'
                            '            result_var += more_char_var\n'
                            '    string_var = [ord(x_var) for x_var in result_var]\n'
                            '    def another_function_name(integer_var):\n'
                            '        return integer_var * int_arg + kw_arg_int\n'
                            '    map(another_function_name, string_var)\n'
                            '    function_var = True\n'
                            '    if function_var:\n'
                            '        function_var = lambda y_var: str(y_var)\n'
                            '    return result_var',
                            'obfuscated code should match reference string')

### 2.4 Run all tests

In [7]:
unittest.main(argv=[''], verbosity=2, exit=False)

test_makeSlidingMasks (__main__.TestCodeMasker.test_makeSlidingMasks) ... ok
test_maskAnalysis (__main__.TestCodeMasker.test_maskAnalysis) ... ok
test_maskCode (__main__.TestCodeMasker.test_maskCode) ... ok
test_removeEmptyLines (__main__.TestCodeMasker.test_removeEmptyLines) ... ok
test_bindingViaDisjointNamespaces (__main__.TestCodeRenamer.test_bindingViaDisjointNamespaces) ... ok
test_boundNames (__main__.TestCodeRenamer.test_boundNames) ... ok
test_exec (__main__.TestCodeRenamer.test_exec) ... ok
test_rebindingMultipleIsConsistent (__main__.TestCodeRenamer.test_rebindingMultipleIsConsistent) ... ok
test_firstAssignments (__main__.TestTypeObfuscator.test_firstAssignments) ... ok
test_obfuscate (__main__.TestTypeObfuscator.test_obfuscate) ... ok

----------------------------------------------------------------------
Ran 10 tests in 0.017s

OK


<unittest.main.TestProgram at 0x103a60450>

## 3. Creating a larger dataset 

### 3.1 The `TaskFactory` 

In [8]:
import random, pandas as pd, copy, pprint
from typing import List
from dataclasses import dataclass, field
from typing import List, Dict, Callable, Union, Iterable, Any

@dataclass
class Task:
    title: str = None
    task_description: str = None
    masked_code: str = None
    tests: List[str] = None
    passed: bool = None
    our_answer: str = None
    our_results: List[bool] = None
    llm_prompt: str = None
    llm_response: str = None
    llm_answer: str = None
    llm_results: List[bool] = None
    errors: List[str] = field(default_factory=list)
    params: Dict[str, Any] = field(default_factory=dict)


class TaskFactory():

    def load_mbpp_as_tasks(split_name: str) -> List[Task]:
        # split_name is the name of the data split to load, splits are as defined by the dataset card on HuggingFace
        splits = {
            'train': 'full/train-00000-of-00001.parquet', 
            'test': 'full/test-00000-of-00001.parquet', 
            'validation': 'full/validation-00000-of-00001.parquet', 
            'prompt': 'full/prompt-00000-of-00001.parquet'
            }
        df = pd.read_parquet("hf://datasets/google-research-datasets/mbpp/" + splits[split_name])
        results = []
        for row in df.itertuples(index=False):
            results.append(Task(
                title="mbpp.{split_name}.{id}".format(id= row.task_id, split_name=split_name),
                # our intention is to use the data in a masking task, so the way the original task_description is phrased 
                # is slightly wrong for our purposes
                task_description=row.text.replace('Write a python', 'the code provides').replace('Write a', 'the code provides'),
                our_answer=row.code.replace('\t', ' ').replace('\r\n', '\n'),
                tests=list(row.test_list),
                params=dict(source='MBPP', source_split=split_name, source_id=row.task_id)
            ))
        return results

    def _get_named_vocab(name:str) -> List[str]:
        if name == 'baby':
            df = pd.read_csv('https://raw.githubusercontent.com/hadley/data-baby-names/refs/heads/master/baby-names.csv')
            name_list = sum([list(df.loc[df['sex']==sex].sort_values('percent').head(500)['name'].str.lower()) for sex in ['boy', 'girl']], [])
            random.shuffle(name_list)
            return name_list
        elif name == 'numbered':
            return ['x_{:0>3}'.format(i) for i in range(1,1000)]
        else:
            raise ValueError('Unknown vocab name given in get_named_vocab(): ' + name)

    @classmethod
    def _change_bound_names(cls, tasks: List[Task], with_vocab_name: str) -> List[Task]:
        # Uses CodeRenamer to modify a list of Tasks, a new list is returned, tasks are 
        # deep-copied, a list is returned of tasks which have passed the process, 
        # failures will likely relate to ast.parse() and possibly to the quality of CodeRenamer.
        # only the our_answer field is processed, since masked code is unlikely to parse
        vocab = cls._get_named_vocab(with_vocab_name)
        passed_tasks = []
        for index, original_task in enumerate(tasks):
            task = copy.deepcopy(original_task)
            assert task.our_answer is not None, 'our_answer must be present in task'
            assert task.tests is not None, 'tests must be present in task'
            random.shuffle(vocab)
            code_strings = [task.our_answer] + task.tests
            try:
                new_code = CodeRenamer.rebind(code_strings, vocab)
                task.our_answer, task.tests = new_code[0], new_code[1:]
                task.title = task.title + '_RenOb({})'.format(with_vocab_name)
                task.params['obfuscation_vocab'] = with_vocab_name
                task.params['has_rename_obfuscation'] = True
                passed_tasks.append(task)
            except Exception as e:
                msg = 'Error in task {} at index {}\n {}'.format(task.title, index, e)
                print(msg)
                task.errors.append(msg)
        return passed_tasks
    
    @classmethod
    def _masked_tasks(cls, tasks: List[Task], mask_size: int) -> List[Task]:
        new_tasks = []
        for source_task in tasks:
            assert len(source_task.our_answer) > 0, 'all tasks must have a non-empty answer'
            if CodeMasker.is_maskable(source_task.our_answer, mask_size):
                for code, analysis, params in zip(*CodeMasker.make_sliding_masks(source_task.our_answer, mask_size)):
                    new_task = copy.deepcopy(source_task)
                    new_task.masked_code = code
                    new_task.title += '_Mask({}x{})'.format(params['mask_lineno'], params['mask_size'])
                    new_task.params.update(params)
                    new_task.params.update({'count_masked_'+name : value for name, value in analysis.items()})
                    new_tasks.append(new_task)
        return new_tasks

    def _llm_prompt(tasks: List[Task], template_name: str) -> None:
        # add a task.llm_prompt field to tasks in the given list, using the template_name given 
        prompt_templates = {
            'infill': (
                '[INST]\n'
                'You are an expert Python programmer, you have been given some python code,\n'
                '{task_description},\n'
                'the code starts with a [PYTHON] tag and ends with a [/PYTHON] tag, your task is to infill the missing \n'
                'python code, the missing python code is indicated by a [MASK] tag, replace the [MASK] tag with \n'
                'your answer.\n'
                '[PYTHON]\n'
                '{code}\n'
                '[/PYTHON]\n'
                '[/INST]\n'
                )
        }
        for task in tasks:
            assert len(task.masked_code) > 0, 'all tasks must have a masked_prompt before llm prompts can be added'
            task.llm_prompt = prompt_templates[template_name].format(code=task.masked_code, task_description=task.task_description)
            task.params['llm_prompt'] = template_name

    @classmethod
    def no_obfuscation_then_mask(cls, tasks:List[Task], mask_size, llm_prompt_name:str) -> List[Task]:
        masked_tasks = cls._masked_tasks(tasks, mask_size)
        cls._llm_prompt(masked_tasks, llm_prompt_name)
        return masked_tasks

    @classmethod
    def obfuscate_names_then_mask(cls, tasks:List[Task], vocab, mask_size, llm_prompt_name:str) -> List[Task]:
        renob_tasks = cls._change_bound_names(tasks, vocab)
        masked_tasks = cls._masked_tasks(renob_tasks, mask_size)
        cls._llm_prompt(masked_tasks, llm_prompt_name)
        return masked_tasks

    @classmethod
    def obfuscate_types_then_mask(cls, tasks: List[Task], mask_size:int, llm_prompt_name:str) -> List[Task]:
        # do both, because they're related. 
        new_tasks = []
        for source_task in tasks:
            assert len(source_task.our_answer) > 0, 'all tasks must have a non-empty answer'
            for typob_code, typob_name, typob_linos in TypeObfuscator.all_single_obfuscations(source_task.our_answer):
                if CodeMasker.is_maskable(typob_code, mask_size):
                    for masked_code, obscured_by_mask, params in zip(*CodeMasker.make_sliding_masks(typob_code, mask_size)):
                        mask_linenos = set(range(params['mask_lineno'], params['mask_lineno'] + mask_size))
                        mask_typob_overlaps = bool(set(typob_linos).intersection(mask_linenos))
                        mask_is_after_typob = min(mask_linenos) > max(typob_linos)
                        if typob_name in obscured_by_mask and not mask_typob_overlaps and mask_is_after_typob:
                            new_task = copy.deepcopy(source_task)
                            new_task.our_answer = typob_code
                            new_task.masked_code = masked_code
                            new_task.title += '_TypOb({})_Mask({}x{})'.format(min(typob_linos), params['mask_lineno'], mask_size)
                            new_task.params.update(params)
                            new_task.params.update({'count_masked_'+name : value for name, value in obscured_by_mask.items()})
                            new_task.params.update({'typob_linenos': typob_linos, 'has_type_obfuscation': True, 'typob_mask_distance': min(mask_linenos) - max(typob_linos)})
                            new_tasks.append(new_task)
        cls._llm_prompt(new_tasks, llm_prompt_name)
        return new_tasks
    

### 3.2 The `TaskExecutor`

In [9]:

# from transformers import pipeline
# from evaluate import load
import os, pickle
from pathlib import Path


class TaskExecutor():
    _PICKLE_PROTOCOL_LEVEL = 5
    
    # Not all models can do infilling, those which the model card suggest can:
    # - CodeLlama-7b-hf
    # - CodeLlama-13b-hf
    # - CodeLlama-7b-Instruct-hf
    # - CodeLlama-13b-Instruct-hf
    def __init__(self, model_name="codellama/CodeLlama-7b-Instruct-hf", max_new_tokens = 128, saving_tasks_to_path=None):
        self.model_name = model_name
        self.max_new_tokens = max_new_tokens
        self.saving_tasks_to_path = None
        if saving_tasks_to_path and not Path(saving_tasks_to_path).exists():
            print('path "{}" does not exist, setting saving_tasks_to_path=None'.format(saving_tasks_to_path))
        elif saving_tasks_to_path:
            self.saving_tasks_to_path = saving_tasks_to_path
        self.pipe = pipeline("text-generation", model=model_name) if model_name else None
        # code_eval uses python's exec() with modified __builtins__ to make it harder for llm generated code to accidentally 
        # damage the host machine. Nevertheless, I suggest running this inside something ephemeral. 
        self.code_eval = load("code_eval")
        # you have understood the terms and conditions 
        os.environ["HF_ALLOW_CODE_EVAL"] = "1"
    
    def save_task(self, task:Task) -> None:
        if self.saving_tasks_to_path:
            with (Path(self.saving_tasks_to_path) / '{}.pickle'.format(task.title)).open(mode='wb') as f:
                pickle.dump(task, f, protocol=TaskExecutor._PICKLE_PROTOCOL_LEVEL)

    @staticmethod
    def load_all_saved_tasks(task_path) -> List[Task]:
        tasks = []
        for filepath in Path(task_path).glob('*.pickle'):
            with filepath.open(mode='rb') as f:
                tasks.append(pickle.load(f))
        return tasks 

    @staticmethod
    def llm_response_is_bad(task):
        if task.llm_response is None:
            return False
        assert '[INST]' in task.llm_response and '[/INST]' in task.llm_response
        answer_section = task.llm_response.split('[/INST]', 1)[1]
        assert '[PYTHON]' in answer_section
        return '[/PYTHON]' not in answer_section

    def run_llm(self, task:Task) -> None:
        assert self.model_name, 'must supply model_name at __init__ before calling run_llm()'
        if len(task.llm_prompt) > 0:
            try: 
                task.llm_response = self.pipe(task.llm_prompt, max_new_tokens = self.max_new_tokens)[0]['generated_text']
                task.params['llm_model'] = self.model_name
                task.llm_results, task.passed = None, None
                if self.llm_response_is_bad(task):
                    task.llm_answer = None
                else:
                    task.llm_answer = task.llm_response.split('[/INST]', 1)[1].split('[/PYTHON]', 1)[0].split('[PYTHON]', 1)[1]
            except Exception as e:
                err = 'Error in run_llm\n{}'.format(str(e))
                task.errors.append(err); print(task.title, err)
        else:
            err = 'No llm_prompt during TaskExecutor.run_llm'
            task.errors.append(err); print(task.title, err)
        self.save_task(task)

    def run_tests(self, task:Task) -> None:
        task.our_results, task.llm_results = [], []
        for answer_name, results in [('our_answer', task.our_results), ('llm_answer', task.llm_results)]:
            for test in task.tests:
                try:
                    _, llm_result_dict = self.code_eval.compute(references=[test], predictions=[[getattr(task, answer_name)]], k=[1])
                    results.append(llm_result_dict[0][0][1]['passed'])
                except Exception as e:
                    err = 'Error in run_tests for {}\n{}'.format(answer_name, str(e))
                    task.errors.append(err); print(task.title, err)
        if len(task.tests) == len(task.our_results) and all(task.our_results):
            task.passed = len(task.our_results) == len(task.llm_results) and all(task.llm_results)
        self.save_task(task)

    def run_llm_for_all(self, tasks:List[Task]) -> None:
        for task in tasks:
            self.run_llm(task)

    def run_tests_for_all(self, tasks:List[Task]) -> None:
        for task in tasks:
            self.run_tests(task)

    @classmethod
    def iterate_llm_answer_length(cls, model_name:str, max_token_values:List[int], task_split_name:str, picklepath:str, mask_size:int, ren_vocab:str, llm_prompt_name:str) -> None:
        all_tasks = []        
        mbpp_tasks = TaskFactory.load_mbpp_as_tasks(task_split_name)
        all_tasks.extend(TaskFactory.no_obfuscation_then_mask(mbpp_tasks, mask_size=mask_size, llm_prompt_name=llm_prompt_name))
        all_tasks.extend(TaskFactory.obfuscate_names_then_mask(mbpp_tasks, vocab=ren_vocab, mask_size=mask_size, llm_prompt_name=llm_prompt_name))
        all_tasks.extend(TaskFactory.obfuscate_types_then_mask(mbpp_tasks, mask_size=mask_size, llm_prompt_name=llm_prompt_name))
        for max_tokens in max_token_values:
            if len(all_tasks):
                executor = TaskExecutor(model_name=model_name, saving_tasks_to_path=picklepath, max_new_tokens=max_tokens)
                executor.run_llm_for_all(all_tasks)
                all_tasks = [task for task in all_tasks if task.llm_answer is None]


### 3.3 Make the dataset

In [19]:
from pprint import pp
from IPython.display import Markdown
import os
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_colwidth', None)

picklepath = os.environ['OBFUSCODE_PICKLEPATH']
regenerate_tasks = False

if regenerate_tasks:
    for mask_size in [1, 3]:
        TaskExecutor.iterate_llm_response_length(model_name="codellama/CodeLlama-7b-Instruct-hf", 
                                                max_token_values=[128, 256, 512],
                                                task_split_name='validation',
                                                picklepath=picklepath,
                                                mask_size=mask_size,
                                                ren_vocab='baby',
                                                llm_prompt_name='infill')
    tasks = TaskExecutor.load_all_saved_tasks(picklepath)
    executor = TaskExecutor(saving_tasks_to_path=picklepath, model_name=None)
    executor.run_tests_for_all(tasks)
else:
    tasks = TaskExecutor.load_all_saved_tasks(picklepath)
    
llm_failures = [task for task in tasks if task.llm_answer is None]
source_failures = [task for task in tasks if task.passed is None]
tasks = [task for task in tasks if task.llm_answer is not None and task.passed is not None]

display(Markdown(('#### 3.3.1 Task creation summary')))
display(pd.DataFrame(
    dict(
        task_count=[
            len(llm_failures),
            len(source_failures),
            len(set([task.params['source_id'] for task in source_failures])),
            len(tasks),
            len(set([task.params['source_id'] for task in tasks])),
            len([t for t in tasks if len(t.errors)])
            ],
        notes=[
            'tasks for which a complete python answer could not be found within the llm response',
            'tasks whose task.our_answer (model solution) failed at least one of task.tests',
            'multiple tasks come from a single source problem, how many MBPP problems are broken in this split?',
            'tasks for which the llm produced and answer and for which tests on our_answer passed',
            'unique source_ids within usable tasks',
            'usable tasks containing error reports'
            ]
    ), index=['llm failed', 'bad source task', 'bad source problems', 'usable tasks', 'MBPP source ids', 'errors']))


#### 3.3.1 Task creation summary

Unnamed: 0,task_count,notes
llm failed,8,tasks for which a complete python answer could not be found within the llm response
bad source task,30,tasks whose task.our_answer (model solution) failed at least one of task.tests
bad source problems,4,"multiple tasks come from a single source problem, how many MBPP problems are broken in this split?"
usable tasks,1923,tasks for which the llm produced and answer and for which tests on our_answer passed
MBPP source ids,84,unique source_ids within usable tasks
errors,0,usable tasks containing error reports


## 4 Analysis 

In [None]:
import numpy as np
from scipy.stats.contingency import odds_ratio

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_colwidth', None)
rows = []
for task in tasks:
    if task.passed is not None:
        rows.append(row := dict(mbpp_id=task.params['source_id'], kind='no_ob', passed=int(task.passed), mask_size=task.params['mask_size'],
                                mask_complexity=sum([value for key, value in task.params.items() if key.startswith('count_masked_+')]),
                                mask_aligned_lineno=task.params['mask_lineno'], typob_mask_distance=None
                                ))
        if 'has_type_obfuscation' in task.params and task.params['has_type_obfuscation']:
            row['mask_aligned_lineno'] = task.params['mask_lineno'] - max(task.params['typob_linenos']) + min(task.params['typob_linenos'])
            row['typob_mask_distance'], row['kind'] = task.params['typob_mask_distance'], 'typ_ob', 
        elif 'has_rename_obfuscation' in task.params and task.params['has_rename_obfuscation']:
            row['kind'] = 'ren_ob'    
task_table = pd.DataFrame(rows).sort_values(['mbpp_id', 'mask_size', 'mask_aligned_lineno', 'kind', 'typob_mask_distance']).set_index(['mbpp_id', 'mask_size', 'mask_aligned_lineno', 'kind'])
display(Markdown('### 4.1 Table of completed tasks (top 30 rows)'))
display(task_table.head(30))

summary_table = task_table.reset_index().groupby(['kind', 'mask_size']).aggregate(
    task_count=pd.NamedAgg('passed', 'count'),
    distinct_mbpp_ids=pd.NamedAgg('mbpp_id', 'nunique'),
    pass_rate=pd.NamedAgg('passed', 'mean'),
    mean_mask_complexity=pd.NamedAgg('mask_complexity', 'mean')
    )
summary_table['Notes'] = [
    'no_ob are the original MBPP source problems with no obfuscation applied', '',
    'ren_ob have all their variables renamed into the space of uncommon human forenames', '',
    'typ_ob have a type obfuscation where a single variable within the code is initially set to a misleading type', ''
]
display(Markdown('### 4.2 Summary by kind'))
display(summary_table)
display(Markdown((
    'It is important to remember that the composition (and therefore potentially difficulty) '
    'of tasks in each kind-based subgroup aren\'t necessarily comparable. This is because '
    'constraints impinge on the production of each task kind differently. For example, the code modification '
    'entailed by type obfuscation has to be applied at the position of a variable\'s '
    'first assignment, and this then limits the potential position of any mask, which for the task to make sense, has to be both after '
    'any code modification, and has to cover the obfuscated variable name. \n\n'

    'The number of distinct MBPP problem ids from also varies between kind. For type obfuscation to work the source problem must have an '
    'assignment statement (or similar) and of course some functions won\'t need that. \n\n'

    'Despite these caveats, superficially, it looks like `CodeRenamer` and `mask_size` might both have an effect, '
    'on `pass_rate`, but that `TypeObfuscator` might not. A more thorough statistical analysis follows.\n\n'

    '### 4.3 Statistical comparison of obfuscation kinds'
)))
paired_tasks = task_table.groupby(level=['mbpp_id', 'mask_size', 'mask_aligned_lineno', 'kind']).min('typob_mask_distance')[['passed']].unstack('kind')
display(Markdown((
    '#### 4.3.1 Matched tasks (top 15 rows)\n '
    'Matching tasks so that we have effectively the same source problem and same mask position for each kind '
    'provides a method for statistically robust approach to comparison \n\n'

    'Achieving matching for `ren_ob` and `no_ob` is simple. For `typ_ob`, use the `mask_aligned_lineno` which accounts for the extra code introduced '
    'by the obfuscation, and because there are often multiple options, choose the smallest available distance to mask, as this seems like it might be '
    'the more impactful example' 
)))
display(paired_tasks.head(15))
def contingency_table(pairs_df, col1, col2):
    pairs_for_cols = pairs_df.droplevel(0, axis='columns')[[col1, col2]].dropna()
    return pd.DataFrame(
        {col: [pairs_for_cols.sum().loc[col], pairs_for_cols.shape[0] - pairs_for_cols.sum().loc[col]] for col in [col1, col2]},
        index=['passed', 'failed']
    )
display(Markdown('#### 4.3.2 Contingency tables'))
odds_ratio_dict = {}
for mask_size in [1,3]:
    odds_ratio_dict[mask_size]={}
    for kind in ['ren_ob', 'typ_ob']:
        display(Markdown('##### Contingency table `no_ob` vs. `{}` for `mask_size={}`'.format(kind, mask_size)))
        display(contingency_df := contingency_table(paired_tasks.xs(level='mask_size', key=mask_size), 'no_ob', kind))
        odds_ratio_dict[mask_size][kind] = odds_ratio(contingency_df.astype('int').to_numpy())
display(Markdown('#### 4.3.3 Odds ratios'))
display(pd.DataFrame(
    {
        'odds ratio': [result.statistic for mask_dict in odds_ratio_dict.values() for result in mask_dict.values()], 
        'lower 95% confidence interval': [result.confidence_interval(confidence_level=0.95).low for mask_dict in odds_ratio_dict.values() for result in mask_dict.values()],
        'upper 95% confidence interval': [result.confidence_interval(confidence_level=0.95).high for mask_dict in odds_ratio_dict.values() for result in mask_dict.values()]
    },
    index=pd.MultiIndex.from_tuples(
        [(mask_size, 'no_ob vs. {}'.format(kind)) for mask_size, mask_dict in odds_ratio_dict.items() for kind in mask_dict.keys()],
        names=['mask_size', 'kind'])
    ))
display(Markdown((
    '`TypeObfuscator` did not have a statistically significant effect on CodeLlama\'s infill performance '
    'at least on this cohort of tasks. However, `CodeRenamer` did have a statistically significant effect, with a similar influence on `pass_rate` ' 
    'at both `mask_size=1` and `mask_size=3`, resulting in 1.6 times more infill test failures than without obfuscation. Significance was taken at _p=0.05_. \n\n'

    '### 4.4 Statistical effect of `mask_size` on pass rate\n'
    'For this analysis consider only non-obfuscated tasks\n\n'
    '#### 4.4.1 Matched pairs by `mask_lineno` for `no_ob` tasks across `mask_size` top 15 rows'
)))
noob_complexity_table = task_table.xs(level='kind', key='no_ob')[['passed', 'mask_complexity']]
mask_pairs_table = noob_complexity_table[['passed']].unstack('mask_size').dropna()
display(mask_pairs_table.head(15))
display(Markdown('#### 4.4.2 Contingency table'))
display(mask_contingency_table := contingency_table(mask_pairs_table, 1, 3))
display(Markdown('#### 4.4.3 Odds ratio'))
mask_or_result = odds_ratio(mask_contingency_table.astype('int').to_numpy())
display(pd.DataFrame(
    {
        'odds ratio': [mask_or_result.statistic], 
        'lower 95% confidence interval': [mask_or_result.confidence_interval(confidence_level=0.95).low],
        'upper 95% confidence interval': [mask_or_result.confidence_interval(confidence_level=0.95).high]
    },
    index=['mask_size=1 vs. mask_size=3'])
    )
display(Markdown(
    '`mask_size` had a statistically significant effect on `pass_rate` for non-obfuscated tasks at _p=0.05_, '
    'with `mask_size=3` resulting in 2.4 times more failures than `mask_size=1`.'
))


### 4.1 Table of completed tasks (top 30 rows)

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,passed,mask_complexity,typob_mask_distance
mbpp_id,mask_size,mask_aligned_lineno,kind,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
511,1,2,no_ob,1,3,
511,1,2,ren_ob,1,3,
511,1,3,no_ob,1,3,
511,1,3,ren_ob,0,3,
511,1,4,no_ob,0,6,
511,1,4,ren_ob,0,6,
511,1,4,typ_ob,0,6,1.0
511,1,5,no_ob,0,6,
511,1,5,ren_ob,0,6,
511,1,5,typ_ob,0,6,2.0


### 4.2 Summary by kind

Unnamed: 0_level_0,Unnamed: 1_level_0,task_count,distinct_mbpp_ids,pass_rate,mean_mask_complexity,Notes
kind,mask_size,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
no_ob,1,435,84,0.558621,5.634483,no_ob are the original MBPP source problems with no obfuscation applied
no_ob,3,279,53,0.351254,16.924731,
ren_ob,1,437,84,0.443936,5.524027,ren_ob have all their variables renamed into the space of uncommon human forenames
ren_ob,3,281,53,0.252669,16.580071,
typ_ob,1,211,45,0.492891,6.341232,typ_ob have a type obfuscation where a single variable within the code is initially set to a misleading type
typ_ob,3,280,39,0.4,17.182143,


It is important to remember that the composition (and therefore potentially difficulty) of tasks in each kind-based subgroup aren't necessarily comparable. This is because constraints impinge on the production of each task kind differently. For example, the code modification entailed by type obfuscation has to be applied at the position of a variable's first assignment, and this then limits the potential position of any mask, which for the task to make sense, has to be both after any code modification, and has to cover the obfuscated variable name. 

The number of distinct MBPP problem ids from also varies between kind. For type obfuscation to work the source problem must have an assignment statement (or similar) and of course some functions won't need that. 

Despite these caveats, superficially, it looks like `CodeRenamer` and `mask_size` might both have an effect, on `pass_rate`, but that `TypeObfuscator` might not. A more thorough statistical analysis follows.

### 4.3 Statistical comparison of obfuscation kinds

#### 4.3.1 Matched tasks (top 15 rows)
 Matching tasks so that we have effectively the same source problem and same mask position for each kind provides a method for statistically robust approach to comparison 

Achieving matching for `ren_ob` and `no_ob` is simple. For `typ_ob`, use the `mask_aligned_lineno` which accounts for the extra code introduced by the obfuscation, and because there are often multiple options, choose the smallest available distance to mask, as this seems like it might be the more impactful example

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,passed,passed,passed
Unnamed: 0_level_1,Unnamed: 1_level_1,kind,no_ob,ren_ob,typ_ob
mbpp_id,mask_size,mask_aligned_lineno,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2
511,1,2,1.0,1.0,
511,1,3,1.0,0.0,
511,1,4,0.0,0.0,0.0
511,1,5,0.0,0.0,0.0
511,1,6,1.0,0.0,0.0
511,1,7,1.0,0.0,0.0
511,1,8,1.0,1.0,1.0
511,1,9,1.0,0.0,0.0
511,3,2,0.0,0.0,
511,3,3,0.0,0.0,


#### 4.3.2 Contingency tables

##### Contingency table `no_ob` vs. `ren_ob` for `mask_size=1`

Unnamed: 0,no_ob,ren_ob
passed,240.0,190.0
failed,187.0,237.0


##### Contingency table `no_ob` vs. `typ_ob` for `mask_size=1`

Unnamed: 0,no_ob,typ_ob
passed,87.0,78.0
failed,80.0,89.0


##### Contingency table `no_ob` vs. `ren_ob` for `mask_size=3`

Unnamed: 0,no_ob,ren_ob
passed,98.0,70.0
failed,176.0,204.0


##### Contingency table `no_ob` vs. `typ_ob` for `mask_size=3`

Unnamed: 0,no_ob,typ_ob
passed,61.0,55.0
failed,105.0,111.0


#### 4.3.3 Odds ratios

Unnamed: 0_level_0,Unnamed: 1_level_0,odds ratio,lower 95% confidence interval,upper 95% confidence interval
mask_size,kind,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
1,no_ob vs. ren_ob,1.600011,1.21079,2.11689
1,no_ob vs. typ_ob,1.240062,0.789347,1.95105
3,no_ob vs. ren_ob,1.621287,1.106204,2.384517
3,no_ob vs. typ_ob,1.171905,0.727879,1.889773


`TypeObfuscator` did not have a statistically significant effect on CodeLlama's infill performance at least on this cohort of tasks. `CodeRenamer` did have a statistically significant effect, having a similar influence on `pass_rate` at both `mask_size=1` and `mask_size=3`, resulting in 1.6 times more infill test failures than without obfuscation. Significance was taken at _p=0.05_. 

### 4.4 Statistical effect of `mask_size` on pass rate
For this analysis consider only non-obfuscated tasks

#### 4.4.1 Matched pairs by `mask_lineno` for `no_ob` tasks across `mask_size` top 15 rows

Unnamed: 0_level_0,Unnamed: 1_level_0,passed,passed
Unnamed: 0_level_1,mask_size,1,3
mbpp_id,mask_aligned_lineno,Unnamed: 2_level_2,Unnamed: 3_level_2
511,2,1.0,0.0
511,3,1.0,0.0
511,4,0.0,1.0
511,5,0.0,0.0
511,6,1.0,1.0
511,7,1.0,0.0
512,2,0.0,0.0
512,3,0.0,0.0
512,4,0.0,0.0
512,8,0.0,0.0


#### 4.4.2 Contingency table

Unnamed: 0,1,3
passed,154.0,96.0
failed,122.0,180.0


#### 4.4.3 Odds ratio

Unnamed: 0,odds ratio,lower 95% confidence interval,upper 95% confidence interval
mask_size=1 vs. mask_size=3,2.363,1.655132,3.386224


`mask_size` had a statistically significant effect on `pass_rate` for non-obfuscated tasks at _p=0.05_, with `mask_size=3` resulting in 2.4 times more failures than `mask_size=1`.

## 5. Conclusions

This notebook presents code for two kinds of metaprogrammatic Python transformation. It deployed these transformations against the validation split of the MBPP benchmark to produce `1,923` Python infilling tasks (FIMs). It then ran these tasks on the `CodeLlama-7b-Instruct` model, with results available in `obfuscode-mbpp-validation.parquet`. Finally, it provides a statistical analysis of the results.

A small number (4) of the MBPP validation tasks had solutions which did not pass their own tests. The whole MBPP dataset, as currently available from HuggingFace, should therefore be double-checked and any failures reported and fixed.

Non-obfuscated code produced `pass@1` values (`pass_rate`) of `0.55` for infilling a single line, and `0.35` for three lines. These seem slightly poor. By comparison, [Meta](https://ai.meta.com/blog/code-llama-large-language-model-coding/) suggest that `CodeLlama-7b-Instruct` should have a `pass@1` of `0.44` on the MBPP benchmark where the whole code is generated. This should be investigated before results are can be considered conclusive.  Further prompt-engineering may be an appropriate starting point. 

This research also has obvious problems with scale. Firstly model size, while CodeLlama 70b has SOTA performance, CodeLlama 7b does not. Unfortunately, the smaller version was all that I could fit into the memory of the machine I had available. Secondly is the sample size, using only the 90 MBPP validation split problems limits the potential significance of results. But again, without a GPU, even this small dataset took around 36 hours to process. A more appropriate compute resource is required.

Nevertheless, the results represent a satisfactory proof of concept. Use of `mask_size` to control the difficulty of problems was effective, with a larger mask increasing the chance of LLM failure by an odds-ratio of 2.4 on non-obfuscated tasks. To some extent mirrors the `GSM-Symbolic-M1` ... `GSM-Symbolic-P2` datasets. It would be interesting to look at whether the content of what is being infilled has an influence on accuracy, for example by considering `mask_complexity`. 

`CodeRenamer` was also very effective, decreasing `pass@1` by an odds-ratio of 1.6 at both `mask_size` values. This mirrors findings on `GSM-Symbolic` and suggests that the LLM is significantly attending to the language semantics of variable names, and not just to the code semantics, just as human software engineers would. It may be worth producing a `MBPP-obfuscode-ren` dataset so others can use it for benchmarking. It may also be worth releasing this transform as part of an `obfuscode` Python library, so that others could produce their own larger fine-tuning datasets.

Interestingly despite extremely compelling results on `GSM-NoOp`, my `TypeObfuscator`, which appeared to use similar ideas, had no statistically significant impact on `pass@1`. More investigation would be required, for example by studying the effect of `typob_mask_distance`. However, even if this specific structural obfuscation turns out not to be useful, there are clearly a lot of other possible structural code transformations, whose combinatorial effects could be more extensively explored. 

In summary, and caveats aside, this notebook shows how simple metaprogrammatic transformation has the potential to help understand weaknesses in LLM code generation, and could provide options for improving their performance.