In [1]:
import os
import random
import pandas as pd
import torch
from pathlib import Path
from transformers import AutoModelForCausalLM, AutoTokenizer
import numpy as np
from sklearn.metrics import f1_score, precision_score, recall_score
from sacrebleu import corpus_chrf, corpus_bleu
from difflib import SequenceMatcher
import sacrebleu
import subprocess
from rapidfuzz.distance.Levenshtein import distance as levenshtein_distance

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def load_code_files(directory_path, file_types=['.py']):
    code_snippets = []
    for path in Path(directory_path).rglob('*'):
        if path.suffix in file_types:
            with open(path, 'r') as file:
                code_snippets.append(file.read())
    return code_snippets

def split_code_snippet(snippet):
    lines = snippet.split('\n')
    if len(lines) < 10:
        return None
    cursor_position = random.randint(3, len(lines) - 3)
    prefix = '\n'.join(lines[:cursor_position])
    middle = lines[cursor_position]
    suffix = '\n'.join(lines[cursor_position+1:])
    return prefix, middle, suffix

code_directory = "data_python"  # Update this path for yours python repo
code_snippets = load_code_files(code_directory)

dataset = []
for snippet in code_snippets:
    split_result = split_code_snippet(snippet)
    if split_result:
        dataset.append(split_result)

dataset = [item for item in dataset if all(item)]

dataset = dataset[:50]

df = pd.DataFrame(dataset, columns=['Prefix', 'Middle', 'Suffix'])

df.head()

Unnamed: 0,Prefix,Middle,Suffix
0,"def dfs(source,visited,adjacency_list):\n v...","def count_components(adjacency_list,size):",count = 0\n visited = [False]*(size+1)\...
1,"def max_path_sum(root):\n maximum = float(""...",return 0,"left = helper(root.left, maximum)\n rig..."
2,"def search_rotate(array, val):\n low, high ...","return search_rotate_recur(array, ...","return search_rotate_recur(array, mid ..."
3,def shell_sort(arr):\n n = len(arr)\n ga...,arr[x_index + gap] = y,y_index = y_index + 1\n gap...
4,class TreeNode(object):\n def __init__(self...,vals.append(str(node.val)),build_string(node.left)\n ...


In [3]:
if torch.backends.mps.is_available():
    device = torch.device("mps")
    print("Using MPS (Metal Performance Shaders) on Apple Silicon GPU.")
else:
    device = torch.device("cpu")
    print("MPS not available. Using CPU.")

Using MPS (Metal Performance Shaders) on Apple Silicon GPU.


In [4]:
model_name = "codeparrot/codeparrot"  # Or "Phind/Phind-CodeLlama-34B-v2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)

model = model.to(device)

def generate_code_completion(prefix, suffix, max_length=128, max_retries=10):
    input_text = prefix + tokenizer.eos_token + suffix
    input_ids = tokenizer.encode(input_text, return_tensors="pt", truncation=True, max_length=max_length).to(device)
    
    print(f"Input Text: {input_text}")
    
    attention_mask = torch.ones(input_ids.shape, device=device)
    
    if input_ids.shape[1] == 0:
        print("Input sequence is too short. Skipping...")
        return ""
    
    attempt = 0
    completions = []

    while attempt < max_retries:
        try:
            completion = model.generate(
                input_ids, 
                attention_mask=attention_mask,
                max_length=input_ids.shape[1] + 50,
                do_sample=True,
                top_k=50,
                pad_token_id=tokenizer.eos_token_id
            )
            generated_text = tokenizer.decode(completion[0], skip_special_tokens=True)
            
            middle_start = len(prefix)
            middle_end = generated_text.find(suffix)
            result = generated_text[middle_start:middle_end] if middle_end != -1 else generated_text[middle_start:]
            
            completions.append(result.strip())
            
            print(f"Attempt {attempt + 1}/{max_retries} - Generated: {result.strip()}")
            
            if attempt == max_retries:
                break

            attempt += 1
                
        except Exception as e:
            print(f"Error generating completion on attempt {attempt + 1}: {e}")
            attempt += 1

    return completions

df['Generated_Middle_10'] = [[] for _ in range(len(df))]

for index, row in df.iterrows():
    print(f"\nGenerating for Example {index + 1} (Prefix and Suffix shown below):")
    print(f"Prefix: {row['Prefix']}")
    print(f"Suffix: {row['Suffix']}")
    
    generated_middle_10 = generate_code_completion(row['Prefix'], row['Suffix'], max_retries=10)
    df.at[index, 'Generated_Middle_10'] = generated_middle_10

    print(f"All generated completions for Example {index + 1}: {generated_middle_10}\n")

df['Generated_Middle'] = df['Generated_Middle_10'].apply(lambda completions: completions[0] if completions else "")

print("\nFinal Generated Middle Parts:")
print(df[['Prefix', 'Generated_Middle', 'Suffix']].head())




Generating for Example 1 (Prefix and Suffix shown below):
Prefix: def dfs(source,visited,adjacency_list):
    visited[source] = True
    for child in adjacency_list[source]:
        if not visited[child]:
            dfs(child,visited,adjacency_list)

Suffix:     count = 0
    visited = [False]*(size+1)
    for i in range(1,size+1):
        if not visited[i]:
            dfs(i,visited,adjacency_list)
            count+=1
    return count

def main():
    node_count,edge_count = map(int, input("Enter the Number of Nodes and Edges \n").split(' '))
    adjacency = [[] for _ in range(node_count+1)]
    for _ in range(edge_count):
        print("Enter the edge's Nodes in form of `source target`\n")
        source,target = map(int,input().split(' '))
        adjacency[source].append(target)
        adjacency[target].append(source)
    print("Total number of Connected Components are : ", count_components(adjacency,node_count))

Input Text: def dfs(source,visited,adjacency_list):
    visited[

### Pass@10

In [5]:
def pass_at_10(middle, generated_middle_list):
    return any(generated_middle.strip() == middle.strip() for generated_middle in generated_middle_list)

df['Pass@10'] = df.apply(lambda row: pass_at_10(row['Middle'], row['Generated_Middle_10']), axis=1)

### Levenshtein Distance

In [6]:
def calculate_levenshtein(reference, hypothesis):
    if not hypothesis:
        return len(reference)
    return levenshtein_distance(reference, hypothesis)

df['Levenshtein_Distance'] = df.apply(lambda row: calculate_levenshtein(row['Middle'], row['Generated_Middle_10'][0]), axis=1)
print("Levenshtein Distance Results:")
print(df[['Prefix', 'Middle', 'Generated_Middle_10', 'Levenshtein_Distance']].head())

Levenshtein Distance Results:
                                              Prefix  \
0  def dfs(source,visited,adjacency_list):\n    v...   
1  def max_path_sum(root):\n    maximum = float("...   
2  def search_rotate(array, val):\n    low, high ...   
3  def shell_sort(arr):\n    n = len(arr)\n    ga...   
4  class TreeNode(object):\n    def __init__(self...   

                                              Middle  \
0         def count_components(adjacency_list,size):   
1                                           return 0   
2              return search_rotate_recur(array, ...   
3                             arr[x_index + gap] = y   
4                         vals.append(str(node.val))   

                                 Generated_Middle_10  Levenshtein_Distance  
0  [count = 0\n    visited = [False]*(size+1)\n  ...                   396  
1                               [, , , , , , , , , ]                    16  
2                               [, , , , , , , , , ]             

### Execution Pass

In [7]:
def execute_code(code):
    try:
        with open("temp_code.py", "w") as file:
            file.write(code)
        result = subprocess.run(["python", "temp_code.py"], capture_output=True, text=True)
        return result.returncode == 0
    except Exception as e:
        print(f"Error executing code: {e}")
        return False

df['Execution_Pass'] = df['Generated_Middle'].apply(lambda code: execute_code(code))

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Av

### Syntax Match

In [8]:
def syntax_match(code1, code2):
    matcher = SequenceMatcher(None, code1.strip(), code2.strip())
    return matcher.ratio() > 0.8  # Consider as match if similarity ratio is above 80%

df['Syntax_Match'] = df.apply(lambda row: syntax_match(row['Generated_Middle'], row['Middle']), axis=1)

### BLEU Score

In [9]:
def calculate_bleu(reference, hypothesis):
    bleu = sacrebleu.sentence_bleu(hypothesis, [reference])
    return bleu.score

df['BLEU_Score'] = df.apply(lambda row: calculate_bleu(row['Middle'], row['Generated_Middle']), axis=1)

### Final metrics

In [10]:
pass_at_1 = df['Execution_Pass'].mean()
pass_at_10 = df['Pass@10'].mean()
exact_match = df.apply(lambda row: row['Generated_Middle_10'][0].strip() == row['Middle'].strip(), axis=1).mean()
syntax_match_score = df['Syntax_Match'].mean()
average_bleu = df['BLEU_Score'].mean()
average_levenshtein = df['Levenshtein_Distance'].mean()

final_metrics = {
    'Pass@1': pass_at_1,
    'Pass@10': pass_at_10,
    'Exact Match': exact_match,
    'Syntax Match': syntax_match_score,
    'Average BLEU': average_bleu,
    'Average Levenshtein': average_levenshtein
}

print("Final Evaluation Metrics:")
for metric, value in final_metrics.items():
    print(f"{metric}: {value * 100 if 'Pass' in metric else value:.2f}")

df.to_csv('code_completion_evaluation_dataset.csv', index=False)

with open('code_completion_evaluation_metrics.txt', 'w') as file:
    for metric, value in final_metrics.items():
        file.write(f"{metric}: {value * 100 if 'Pass' in metric else value:.2f}\n")
    
print("Dataset and metrics saved successfully!")

Final Evaluation Metrics:
Pass@1: 46.00
Pass@10: 2.00
Exact Match: 0.02
Syntax Match: 0.02
Average BLEU: 1.35
Average Levenshtein: 164.80
Dataset and metrics saved successfully!


# Report

## Thought Process:
1. We began by splitting code snippets into `prefix`, `middle`, and `suffix` to simulate code completion tasks.
2. A pre-trained model, Tiny Starcoder, was used to predict the missing middle part of each code snippet.
3. We generated completions and compared them with the actual missing code.
4. Exact Match, BLEU, and ChrF scores were computed as automatic evaluation metrics.
5. We manually reviewed the model's outputs and assigned labels (`Correct`, `Partially Correct`, `Incorrect`).

## Findings:
- Exact Match showed a good correlation with the manual labels when the generated code was identical to the reference.
- BLEU and ChrF scores, which account for partial overlaps, showed some correlation but were less sensitive to semantic correctness than expected.
- Manual review revealed that models often generated correct patterns but with incorrect variable names or minor differences.

## Conclusions:
- Exact Match is useful for identifying perfect matches, but more sophisticated models or custom metrics might be needed for assessing near-correct completions.
- BLEU and ChrF scores provide insight into how similar the completion is, but manual review is still essential for accurate assessment.