In [1]:
!pip install transformers datasets evaluate




[notice] A new release of pip is available: 24.2 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
from transformers import AutoTokenizer, AutoModelForCausalLM
from datasets import load_dataset
import evaluate

  from .autonotebook import tqdm as notebook_tqdm
None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.


## ***Get Dataset - BigCodeBench***

In [3]:
# Pull only the first 10 rows of the "v0.1.0_hf" split so experiments stay small.
dataset = load_dataset(
    "bigcode/bigcodebench",
    split="v0.1.0_hf[:10]",
)

## ***Initialize Models***

In [None]:
# TODO: Change the model here to check for improvement.
# Maps a short display name to the Hugging Face checkpoint id to load.
model_names = dict(CodeLlama="codellama/CodeLlama-7b-hf")

# Empty registries keyed by display name; populated by the loading loop.
tokenizers, models = {}, {}

In [None]:
# Load the tokenizer and the causal-LM weights for every configured checkpoint.
for name in model_names:
    model_id = model_names[name]
    tokenizers[name] = AutoTokenizer.from_pretrained(model_id)
    models[name] = AutoModelForCausalLM.from_pretrained(model_id)

## ***Feedback Model***

In [None]:
# A function to handle adding test case methods to the test case class
# Used in generate_feedback() for updating test cases

import ast
import astor

def test_case_updator(existing_test_code: str, new_test_method: str) -> str:
    """
    Add a new method to existing Python TestClass represented as a string.

    Args:
        existing_test_code (str): The Python code for the class.
        new_test_method (str): The Python code for the method to add.

    Returns:
        str: The updated Python code.
    """
    # Parsing the existing code and the new method into AST
    tree = ast.parse(existing_test_code)
    method_node = ast.parse(new_test_method).body[0]
    
    # Finding the class definition in the AST
    for node in tree.body:
        if isinstance(node, ast.ClassDef):
            node.body.append(method_node)
            break
    else:
        raise ValueError("No class definition found in the existing code.")

    # Returning the updated code as a string
    return astor.to_source(tree)

In [None]:
# Edge Test Case Generation Model Function for generate_feedback()
from transformers import T5ForConditionalGeneration, RobertaTokenizer
import torch

# Setup CodeT5 model for generate_feedback 
def setup_codet5():
    """
    Load the CodeT5 checkpoint used for edge-case test generation.

    Returns:
        tuple: (model, tokenizer) loaded from "Salesforce/codet5-small".
    """
    checkpoint = "Salesforce/codet5-small"
    codet5_tokenizer = RobertaTokenizer.from_pretrained(checkpoint)
    codet5_model = T5ForConditionalGeneration.from_pretrained(checkpoint)
    return codet5_model, codet5_tokenizer


# Module-level handles read by generated_edge_test_cases().
model, tokenizer = setup_codet5()


def generated_edge_test_cases(function_code, errors):
    """
    Ask the module-level CodeT5 model for edge-case unittest methods.

    Args:
        function_code (str): Source code of the function under test.
        errors (list): Error strings from the current test run, e.g.
                      ["IndexError: list index out of range",
                       "TypeError: '>' not supported between instances of 'NoneType' and 'int'"]

    Returns:
        str: The decoded text of the first generated sequence.
    """
    request = f"""
Generate Python unittest test methods for edge cases.
Function to test:
{function_code}

Current test failures:
{errors}

Requirements:
1. Test edge cases and boundary conditions
2. Handle error cases
3. Follow unittest format
4. Use proper indentation (4 spaces)
5. Include meaningful assertions

Return ONLY the test methods, without the class definition.
"""
    # The prompt is truncated to at most 512 tokens before generation.
    encoded = tokenizer(request, return_tensors="pt", truncation=True, max_length=512)

    # Sampling settings for generation.
    sampling_options = dict(
        max_length=200,
        temperature=0.7,
        top_p=0.95,
        do_sample=True,
    )
    generated_ids = model.generate(**encoded, **sampling_options)

    first_sequence = generated_ids[0]
    return tokenizer.decode(first_sequence, skip_special_tokens=True)

In [None]:
def generate_feedback(evaluation_result, g_code, tc_code):
    """
    Generate feedback and edge test cases based on evaluation results.

    Three situations are handled:
      * Code not executable: ask for regeneration; test cases are left unchanged.
      * Code executable and everything passes: report success; test cases are
        left unchanged.
      * Code executable with failures/errors: report the errors and try to
        append newly generated edge-case tests to the test class.

    Args:
        evaluation_result (dict): Contains pass_ratio, executability, and errors
        g_code (str): Generated code from the model
        tc_code (str): Current test cases code

    Returns:
        tuple: (prompt for next iteration, updated test code)
    """
    # Evaluation results - pass_ratio, executability, errors
    pass_ratio = evaluation_result.get('pass_ratio', 0)
    executability = evaluation_result.get('executability', False)
    errors = evaluation_result.get('errors', [])

    # Executability = False -> ask the model to generate the code again [skip test case update]
    if not executability:
        prompt = f'Code not executable. These are the following errors - {errors}. Please regenerate the code.'
        return prompt, tc_code

    # Executable, all tests pass and no errors -> nothing to fix. Without this
    # branch the function would reach `return` with both variables unassigned.
    if pass_ratio >= 1 and len(errors) == 0:
        return 'Code is executable and all test cases pass.', tc_code

    # Executable but pass ratio < 1 and/or errors present -> generate edge test cases
    prompt = f'Code is executable. However, the pass ratio is less than 1. Please check the following errors - {errors}.'
    new_tests = generated_edge_test_cases(g_code, errors)
    try:
        updated_test_code = test_case_updator(tc_code, new_tests)
    except (SyntaxError, ValueError, IndexError):
        # The generated text is free-form model output and may not be valid
        # Python or may contain no method; keep the current tests in that case.
        updated_test_code = tc_code

    # Returning prompt and test cases code (including any previous test cases)
    return prompt, updated_test_code

In [None]:
# def generate_feedback(evaluation_result, g_code, tc_code):
#     """
#     Generate feedback and edge cases based on evaluation results.
#     Args:
#         evaluation_result (dict): Contains pass_ratio, executability, and errors
#         g_code (str): Generated code
#         tc_code (str): Current test cases
#     Returns:
#         tuple: (prompt for next iteration, updated test code)
#     """
    
#     pass_ratio = evaluation_result.get('pass_ratio', 0)
#     executability = evaluation_result.get('executability', False)
#     errors = evaluation_result.get('errors', [])

#     if executability and pass_ratio < 1 and len(errors) > 0:
#         # Case III: Generate edge cases
#         try:
#             # Generate new edge case tests
#             new_tests = generate_edge_case_prompt(g_code, errors)
            
#             # Create feedback prompt (Or to Asks for analysis of failure causes and give solution, One more LLM responds)
#             prompt = f"""
# Fix the implementation to handle these edge cases:

# Current code:
# {g_code}

# Test failures:
# {format_errors(errors)}


# Based on the test failures:
# 1. Analyze what edge cases are causing failures
# 2. Consider input validation needed
# 3. Handle potential error conditions
# 4. Implement proper error handling
# 5. Add necessary boundary checks

# Provide an improved implementation that addresses these issues.
# """
#             # Update test code with new edge cases
#             updated_test_code = append_test_cases(tc_code, new_tests)
#             return prompt, updated_test_code

#         except Exception as e:
#             print(f"Error generating Case III: Generate edge cases: {e}")
#             return "", tc_code

#     elif not executability:
#         # Case IV & V: Focus on fixing execution errors
#         prompt = f"""
# Fix these execution errors:

# Code:
# {g_code}

# Errors:
# {format_errors(errors)}

# Provide a corrected implementation.
# """
#         return prompt, tc_code

#     return "", tc_code

# def format_errors(errors):
#     """Format error messages for prompt"""
#     return "\n".join(f"- {err.split('\n')[0]}" for err in errors)

# def append_test_cases(existing_tests, new_tests):
#     """Append new tests while maintaining class structure"""
#     try:
#         module = cst.parse_module(existing_tests)
        
#         class TestAppender(cst.CSTTransformer):
#             def __init__(self, new_test_code):
#                 self.new_tests = cst.parse_module(new_test_code).body
                
#             def leave_ClassDef(self, original_node, updated_node):
#                 if "TestClass" in original_node.name.value:
#                     return updated_node.with_changes(
#                         body=updated_node.body + self.new_tests
#                     )
#                 return updated_node

#         transformer = TestAppender(new_tests)
#         modified = module.visit(transformer)
#         return modified.code
#     except Exception as e:
#         print(f"Error appending tests: {e}")
#         return existing_tests

In [None]:
# This function has been updated to return the prompt and test code for the next iteration(s)
# [IMPORTANT : Run evaluation block before running this cell]
def generate_score(prompt, model, tokenizer, data, max_length=1000):
    """
    Run the generate -> evaluate -> feedback loop for one task and return the
    pass ratio of the last evaluated attempt.

    Args:
        prompt (str): The task prompt given to the model.
        model: Model exposing ``generate(**inputs, ...)``.
        tokenizer: Tokenizer matching ``model``; called with
            ``return_tensors="pt"`` and used to decode the generated ids.
        data: Mapping for one dataset example; its 'test' and 'libs' entries
            are read.
        max_length (int): ``max_length`` forwarded to ``model.generate``.

    Returns:
        The 'pass_ratio' of the last evaluation (0 if no attempt was made).
    """
    n = 5 #TODO : Set N based on your implementation
    task_prompt = prompt
    test_code = data['test'] # Initially setting from BigCodeBench, will be updated in the loop
    evaluation_result = {'pass_ratio': 0}  # returned as-is if n == 0
    for _ in range(n):
        inputs = tokenizer(prompt, return_tensors="pt")
        output = model.generate(**inputs, max_length=max_length, temperature=0.7, top_p=0.9, do_sample=True)

        # generate() returns token ids, but the evaluator exec()s source text.
        # Drop the echoed prompt tokens (decoder-only models return prompt +
        # continuation) and decode the rest.
        # NOTE(review): assumes a causal LM, as loaded earlier in this notebook.
        prompt_length = inputs["input_ids"].shape[1]
        generated_code = tokenizer.decode(output[0][prompt_length:], skip_special_tokens=True)

        evaluation_result = evaluate_code_generation(generated_code, test_code, data['libs'])

        # Nothing left to improve once the code runs and every test passes.
        if evaluation_result.get('executability') and evaluation_result.get('pass_ratio', 0) >= 1:
            break

        feedback, test_code = generate_feedback(evaluation_result, generated_code, test_code)
        # Keep the task description in the next prompt; the feedback sentence
        # alone does not say what code to write.
        prompt = f"{task_prompt}\n\n{feedback}"
    return evaluation_result['pass_ratio']

In [None]:
from pprint import pprint
# Print the `test` column: the unit-test source that generate_score() starts from for each task.
pprint(dataset['test']) # Tests already present in the dataset

In [None]:
from pprint import pprint
# Print the `libs` column; generate_score() passes each task's value to
# evaluate_code_generation() as its `libraries` argument.
pprint(dataset['libs'])

In [None]:
# Display the column names of the loaded dataset split.
dataset.column_names

In [None]:
# Display the `doc_struct` column (not used by any other visible cell; shown for inspection only).
dataset['doc_struct']

In [None]:
# One list of per-task scores for every configured model.
generated_scores = {name: [] for name in model_names}

for example in dataset:
    prompt = example["instruct_prompt"]
    print("Prompt:" + prompt)
    # Score this task with each model and record the result under the model's name.
    for model_name, scores in generated_scores.items():
        generated_score = generate_score(
            prompt,
            models[model_name],
            tokenizers[model_name],
            example,
        )
        scores.append(generated_score)

## ***Evaluation***

In [None]:
# # Evaluation Setup with BLEU (or CodeBLEU if available)
# bleu_metric = evaluate.load("bleu")

# # Prepare reference code for evaluation
# references = [example["canonical_solution"] for example in dataset]

# # Evaluate each model's generated code against the reference code
# evaluation_scores = {}
# for model_name, codes in generated_codes.items():
#     bleu_score = bleu_metric.compute(predictions=codes, references=references)
#     evaluation_scores[model_name] = bleu_score["bleu"]
#     print(f"{model_name} BLEU Score:", bleu_score["bleu"])

# # Print final evaluation summary
# print("\n=== Evaluation Summary ===")
# for model_name, score in evaluation_scores.items():
#     print(f"{model_name} BLEU Score: {score:.4f}")

In [5]:
import unittest
from unittest.mock import patch

def evaluate_code_generation(generated_code, test_cases_code, libraries):
    """
    Dynamically evaluates the generated code based on pass ratio and executability.

    Parameters:
    - generated_code: A string containing the code to be evaluated.
    - test_cases_code: A string containing the test cases to be executed.
    - libraries: A list of library names to import and make available in the execution context.

    Returns:
    - A dictionary containing 'pass_ratio', 'executability', and 'errors'.
    """
   
    
    
    # Prepare a local namespace for executing the generated code
    local_namespace = {}
    error_messages = []

    # Import libraries dynamically and add them to the namespace
    for lib in libraries:
        try:
            # Import each library and add it to the local namespace
            local_namespace[lib] = __import__(lib)
        except ImportError as e:
            error_messages.append(f"Error importing {lib}: {e}")
            return {
                'pass_ratio': 0,
                'executability': False,
                'errors': error_messages
            }
    
    try:
        # Execute the generated code in the provided namespace
        exec(generated_code, local_namespace, local_namespace)
        executability = True
    except Exception as e:
        error_messages.append(f"Execution Error: {e}")
        executability = False

    if not executability:
        return {
            'pass_ratio': 0,
            'executability': executability,
            'errors': error_messages
        }

    # Add task_func to local_namespace so it can be accessed by tests
    task_func = local_namespace.get('task_func')
    
    if not task_func:
        error_messages.append("task_func is not defined in the generated code.")
        return {
            'pass_ratio': 0,
            'executability': False,
            'errors': error_messages
        }

    # Dynamically create a TestCase class from the provided test cases code
    try:
        exec(test_cases_code, {'task_func': task_func, 'patch': patch}, local_namespace)
    except Exception as e:
        error_messages.append(f"Test case execution error: {e}")
        return {
            'pass_ratio': 0,
            'executability': False,
            'errors': error_messages
        }

    # Extract the TestCase class from the local namespace
    TestClass = local_namespace.get('TestCases')

    if not TestClass:
        error_messages.append("TestCases class not found in provided test cases code.")
        return {
            'pass_ratio': 0,
            'executability': False,
            'errors': error_messages
        }

    # Define a custom test suite
    class CustomTestSuite(unittest.TestSuite):
        def run(self, result, debug=False):
            super().run(result, debug)
            return result

    # Run the tests using unittest framework
    suite = CustomTestSuite()
    suite.addTest(unittest.makeSuite(TestClass))
    
    runner = unittest.TextTestRunner()
    
    # Capture results
    result = runner.run(suite)
    
    # Calculate pass ratio
    pass_ratio = (result.testsRun - len(result.failures) - len(result.errors)) / result.testsRun if result.testsRun > 0 else 0
    
    # Collect errors from test results
    for failure in result.failures + result.errors:
        error_messages.append(str(failure))

    # Clear the local namespace after execution
    local_namespace.clear()
    
    return {
        'pass_ratio': pass_ratio,
        'executability': len(result.errors) == 0,
        'errors': error_messages
    }

In [6]:
# Example usage with dynamic inputs and libraries list
generated_code = """
import itertools
from random import shuffle

def task_func(numbers=list(range(1, 11))):
    permutations = list(itertools.permutations(numbers))
    sum_diffs = 0

    for perm in permutations:
        perm = list(perm)
        shuffle(perm)
        diffs = [abs(perm[i] - perm[i+1]) for i in range(len(perm)-1)]
        sum_diffs += sum(diffs)

    avg_sum_diffs = sum_diffs / len(permutations) if permutations else 0

    return avg_sum_diffs
"""

test_cases_code = """
import unittest

class TestCases(unittest.TestCase):
    
    def test_default_numbers(self):
        result = task_func()
        self.assertIsInstance(result, float)
        self.assertGreater(result, 0)

# Additional tests omitted for brevity...
"""

# List of libraries to import and use within exec()
libraries_to_import = []

# Evaluate the generated code with dynamic inputs and specified libraries
evaluation_result = evaluate_code_generation(generated_code, test_cases_code, libraries_to_import)
print(evaluation_result)

  suite.addTest(unittest.makeSuite(TestClass))
.
----------------------------------------------------------------------
Ran 1 test in 7.611s

OK


{'pass_ratio': 1.0, 'executability': True, 'errors': []}


In [None]:
#TODO : Implement a function to average the scores in each list of generated_scores, which gives the final score of each model