In [41]:
%autoreload 2

In [74]:
import os
import re
import sys
import typing


from IPython.display import display, Markdown, HTML

import datasets
from evaluate import load

from langchain.chat_models import ChatOpenAI
from langchain.chains import LLMChain, SequentialChain
from langchain.prompts.chat import (
    ChatPromptTemplate,
    AIMessagePromptTemplate,
    HumanMessagePromptTemplate,
)
from langchain.schema import (
    SystemMessage
)

sys.path.append(os.path.abspath('..'))

import llm_feedback.utils.env as env


In [52]:
# Set the HF_ALLOW_CODE_EVAL flag. Presumably the code-evaluation metric loaded
# later requires this opt-in before it will execute code -- TODO confirm.
os.environ["HF_ALLOW_CODE_EVAL"] = '1'
# Project helper; presumably loads variables from a .env file (e.g. API
# credentials) into the environment -- verify against llm_feedback.utils.env.
env.load_dotenv()
# Load MBPP without an explicit config (the log below shows this resolves to "mbpp/full").
mbpp_dataset = datasets.load_dataset('mbpp')

No config specified, defaulting to: mbpp/full
Found cached dataset mbpp (/Users/guydavidson/.cache/huggingface/datasets/mbpp/full/1.0.2/4458a31cd4305553c8e88e3f0bfb94fc74fe1a9faeeb8c32ed166939735eaeff)


  0%|          | 0/4 [00:00<?, ?it/s]

In [44]:
mbpp_dataset['train'][0]

{'task_id': 601,
 'text': 'Write a function to find the longest chain which can be formed from the given set of pairs.',
 'code': 'class Pair(object): \r\n\tdef __init__(self, a, b): \r\n\t\tself.a = a \r\n\t\tself.b = b \r\ndef max_chain_length(arr, n): \r\n\tmax = 0\r\n\tmcl = [1 for i in range(n)] \r\n\tfor i in range(1, n): \r\n\t\tfor j in range(0, i): \r\n\t\t\tif (arr[i].a > arr[j].b and\r\n\t\t\t\tmcl[i] < mcl[j] + 1): \r\n\t\t\t\tmcl[i] = mcl[j] + 1\r\n\tfor i in range(n): \r\n\t\tif (max < mcl[i]): \r\n\t\t\tmax = mcl[i] \r\n\treturn max',
 'test_list': ['assert max_chain_length([Pair(5, 24), Pair(15, 25),Pair(27, 40), Pair(50, 60)], 4) == 3',
  'assert max_chain_length([Pair(1, 2), Pair(3, 4),Pair(5, 6), Pair(7, 8)], 4) == 4',
  'assert max_chain_length([Pair(19, 10), Pair(11, 12),Pair(13, 14), Pair(15, 16), Pair(31, 54)], 5) == 5'],
 'test_setup_code': '',
 'challenge_test_list': []}

In [162]:
# Prompt for the test-generation model: a system message fixing its role, and a
# human message carrying the task description plus the three existing tests.
# The placeholders ({text}, {test_list_0..2}) are filled in by
# run_and_evaluate_chain. The template deliberately ends on an open ```python
# fence so that the model continues directly with code.
test_gen_prompt_template = ChatPromptTemplate.from_messages([
    SystemMessage(content="You are a helpful Python test generator, capable of generating tests that are helpful and evaluate code coverage and edge cases."),
    HumanMessagePromptTemplate.from_template("""
You will be given a Python programming task and 3 unit tests. Please write 3 additional unit tests to help the developer test their code. The tests should be as helpful as possible, and should cover as many edge cases as possible. The tests should be written in the same style as the 3 provided tests. 
Important: Do not include the existing test cases in your solution! Output just the new test cases. Ensure that you use the same function and class names as in the existing tests. Each test case should be ready to copy-paste into a Python console and run.
Instruction:
"{text}"
Current unit tests:
```python 
{test_list_0}
{test_list_1}
{test_list_2}
```
New unit tests:
```python
""".strip(), input_variables=["text", "test_list_0", "test_list_1", "test_list_2"]),
])

# Chat model that generates the tests.
llm = ChatOpenAI(model_name="gpt-3.5-turbo-0613")

# Single-step chain pairing the prompt above with the chat model.
chain = LLMChain(llm=llm, prompt=test_gen_prompt_template)

In [149]:
# Code-evaluation metric, loaded by identifier through `evaluate.load`.
code_eval = load("guydav/restrictedpython_code_eval")
# Matches a Markdown code-fence marker together with any language tag (e.g. ```python).
markdown_pattern = re.compile(r"```\w*")

# Builtins handed to the metric as `additional_globals`, each keyed by its own name.
DEFAULT_ADDITIONAL_GLOBALS = {
    builtin.__name__: builtin
    for builtin in (all, dict, filter, map, max, sum, enumerate, reversed)
}

In [163]:


def _parse_test_cases(text: str):
    current_start = -1

    test_cases = []

    while True:
        current_start = text.find('assert', current_start + 1)
        if current_start == -1:
            break

        current_end = text.find('\n', current_start)
        test_cases.append(text[current_start:current_end])

    return test_cases

def run_and_evaluate_chain(chain: LLMChain, mbpp_example: typing.Dict[str, typing.Any]):
    """Generate new tests for one MBPP example and run them on its reference code.

    Args:
        chain: Chain invoked with the task text and the example's first three tests.
        mbpp_example: Dataset row; its "text", "test_list" and "code" entries are read.

    Returns:
        A tuple ``(out, test_cases, solutions, result)``: the raw chain output,
        the assert statements parsed from it, the cleaned reference solution
        wrapped in a one-element list, and the second element of the value
        returned by ``code_eval.compute``.
    """
    # The prompt template takes every existing test as its own variable rather
    # than as a list, so the three tests are spread over numbered keys.
    chain_inputs = {"text": mbpp_example["text"]}
    existing_tests = mbpp_example["test_list"]
    chain_inputs.update({f"test_list_{i}": existing_tests[i] for i in range(3)})
    out = chain(chain_inputs)

    test_cases = _parse_test_cases(out["text"])

    # Drop Markdown fence markers, trim surrounding whitespace, then remove
    # explicit "(object)" base-class lists from the reference code.
    # NOTE(review): presumably the restricted evaluator rejects them -- confirm.
    reference_code = markdown_pattern.sub('', mbpp_example['code']).strip()
    solutions = [reference_code.replace('(object)', '')]

    # Every generated test is checked against the same list of solutions.
    compute_output = code_eval.compute(
        references=test_cases,
        predictions=[solutions for _ in test_cases],
        k=[len(solutions)],
        allowed_imports=['typing', 'collections', 'math', 're', 'heapq'],
        additional_globals=DEFAULT_ADDITIONAL_GLOBALS,
        allow_str_format=True,
        allow_underscore_variable_names=True,
        timeout=60,
    )
    result = compute_output[1]  # type: ignore

    return out, test_cases, solutions, result



## Notes
* Frame around code coverage -- LOC hit, edge and corner cases, etc.
    * Increase against both the existing unit tests, and randomly generated inputs
* Provide the test-generating/feedback model with model-generated code, and evaluate the test cases generated on the model's code and the gold code
* How do we know if this works? 
    * (1) tests pass on gold code
    * (2) tests help identify cases where model-generated code passes the MBPP tests but misses something
    * (3) compared to a random baseline [generate random inputs, query gold code for correct outputs, test against model-generated code]
* (Potential thought for future: if we do a multi-turn thing, where the test-generating model gets a chance to revise the tests based on the code's output, how sycophantic is the model -- does it tend to revise its tests to match the code's output, or does it maintain its original tests against this feedback -- type I vs type II errors, etc.)
* Multi-turn dialog to fix errors or revise tests according to the code's output


In [164]:
# Pick one training example by position, generate new tests for it, and run
# them against its reference solution. Later cells read `index`, `test_cases`
# and `solutions` from this cell.
index = 5
out, test_cases, solutions, result = run_and_evaluate_chain(chain, mbpp_dataset['train'][index])

# Show the evaluation results as the cell output.
result

defaultdict(list,
            {1: [(0,
               {'task_id': 1,
                'passed': True,
                'result': 'passed',
                'completion_id': 0})],
             2: [(0,
               {'task_id': 2,
                'passed': False,
                'result': "failed (<class 'AssertionError'>): ",
                'completion_id': 0,
                'exception_type': 'AssertionError',
                'exception_description': ''})],
             0: [(0,
               {'task_id': 0,
                'passed': True,
                'result': 'passed',
                'completion_id': 0})]})

In [166]:
test_cases

['assert radian_degree(0)==0',
 'assert radian_degree(180)==3.141592653589793',
 'assert radian_degree(270)==4.7123889803846']

In [157]:
# Scratch check on float equality: NumPy's conversion of 45.5 degrees differs
# from this hard-coded literal by about 1e-7 (see the output below), so an
# exact `==` assertion against such a literal would fail.
# NOTE(review): this import sits mid-notebook; consider moving it to the import cell.
import numpy as np
np.radians(45.5) - 0.7941249230758024

-1.1341838246359259e-07

In [167]:
mbpp_dataset['train'][index]

{'task_id': 606,
 'text': 'Write a function to convert degrees to radians.',
 'code': 'import math\r\ndef radian_degree(degree):\r\n radian = degree*(math.pi/180)\r\n return radian',
 'test_list': ['assert radian_degree(90)==1.5707963267948966',
  'assert radian_degree(60)==1.0471975511965976',
  'assert radian_degree(120)==2.0943951023931953'],
 'test_setup_code': '',
 'challenge_test_list': []}

In [131]:
# Show the selected task: its description as plain text, then the dataset's own
# tests and the cleaned reference solution rendered as Markdown code blocks.
# NOTE(review): the saved output below shows a prime-number task, which does
# not match the `index = 5` cell above -- it comes from an earlier execution.
example = mbpp_dataset['train'][index]
print(example['text'])

for test_case in example['test_list']:
    display(Markdown(f'```python\n{test_case}\n```\n'))

display(Markdown(f'```python\n{(solutions[0])}\n```'))

Write a function to check if the given integer is a prime number.


```python
assert prime_num(13)==True
```


```python
assert prime_num(7)==True
```


```python
assert prime_num(-1010)==False
```


```python
def prime_num(num):
  if num >=1:
   for i in range(2, num//2):
     if (num % i) == 0:
                return False
     else:
                return True
  else:
          return False
```

In [130]:
for c in test_cases:
    print(c)

assert prime_num(2) == True
assert prime_num(1) == False
assert prime_num(0) == False
assert prime_num(1000000007) == True


In [137]:
print("def prime_num(num):\r\n  if num >=1:\r\n   for i in range(2, num//2):\r\n     if (num % i) == 0:\r\n                return False\r\n     else:\r\n                return True\r\n  else:\r\n          return False")

def prime_num(num):
  if num >=1:
   for i in range(2, num//2):
     if (num % i) == 0:
                return False
     else:
                return True
  else:
          return False


In [147]:
# Hand-typed copy of the dataset's reference solution shown above, with the
# same logic (flaws included) so it can be probed directly. The loop returns on
# its first iteration, so only divisibility by 2 is ever checked -- 15 is
# therefore reported as prime (see the output below). When the range is empty
# (1 <= num <= 5) the function falls through and implicitly returns None.
def prime_num(num):
	if num >=1:
		for i in range(2, num//2):
			if (num % i) == 0:
				return False
			else:
				return True
	else:
		return False

print(prime_num(15))

True


In [116]:
# Manual re-run of the same metric call that run_and_evaluate_chain makes, using
# whatever `test_cases` and `solutions` are currently defined in the kernel.
tc = test_cases

code_eval.compute(
        references=tc, 
        predictions=[solutions] * len(tc), 
        k=[len(solutions)],
        allowed_imports=['typing', 'collections', 'math', 're', 'heapq'],
        additional_globals=DEFAULT_ADDITIONAL_GLOBALS,
        allow_str_format=True,
        allow_underscore_variable_names=True,
        timeout=60,
        )[1]

defaultdict(list,
            {0: [(0,
               {'task_id': 0,
                'passed': True,
                'result': 'passed',
                'completion_id': 0})],
             1: [(0,
               {'task_id': 1,
                'passed': False,
                'result': "failed (<class 'AssertionError'>): ",
                'completion_id': 0})],
             2: [(0,
               {'task_id': 2,
                'passed': False,
                'result': "failed (<class 'IndexError'>): list index out of range",
                'completion_id': 0})],
             3: [(0,
               {'task_id': 3,
                'passed': False,
                'result': "failed (<class 'AssertionError'>): ",
                'completion_id': 0})]})

In [101]:
# Make a local checkout of the metric importable and import it as a module.
# NOTE(review): this is an absolute, machine-specific path, so the cell only
# runs on the author's machine; consider a configurable or relative location.
sys.path.append('/Users/guydavidson/projects/restrictedpython_code_eval')
import restrictedpython_code_eval

In [102]:
metric = restrictedpython_code_eval.RestrictedPythonCodeEval()

In [106]:
# Same call as the earlier code_eval.compute cell, but through the locally
# imported metric instance, reusing `tc` and `solutions` from the kernel.
# NOTE(review): the saved output below holds a single result, unlike the four
# returned by the earlier call -- the cells were run at different times.
metric.compute(
        references=tc, 
        predictions=[solutions] * len(tc), 
        k=[len(solutions)],
        allowed_imports=['typing', 'collections', 'math', 're', 'heapq'],
        additional_globals=DEFAULT_ADDITIONAL_GLOBALS,
        allow_str_format=True,
        allow_underscore_variable_names=True,
        timeout=60,
        )[1]

defaultdict(list,
            {0: [(0,
               {'task_id': 0,
                'passed': True,
                'result': 'passed',
                'completion_id': 0})]})