In [1]:
import os
import sys
import subprocess
import re
import csv
import sys
import importlib.util
import os
import subprocess
import json
import ast
import Levenshtein
import astor
import textwrap


### General functions

In [2]:

def read_jsonl_file(file_path):
    """Load a JSON Lines file into a list of decoded records.

    Args:
        file_path: Path to a text file holding one JSON document per line.

    Returns:
        list: The decoded value of every non-blank line, in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    with open(file_path) as f:
        # Blank lines (typically a trailing newline at the end of the file)
        # carry no record and would make json.loads raise, so skip them.
        return [json.loads(line) for line in f if line.strip()]
    
def read_json_file(file_path):
    """Parse the JSON document stored at ``file_path`` and return the result."""
    with open(file_path) as json_file:
        parsed = json.load(json_file)
    return parsed



def get_indentation_level(line):
    """Return how many leading whitespace characters ``line`` starts with."""
    without_indent = line.lstrip()
    indent_width = len(line) - len(without_indent)
    return indent_width



def get_project_path(project_name):
    """Locate the checkout directory of a benchmark project.

    Walks the CoderEval repositories tree below the current working directory
    and returns the path of the first directory whose name equals
    ``project_name``, or ``None`` when no such directory is found.
    """
    repos_root = os.getcwd() + "/benchmarks/CoderEval/CoderEval/home/travis/builds/repos"
    for current_dir, subdirs, _files in os.walk(repos_root):
        if project_name in subdirs:
            return os.path.join(current_dir, project_name)
    return None

    
def get_task_info(task_id, tasks_file_path="benchmarks/CoderEval/CoderEval4Python.json"):
    """Collect what is needed to run the test of one CoderEval task.

    The test source is read from the task's ``*_test_<task_id>.py`` file inside
    the project checkout when that file exists; otherwise it is taken from
    ``benchmarks/CoderEval/tests/record_testcases_map_python.json``.

    When the task has an empty ``test_name`` (a plain script rather than a
    named test), the test source is adjusted so that the outcome is printed:
    bare ``except:`` blocks print the exception (file variant), or an
    ``assert isT`` epilogue is appended (map variant).

    Args:
        task_id: Value of the ``_id`` field of the task record.
        tasks_file_path: JSON file whose ``RECORDS`` list holds the tasks.

    Returns:
        tuple: ``(project_name, original_file_path, task_code, test_content,
        test_name, task_name)``. ``project_name`` has ``/`` replaced by
        ``---`` and ``original_file_path`` is relative to the project.

    Raises:
        ValueError: If the task id is unknown, the project directory cannot
            be located, or no test source exists for the task.
    """
    tasks = read_json_file(tasks_file_path)["RECORDS"]
    task = next((candidate for candidate in tasks if candidate["_id"] == task_id), None)
    if task is None:
        raise ValueError(f"Task {task_id} not found in {tasks_file_path}")

    project_name = task["project"].replace('/', '---')
    test_file_path = task["file_path"].replace('.py', '_test.py')
    original_file_path = test_file_path.replace('.py', f'_{task_id}.py')
    task_code, test_name, task_name = task["code"], task["test_name"], task["name"]

    project_path = get_project_path(project_name)
    if project_path is None:
        raise ValueError(f"Project directory not found for {project_name}")

    complete_path = os.path.join(project_path, original_file_path)
    if os.path.exists(complete_path):
        with open(complete_path, 'r') as test_file:
            test_content = test_file.read()
        if test_name == "":
            # Replace each spelling of the bare except block with one that
            # prints the exception before setting isT = False.
            reporting_except = '    except Exception as e:\n        isT = False\n        print("Error while running the task: ", e)'
            bare_except_variants = (
                '    except:\n        isT = False',
                '    except: \n        isT = False',
                '    except:\n        isT=False',
                '    except:\n        isT= False',
            )
            for bare_except in bare_except_variants:
                test_content = test_content.replace(bare_except, reporting_except)
    else:
        test_contents = read_json_file("benchmarks/CoderEval/tests/record_testcases_map_python.json")
        if task_id not in test_contents:
            # Falling through here used to hand back whichever test happened to
            # be last in the map, i.e. the test of a different task.
            raise ValueError(f"No test content found for task {task_id}")
        test_content = test_contents[task_id]
        # add an assert for isT if not a test file
        if test_name == "":
            test_content = test_content.replace('if not isT:\n        raise Exception("Result not True!!!")', '')
            test_content = test_content + f"\n    try:\n        assert isT == True, \"isT is not True\"\n        print(\"Tests passed for the task: {task_name}\")\n    except AssertionError as e:\n        print(\"Test failed for the task:  {task_name}\", e)"

    return project_name, original_file_path, task_code, test_content, test_name, task_name


def get_prompt_techniques_applied(is_zero, is_fewshot, is_CoT, is_persona, is_package, is_signature):
    """Build a comma-separated label listing the prompt techniques in use.

    Each truthy flag contributes its label, always in the order Zero-shot,
    Few-shot, CoT, Persona, Package, Signature. Returns an empty string when
    no flag is set.
    """
    labelled_flags = (
        ("Zero-shot", is_zero),
        ("Few-shot", is_fewshot),
        ("CoT", is_CoT),
        ("Persona", is_persona),
        ("Package", is_package),
        ("Signature", is_signature),
    )
    return ", ".join(label for label, enabled in labelled_flags if enabled)


def extract_function_name(groundtruth_code):
    """Return the name of the first function defined in a piece of source code.

    The syntax tree is visited with ``ast.walk`` and the first ``def`` or
    ``async def`` node encountered wins, so an outer function is reported
    before any function nested inside it.

    Args:
        groundtruth_code: Python source text; it must parse as a module, so
            it may not start with an indented line.

    Returns:
        str or None: The function name, or ``None`` if the code defines no
        function.

    Raises:
        SyntaxError: If ``groundtruth_code`` cannot be parsed.
    """
    tree = ast.parse(groundtruth_code)
    for node in ast.walk(tree):
        # Coroutines are a separate node type; without it an ``async def``
        # would be missed in favour of a nested helper or None.
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            return node.name
    return None

### Environment setup functions

In [3]:
def is_python_version_installed(version):
    """Tell whether pyenv lists an installation of the given ``major.minor`` version.

    Every entry reported by ``pyenv versions --bare`` is cut down to its first
    two dot-separated components; the resulting list is printed and
    ``version`` is looked up in it.
    """
    pyenv_listing = subprocess.run(["pyenv", "versions", "--bare"], capture_output=True, text=True)
    abstract_versions = []
    for installed in pyenv_listing.stdout.split():
        major_minor = installed.split(".")[:2]
        abstract_versions.append(".".join(major_minor))
    print(abstract_versions)
    return version in abstract_versions


def get_complete_python_version(python_version):
    """Return the full pyenv version name that belongs to ``python_version``.

    Args:
        python_version: Either a ``major.minor`` prefix such as ``"3.7"`` or a
            complete version name such as ``"3.7.17"``.

    Returns:
        str or None: The first entry of ``pyenv versions --bare`` that equals
        ``python_version`` or starts with ``python_version`` followed by a
        dot; ``None`` when nothing matches.
    """
    result = subprocess.run(["pyenv", "versions", "--bare"], capture_output=True, text=True)
    # Match on whole version components: a plain substring test would accept
    # "3.12.7" for "2.7" or "3.10.4" for "3.1".
    component_prefix = python_version + "."
    for installed in result.stdout.split():
        if installed == python_version or installed.startswith(component_prefix):
            return installed
    return None


def is_standard_library(module_name):
    """Tell whether ``module_name`` belongs to the running interpreter's standard library.

    The check is made against the interpreter executing this notebook, in
    three steps: the built-in module names, ``sys.stdlib_module_names`` when
    the interpreter provides it, and finally the location of the module file
    relative to the directory that holds ``os.py``.

    Args:
        module_name: Importable module name.

    Returns:
        bool: ``True`` for standard-library modules; ``False`` otherwise,
        including when the module cannot be located.
    """
    if module_name in sys.builtin_module_names:
        return True

    # Authoritative list where the interpreter offers one.
    stdlib_names = getattr(sys, "stdlib_module_names", None)
    if stdlib_names is not None and module_name in stdlib_names:
        return True

    try:
        spec = importlib.util.find_spec(module_name)
    except (ImportError, ValueError):
        # find_spec can raise either of these for names it cannot resolve.
        return False
    if spec is None or spec.origin is None:
        return False
    module_path = spec.origin

    # Get the standard library path
    std_lib_path = os.path.dirname(os.__file__)
    if not module_path.startswith(std_lib_path):
        return False

    # site-packages / dist-packages commonly live inside the standard library
    # directory; anything found there is third-party code.
    relative_parts = module_path[len(std_lib_path):].replace("\\", "/").split("/")
    return "site-packages" not in relative_parts and "dist-packages" not in relative_parts
    
    
def check_module_needs_install(module_name):
    """Decide whether ``module_name`` has to be installed with pip.

    Args:
        module_name: Top-level module name taken from an import statement.

    Returns:
        bool: ``True`` for ``pytz`` and ``six`` (always installed) and for
        modules that cannot be imported by the interpreter running this
        notebook; ``False`` for blank names, names on the ignore list,
        standard-library modules and modules that import successfully.
    """
    modules_to_ignore = ["re", "os", "sys", "subprocess", "urllib", "src", "collections", "-r"]
    # Membership test: `module_name == "pytz" or "six"` was always truthy and
    # reported every module as needing installation.
    if module_name in ("pytz", "six"):
        return True
    if module_name.replace(' ', '') == "" or module_name in modules_to_ignore:
        return False
    if is_standard_library(module_name):
        return False  # It's a standard library module
    try:
        print("importing ", module_name)
        importlib.import_module(module_name)
        return False
    except ImportError:
        return True  # Module needs to be installed
   

def get_python_version(project_name):
    """Look up the Python version recorded for a project.

    Reads ``project_versions.csv`` from the current working directory; the
    first column is compared with ``project_name`` and the second column is
    returned.

    Args:
        project_name: Project identifier as stored in the first CSV column.

    Returns:
        str or None: The stripped second column of the first matching row, or
        ``None`` if no row matches.
    """
    with open("project_versions.csv", mode='r', newline='') as infile:
        for row in csv.reader(infile):
            # Blank lines are yielded as empty rows; skip them and any row
            # without a version column instead of raising IndexError.
            if len(row) < 2:
                continue
            if row[0] == project_name:
                return row[1].strip()
    return None


def install_python(python_version):
    """Make sure a pyenv installation of ``python_version`` exists and return its tools.

    Args:
        python_version: ``major.minor`` version string such as ``"3.7"``.

    Returns:
        tuple: ``(python_path, pip_path)`` pointing at ``python<version>`` and
        ``pip<version>`` inside ``~/.pyenv/versions/<full version>/bin``.

    Raises:
        RuntimeError: If pyenv lists no matching installation after the
            installation attempt.
    """
    # Install the required Python version if not installed
    if not is_python_version_installed(python_version):
        print(f"Installing Python {python_version}")
        # The command is passed as a list without shell=True: combined with
        # shell=True only the first element would be executed on POSIX and
        # "install <version>" would never reach pyenv.
        if "3.6" in python_version:
            print("Applying patch for Python 3.6.15")
            subprocess.run(["pyenv", "install", "3.6.15"])
        else:
            subprocess.run(["pyenv", "install", python_version])

    complete_python_version = get_complete_python_version(python_version)
    if complete_python_version is None:
        # Without this check the returned paths would contain the text "None".
        raise RuntimeError(f"Python {python_version} is not available through pyenv")
    print("Python installed: ", complete_python_version)
    python_path = os.path.expanduser(f"~/.pyenv/versions/{complete_python_version}/bin/python{python_version}")
    pip_path = os.path.expanduser(f"~/.pyenv/versions/{complete_python_version}/bin/pip{python_version}")

    return python_path, pip_path

def install_general_dependencies(project_path, pip_path):
    """Install a project's declared dependencies with the given pip.

    Every known requirements file present in ``project_path`` is handed to
    ``install_requirements``; if the project has a ``setup.py`` it is then
    installed in editable mode. Failures are printed and do not stop the
    remaining steps.

    Args:
        project_path: Directory of the project checkout.
        pip_path: pip executable to run.
    """
    # Each file name appears once so that no requirements file is installed twice.
    requirements_files = (
        "requirements.txt",
        "test-requirements.txt",
        "requirements-dev.txt",
        "requirements.dev.txt",
        "requirements_dev.txt",
        "requirements-test.txt",
        "requirements-swh.txt",
        "test_requirements.txt",
        "requirements-development.txt",
    )
    for req_file in requirements_files:
        req_file_path = os.path.join(project_path, req_file)
        if os.path.exists(req_file_path):
            print(f"Found requirements file: {req_file}")
            try:
                install_requirements(req_file_path, pip_path)
            except subprocess.CalledProcessError as e:
                print(f"Failed to install requirements: Command failed with error {e}")

    # check local dependencies (setup.py)
    setup_file = os.path.join(project_path, "setup.py")
    if os.path.exists(setup_file):
        try:
            subprocess.check_call([pip_path, 'install', '-e', project_path])
        except subprocess.CalledProcessError as e:
            print(f"Failed to install local dependencies: Command failed with error {e}")
    
def install_requirements(req_file_path, pip_path):
    """Install the entries of a requirements file one at a time.

    Entries are installed individually so that one failing package does not
    prevent the others from being installed.

    Args:
        req_file_path: Path of the requirements file.
        pip_path: pip executable to run.

    Notes:
        Blank lines and comment lines are skipped and trailing ``# ...``
        comments are removed. Option lines (starting with ``-``) are split on
        whitespace so pip receives the option and its value as separate
        arguments; relative paths inside such lines are resolved by pip
        against the current working directory.
    """
    with open(req_file_path, 'r') as f:
        raw_lines = f.readlines()

    for raw_line in raw_lines:
        requirement = raw_line.strip()
        # Test the stripped text so indented comments are recognised as well.
        if not requirement or requirement.startswith('#'):
            continue
        # A '#' preceded by whitespace starts an inline comment; a '#' inside
        # a URL fragment (e.g. "#egg=name") has no whitespace before it.
        requirement = re.split(r'\s+#', requirement, maxsplit=1)[0].strip()
        install_args = requirement.split() if requirement.startswith('-') else [requirement]
        try:
            subprocess.check_call([pip_path, 'install', *install_args])
        except subprocess.CalledProcessError:
            print(f"Failed to install {requirement}, continuing...")


def _collect_imported_modules(source):
    """Return the set of top-level module names imported by ``source``."""
    modules = set()
    try:
        tree = ast.parse(source)
    except (SyntaxError, ValueError):
        # The file does not parse with this interpreter; fall back to
        # scanning the lines that look like import statements.
        for raw_line in source.splitlines():
            line = raw_line.strip()
            if line.startswith('import '):
                for target in line[len('import '):].split(','):
                    tokens = target.split()
                    if tokens:
                        modules.add(tokens[0].split('.')[0])
            elif line.startswith('from '):
                tokens = line.split()
                if len(tokens) > 1 and not tokens[1].startswith('.'):
                    modules.add(tokens[1].split('.')[0])
        modules.discard('')
        return modules

    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                modules.add(alias.name.split('.')[0])
        elif isinstance(node, ast.ImportFrom):
            # level > 0 marks a relative import, which pip cannot install.
            if node.level == 0 and node.module:
                modules.add(node.module.split('.')[0])
    return modules


def install_imports(task_file_path, pip_path):
    """Install the third-party modules imported by a Python source file.

    The imports are read from the file's syntax tree, so text inside strings
    or docstrings is not mistaken for an import, every name of
    ``import a, b`` is seen, and relative imports are ignored. Modules for
    which ``check_module_needs_install`` returns ``True`` are installed with
    pip, after translating import names that differ from their package name.

    Args:
        task_file_path: Path of the Python file to inspect.
        pip_path: pip executable to run.
    """
    with open(task_file_path, 'r') as f:
        dependencies = _collect_imported_modules(f.read())

    # Filter out standard library modules
    third_party_dependencies = {dep for dep in dependencies if check_module_needs_install(dep)}

    # Import name -> pip requirement, for packages whose two names differ.
    mapping = {"yaml": "pyyaml", "ruamel": "ruamel.yaml", "git": "GitPython", "OpenSSL": "pyOpenSSL", "requests": "requests==2.25.1", "Crypto": "pycryptodome", "PIL": "Pillow", "fs_s3fs": "fs-s3fs", "dateutil": "python-dateutil"}

    # Install dependencies
    for dep in third_party_dependencies:
        dep = mapping.get(dep, dep)
        try:
            if dep == "six":
                subprocess.check_call([pip_path, 'install', '--upgrade', 'setuptools<36'])
                os.environ["VIRTUALENV_NO_DOWNLOAD"] = "1"

            subprocess.check_call([pip_path, 'install', dep])

        except subprocess.CalledProcessError as e:
            print(f"Failed to install {dep}: Command failed with error {e}")



def initialize_venv():
    """Run the pyenv initialisation commands followed by ``pyenv rehash``.

    The commands are passed as argument lists without ``shell=True``: with
    ``shell=True`` only the first list element is executed on POSIX, so each
    call would have run a bare ``pyenv``.

    NOTE(review): ``pyenv init`` presumably only prints shell code for the
    caller to evaluate, so running it in a child process may not change this
    process's environment - confirm whether these two calls are needed.
    """
    subprocess.run(['pyenv', 'init', '--path'])
    subprocess.run(['pyenv', 'init', '-'])
    subprocess.run(['pyenv', 'rehash'])

### Test setup and code-distance functions

In [4]:
def setup_pytest(python_path, pip_path, project, project_name):
    """Install pytest (and project-specific extras) with the project's pip.

    Args:
        python_path: Interpreter of the project environment. Not used by this
            function; kept so existing callers keep working.
        pip_path: pip executable of the project environment.
        project: Project directory, used as working directory for the commands.
        project_name: Project identifier, used to select special-case setup.
    """
    # The previous version built an environment copy whose PYTHONPATH was the
    # interpreter path and never passed it to any command; it is removed.
    subprocess.run([pip_path, "install", "pytest"], shell=False, cwd=project)
    if project_name == "awsteiner---o2sclpy": # special case for h5py library weird installation
        subprocess.run(['brew', 'install', 'hdf5'], shell=False, cwd=project)
        subprocess.run([pip_path, "install", "--no-build-isolation", "--no-cache-dir", "h5py"], shell=False, cwd=project)
        subprocess.run([pip_path, "install", "h5py"], shell=False, cwd=project)
               

def levenshtein_distance(str1, str2):
    """Return ``Levenshtein.distance`` computed for the two strings."""
    edit_distance = Levenshtein.distance(str1, str2)
    return edit_distance

def jaccard_distance(str1, str2):
    """Return the Jaccard distance between the whitespace-separated tokens of two strings.

    Args:
        str1: First text.
        str2: Second text.

    Returns:
        float: ``1 - |A & B| / |A | B|`` for the token sets ``A`` and ``B``;
        ``0.0`` when neither string contains a token.
    """
    set1 = set(str1.split())
    set2 = set(str2.split())
    union = set1 | set2
    if not union:
        # Two empty token sets are identical; this also avoids dividing by zero.
        return 0.0
    return 1 - len(set1 & set2) / len(union)

def compute_distance(str1, str2, method="jaccard"):
    """Compute the distance between two strings with the selected metric.

    ``method`` is either ``"jaccard"`` or ``"levenshtein"``; any other value
    raises ``ValueError``.
    """
    if method not in ("jaccard", "levenshtein"):
        raise ValueError("Invalid similarity method")
    if method == "jaccard":
        return jaccard_distance(str1, str2)
    return levenshtein_distance(str1, str2)


def remove_docstring(function_str):
    """Return the source of ``function_str`` with function docstrings removed.

    Every ``def`` / ``async def`` in the code, nested ones included, loses
    its docstring. The result is regenerated from the syntax tree with
    ``astor``, so comments and the original formatting are not preserved.

    Args:
        function_str: Python source text that parses as a module.

    Returns:
        str: Regenerated source without function docstrings.

    Raises:
        SyntaxError: If ``function_str`` cannot be parsed.
    """
    tree = ast.parse(function_str)

    for node in ast.walk(tree):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        # ast.get_docstring recognises only a leading *string* expression, so
        # a leading non-string constant is kept, and the deprecated ast.Str
        # node class is not needed.
        if ast.get_docstring(node, clean=False) is None:
            continue
        # A function consisting only of its docstring must keep a statement,
        # otherwise the regenerated source would not be valid Python.
        node.body = node.body[1:] or [ast.Pass()]

    return astor.to_source(tree)



### About the dataset

In [None]:
# Map every project (with "/" replaced by "---") to the ids of its tasks.
projects_and_tasks = {}

with open("benchmarks/CoderEval/CoderEval4Python.json") as f:
    data = json.load(f)

for record in data["RECORDS"]:
    project_name = record["project"].replace('/', '---')
    task_id = record["_id"]
    projects_and_tasks.setdefault(project_name, []).append(task_id)

print("Number of projects: ", len(projects_and_tasks))
print("Number of Python tasks", len(data["RECORDS"]))

## Prepare the virtual environment

In [None]:
for project_name, tasks in projects_and_tasks.items():
    # NOTE(review): as written, this guard skips every project whose name is
    # not the empty string, so the loop body only runs for a project named "".
    # It looks like a manual per-project filter - put the project to prepare
    # in the comparison (or remove the guard to prepare all projects).
    if project_name != "":
        continue
    print("Preparing the environment for project: ", project_name)

    project = get_project_path(project_name)
    if project is None:
        print(f"Project directory not found for {project_name}, skipping")
        continue

    python_version = get_python_version(project_name)

    venv_dir = os.path.join(project, f"{project_name}_env")
    if os.path.exists(venv_dir):
        print(f"Virtual environment already exists for {project_name}")
    else:
        base_python_path, _ = install_python(python_version)
        initialize_venv()
        print(f"Creating virtual environment in Python {python_version}...")
        subprocess.run([base_python_path, "-m", "venv", f"{project_name}_env"], cwd=project)

    # Always use the tools inside the virtual environment. A freshly created
    # environment previously kept the pyenv-global pip, so the dependencies
    # were installed outside the environment that later runs the tests.
    python_path = os.path.join(project, f"{project_name}_env/bin/python")
    pip_path = os.path.join(project, f"{project_name}_env/bin/pip")

    print("Installing dependencies ...")
    install_general_dependencies(project, pip_path)

    # # Uncomment to install dependencies for each task
    # for task_id in tasks:
    #     print("Installing dependencies for task: ", task_id)
    #     project_name, file_path, task_code, test_file, test_name, task_name = get_task_info(task_id)
    #     complete_file_path = os.path.join(project, file_path)
    #     install_imports(complete_file_path, pip_path)
    #     print("Dependencies installed for task: ", task_name, " (" , task_id, ")")
    

## Run the tests

In [1]:
# Run configuration: these values are combined into the names of the model
# output file that is read and of the result files that are written.
dataset = "codereval"
language = "python"
version = "12"
model = "mistral"

In [None]:
%%capture cap


# Number of failed test runs, keyed by prompt-technique label and by task id.
stats_per_tech = {}
stats_per_task = {}

model_results_path = 'results/' + model + '/'
model_output_file = model_results_path + 'model_output/' + model + '_' + dataset + '_' + language + '_v' + version + '.jsonl'
model_predictions = read_jsonl_file(model_output_file)
evaluation_results_file = model_results_path + 'evaluation_results/evaluation_' + dataset +'_v' + version + '.csv'

# Make sure the output directory exists before the CSV is opened for writing.
os.makedirs(os.path.dirname(evaluation_results_file), exist_ok=True)

# Projects whose environment already received pytest during this run, so the
# installation is not repeated for every prediction.
pytest_ready_projects = set()

with open(evaluation_results_file, mode='w', newline='') as csv_file:
    fieldnames = ['comb_id', 'task_id', 'prompt_technique', 'prompt', 'test_result', 'error_message', 'groundtruth_code', 'generated_code', 'lexical_distance', 'test_code', 'is_zero', 'is_fewshot', 'is_CoT', 'is_persona', 'is_package', 'is_signature']
    writer = csv.DictWriter(csv_file, fieldnames=fieldnames, delimiter='$')
    writer.writeheader()

    for i, row in enumerate(model_predictions):
        # Bound before the try block so the error handler can always report it.
        task_id = None
        # NOTE(review): any exception raised below (including a SyntaxError
        # while parsing the generated code) skips the prediction without
        # writing a CSV row or counting a failure - confirm this is intended.
        try:
            # about the prediction of tasks
            task_id = str(row['task_id'])
            is_zero, is_fewshot, is_CoT, is_persona, is_package, is_signature = row['combination']
            combination = ''.join([str(int(is_zero)), str(int(is_fewshot)), str(int(is_CoT)), str(int(is_persona)), str(int(is_package)), str(int(is_signature))])
            prompt_techinques_applied = get_prompt_techniques_applied(is_zero, is_fewshot, is_CoT, is_persona, is_package, is_signature)
            prompt = re.sub(r'\n', '', row['prompt'])

            # about the project and groundtruth
            groundtruth_code = row['groundtruth_code']
            test_code = row['tests'] # just a part of the test, just for printing

            project_name, original_file_path, task_code, test_content, test_name, task_name = get_task_info(task_id)

            # if project_name != "pexip---os-zope":
            #     continue

            project = get_project_path(project_name)
            python_path = os.path.join(project, f"{project_name}_env/bin/python")
            pip_path = os.path.join(project, f"{project_name}_env/bin/pip")

            # about the generated code
            generated_code = row['generated_code']

            # Columns shared by every result row of this prediction.
            result_row = {'comb_id': i, 'task_id': task_id, 'prompt_technique': prompt_techinques_applied, 'prompt': prompt, 'groundtruth_code': groundtruth_code, 'generated_code': generated_code, 'lexical_distance': '', 'test_code': test_code, 'is_zero': is_zero, 'is_fewshot': is_fewshot, 'is_CoT': is_CoT, 'is_persona': is_persona, 'is_package': is_package, 'is_signature': is_signature}

            if generated_code is None or generated_code == "" or generated_code == "null":
                print(f"Skipping task {task_id} with combination {combination} as the generated code is empty")
                writer.writerow({**result_row, 'test_result': 'Failed', 'error_message': 'Incomplete code generated'})
                continue

            file_path = original_file_path.replace('.py', f'_{combination}.py')
            file_path = os.path.join(project, file_path)

            # Dedent before parsing: code that is indented as a whole (for
            # example a method body copied out of a class) does not parse.
            dedented_generated_code = textwrap.dedent(generated_code)
            groundtruth_code_function_name = extract_function_name(textwrap.dedent(groundtruth_code))
            generated_code_function_name = extract_function_name(dedented_generated_code)

            # ensure the correct indentation level
            first_line = task_code.splitlines()[0]
            indentation_level = get_indentation_level(first_line)
            generated_task_code = dedented_generated_code.replace(generated_code_function_name, groundtruth_code_function_name)
            generated_task_code = textwrap.indent(generated_task_code, ' ' * indentation_level) + "\n\n"

            test_content = test_content.replace(task_code, generated_task_code)

            # creating the test file (only once its content is known, so a
            # failure above does not leave an empty file behind)
            with open(file_path, "w") as f:
                f.write(test_content)

            print(f"Running the task: {task_id} ({task_name}) with combination: {combination}")

            if test_name == "": # This is a class not a test file
                class_output = subprocess.run([python_path, file_path], capture_output=True, text=True, cwd=project)
                if "Error while running the task:" in class_output.stdout:
                    extracted_error = re.search(r'Error while running the task: (.*)', class_output.stdout)
                    class_output.stderr = extracted_error.group(1)
                elif "isT is not True" in class_output.stdout:
                    class_output.stderr = "isT is not True"
            else:
                print("This is a pytest file.")
                if project_name not in pytest_ready_projects:
                    setup_pytest(python_path, pip_path, project, project_name)
                    pytest_ready_projects.add(project_name)
                class_output = subprocess.run([python_path, "-m", "pytest", file_path], capture_output=True, text=True, cwd=project)

                if class_output.returncode == 0:
                    class_output.stdout = class_output.stdout + "Tests passed for the task: " + task_id

            print("Class output:", class_output.stdout)
            print("Class error:", class_output.stderr)

            # groundtruth_code = remove_docstring(groundtruth_code)
            # generated_code = remove_docstring(generated_code)
            # similarity_between_codes = compute_distance(groundtruth_code, generated_code, method="levenshtein")

            test_result = "Passed" if "Tests passed for the task" in class_output.stdout else "Failed"

            writer.writerow({**result_row, 'test_result': test_result, 'error_message': class_output.stderr})

            if test_result == "Failed":
                stats_per_task[task_id] = stats_per_task.get(task_id, 0) + 1
                stats_per_tech[prompt_techinques_applied] = stats_per_tech.get(prompt_techinques_applied, 0) + 1

        except Exception as e:
            print("Error running the task: ", task_id, e)
            continue


print("Failed tests per prompt technique: \n")
print(stats_per_tech)
print("Failed tests per task_id: \n")
print(stats_per_task)


In [8]:
# Save everything the evaluation cell printed (captured in `cap`) to a text file.
captured_log_path = f'test_projects_output_{version}.txt'
with open(captured_log_path, 'w') as file:
    file.write(cap.stdout)