In [1]:
from datasets import load_dataset
import numpy as np
import subprocess
import difflib
import autopep8
import requests as req
import pandas as pd
from tqdm.auto import tqdm
tqdm.pandas()
import json
import tree_sitter_python as tspython
import tree_sitter_java as tsjava

from tree_sitter import Language, Parser

## Datset loading and preprocessing

We only use logic errors and perform the following preprocessing steps:
- Remove examples containing syntax errors
- Remove blank lines
- Format the buggy code and the solution
- Remove comments

In [2]:
dataset = load_dataset("Rtian/DebugBench")
df = pd.DataFrame(dataset['test'])
filtered_df = df[df['category'] == 'logic error'].copy()

In [3]:
PY_LANGUAGE = Language(tspython.language(), "python")
JAVA_LANGUAGE = Language(tsjava.language(), "java")

py_parser = Parser()
py_parser.set_language(PY_LANGUAGE)

java_parser = Parser()
java_parser.set_language(JAVA_LANGUAGE)

def remove_blank_lines(text):
    return "\n".join([s.rstrip() for s in text.splitlines() if s.strip()])
    
def format_cpp_code(code):
    try:
        process = subprocess.Popen(['clang-format', '--style=file:./clang-format.txt'], 
                                   stdin=subprocess.PIPE, 
                                   stdout=subprocess.PIPE, 
                                   stderr=subprocess.PIPE)
        formatted_code, errors = process.communicate(input=code.encode())
        if process.returncode != 0:
            print("Error cpp formatting code: ", errors.decode())
            return code
        process = subprocess.Popen(['g++', '-fpreprocessed', '-dD', '-E', '-P', '-'],
                                   stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
        formatted_code, errors = process.communicate(input=formatted_code)

        
        if process.returncode != 0:
            print("Error cpp formatting code: ", errors.decode())
            return code
        
        return remove_blank_lines(formatted_code.decode())
    except Exception as e:
        print(f"An exception occurred: {e}")
        return code
    
def format_python_code(code):
    try:
        # ignore errors to prevent autopep from running forever
        formatted_code = autopep8.fix_code(code, options={'ignore': ['E']})
    except Exception as e:
        print(f"An exception occurred: {e}")
        return code
    bytes_code = bytes(formatted_code, "utf-8")
    array = bytearray(bytes_code)
    tree = py_parser.parse(bytes_code)
    def traverse(node):
        if node.type == 'comment':
            array[node.start_byte:node.end_byte]=(node.end_byte - node.start_byte) * b" "
            return 
        elif node.child_count == 0:
            return
        else:
            for child in node.children:
                traverse(child)
            return
    traverse(tree.root_node)
    cleaned_code = array.decode("utf-8")
    return remove_blank_lines(cleaned_code)


def format_java_code(code):
    try:
        process = subprocess.Popen(['clang-format', '--style=file:./clang-format-java.txt', '--assume-filename=Main.java'], 
                                   stdin=subprocess.PIPE, 
                                   stdout=subprocess.PIPE, 
                                   stderr=subprocess.PIPE)
        formatted_code, errors = process.communicate(input=code.encode())
        
        if process.returncode != 0:
            print("Error formatting java code: ", errors.decode())
            return code
        
    except Exception as e:
        print(f"An exception occurred: {e}")
        return code
    
    array = bytearray(formatted_code)
    tree = java_parser.parse(formatted_code)
    def traverse(node):
        if "comment" in node.type:
            #print(f"Removed comment: {array[node.start_byte:node.end_byte].decode('utf-8')}")
            array[node.start_byte:node.end_byte]=(node.end_byte - node.start_byte) * b" "
            return 
        elif node.child_count == 0:
            return
        else:
            for child in node.children:
                traverse(child)
            return
    
    traverse(tree.root_node)
    cleaned_code = array.decode("utf-8")
    return remove_blank_lines(cleaned_code)

def format_code(code, language):
    if language == "python3":
        return format_python_code(code)
    elif language == "cpp":
        return format_cpp_code(code)
    elif language == "java":
        return format_java_code(code)
    else:
        print(f"Unsupported language: {language}")
        return code

def format_entry(entry):
    buggy_code = entry['buggy_code']
    solution = entry['solution']
    return pd.Series({
        "buggy_code_formatted": format_code(buggy_code, entry['language']),
        "solution_formatted": format_code(solution, entry['language'])
    })
    
filtered_df[["buggy_code_formatted", "solution_formatted"]] = filtered_df.progress_apply(format_entry, axis=1)

formatted_df = filtered_df
formatted_df.to_csv("formatted_code.csv", index=False)

  0%|          | 0/590 [00:00<?, ?it/s]

In [44]:
print(format_python_code("class Solution: def canPlaceFlowers(self, flowerbed: List[int], n: int) -> bool: flowerbed=[0]+flowerbed+[0] for i in range(1,len(flowerbed)-1): if flowerbed[i]==0 and flowerbed[i+2]==0 and flowerbed[i-1]==0:flowerbed[i]=1;n-=1 if n<=0:return True return False "))

class Solution: def canPlaceFlowers(self, flowerbed: List[int], n: int) -> bool: flowerbed=[0]+flowerbed+[0] for i in range(1,len(flowerbed)-1): if flowerbed[i]==0 and flowerbed[i+2]==0 and flowerbed[i-1]==0:flowerbed[i]=1;n-=1 if n<=0:return True return False


## Prompt the LLM
After the preprocessing, we now retrieve lines of code that are relevant for fixing the buggy code.

In [4]:
#formatted_df = pd.read_csv("formatted_code.csv")


In [5]:
def find_changed_lines(file1_contents, file2_contents, print_diff=False):
    diff = difflib.ndiff(file1_contents.splitlines(), file2_contents.splitlines())
    
    changed_lines = set()

    current_line_number = 0
    num_clusters = 0
    current_block_start = 0
    no_change_in_line = True
    for line in diff: 
        if print_diff:
            print(f"{current_line_number}: {line}")
 
        if line.startswith('  '):
            current_line_number += 1
            no_change_in_line = True
        else:
            if no_change_in_line:
                num_clusters += 1
            no_change_in_line = False
            if line.startswith('- '):  # Lines in file1 but not in file2
                current_line_number += 1
            changed_lines.add(current_line_number)
    return list(changed_lines), num_clusters

In [None]:
log_file = open("output2.json", "w")
log_file.write("[")

def retrieve_relevant_lines_for_entry(entry):
    buggy_code = entry['buggy_code_formatted']
    solution = entry['solution_formatted']
    num_lines = buggy_code.count('\n')
    changed_lines = find_changed_lines(buggy_code, solution)

    data = {"code": buggy_code, "language": entry['language']}
    success = True
    response_json = []
    try:
        resp = req.post("http://delos.eaalab.hpi.uni-potsdam.de:8010/highlight-code/", json=data)
        try:
            response_json = resp.json()
        except:
            print("Json parsing failed")
            print(resp.text())
    except:
        print("Request failed")
        success = False    

    predicted_lines = []
    suggestions = []
    descriptions = []
    actions = []
    for item in response_json:
        line_number = int(item.get('line_number', "1"))
        suggestions.append(item.get("description", ""))
        descriptions.append(item.get("suggestion", ""))
        predicted_lines.append(line_number)
        actions.append(item.get("action", ""))

    result = {
        'buggy_code': buggy_code,
        'solution': solution,
        'changed_lines': changed_lines,
        'predicted_lines': predicted_lines,
        'num_lines': num_lines,
        'success': success,
        'suggestions': suggestions,
        'descriptions': descriptions,
        'actions': actions,
    }
    log_file.write(f'{json.dumps(result)},')
    log_file.flush()
    return result


results = formatted_df.progress_apply(retrieve_relevant_lines_for_entry, axis=1)

log_file.write("]")
log_file.close()
results_df = pd.DataFrame(results.tolist())
results_df.to_csv("results_formatted.csv", index=False)

  0%|          | 0/590 [00:00<?, ?it/s]

Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed
Json parsing failed
Request failed


In [28]:
import ast
df = pd.read_csv("results_formatted.csv")
#df['num_clusters'] = df['changed_lines'].apply(ast.literal_eval[1])
df[['changed_lines', 'num_clusters']] = df['changed_lines'].apply(ast.literal_eval).apply(pd.Series)
df['predicted_lines'] = df['predicted_lines'].apply(ast.literal_eval)


In [29]:
success_df = df[df["success"]]
# https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
# We need the following line when working with success_df instead of df
#pd.options.mode.copy_on_write = False
print(f'Number of successful requests: {len(success_df)}')
df.head()
print(df.iloc[0])

Number of successful requests: 564
buggy_code         class Solution {\n   public:\n    long long fa...
solution           class Solution {\n   public:\n    long long fa...
changed_lines                                                    [5]
predicted_lines                                          [5, 13, 23]
num_lines                                                         24
success                                                         True
suggestions        ['The function calculates the factorial of n +...
descriptions       ['Change n + 1 to n in the recursive call.', '...
actions                         ['change', 'change', 'insert_above']
num_clusters                                                       1
Name: 0, dtype: object


In [30]:
df_dataset = pd.DataFrame(dataset['test'])
df['language'] = df_dataset['language']
df['level'] = df_dataset['level']
df['category'] = df_dataset['category']


In [41]:
def create_labels(row):
    labels = pd.Series([0] * (row['num_lines'] + 1))
    for line in row['changed_lines']:
        if line == 0 or line > row['num_lines'] +1:
            continue
        labels[line-1] = 1
    assert len(labels) == row['num_lines']+1, f"{len(labels)} != {row['num_lines']}"
    return labels.tolist()

# TODO: Insertion points
def create_predictions(row):
    predictions = pd.Series([0] * (row['num_lines'] +1))
    for line in row['predicted_lines']:
        if line == -1:
            continue
        if line > row['num_lines'] + 1:
            print(f"Error {line} exceeds {row['num_lines']}")
            continue
        predictions[line-1] = 1
    assert len(predictions) == row['num_lines'] +1, f"{len(labels)} != {row['num_lines']}"
    return predictions.tolist()

df['labels'] = df.apply(create_labels, axis=1)
df['predictions'] = df.apply(create_predictions, axis=1)

labels = np.array([b for a in df['labels'].values for b in a])
predictions = np.array([b for a in df['predictions'].values for b in a])
assert len(labels) == len(predictions)

In [42]:
new_df = df[df['changed_lines'] in df['predicted_lines']]

TypeError: unhashable type: 'Series'

In [39]:
from sklearn.metrics import confusion_matrix, f1_score

def entry_based_metrics(entry):
    labels = np.array(entry['labels'])
    predictions = np.array(entry['predictions'])
    conf_matrix = confusion_matrix(labels, predictions)

    return {'tn': conf_matrix[0][0],
            'fn': conf_matrix[1][0],
            'fp': conf_matrix[0][1],
            'tp': conf_matrix[1][1],
            'f1': f1_score(labels, predictions)}



results = df.progress_apply(entry_based_metrics, axis=1)

results_df = pd.DataFrame(results.tolist())

df['tp'] = results_df['tp']
df['fp'] = results_df['fp']
df['fn'] = results_df['fn']
df['tn'] = results_df['tn']
df['f1'] = results_df['f1']


  0%|          | 0/590 [00:00<?, ?it/s]



IndexError: index 1 is out of bounds for axis 0 with size 1

In [None]:
import matplotlib.pyplot as plt
plt.figure(figsize=(8, 6))
plt.hist(df['f1'], bins=200, edgecolor='black')
plt.title('Histogram of F1 Scores')
plt.xlabel('F1 Score')
plt.ylabel('Frequency')
plt.show()

In [None]:
from IPython.display import display, HTML
res_df = df.sort_values(by='f1', ascending=False).head(10)#[['buggy_code', 'solution', 'changed_lines', 'predicted_lines']]

def pretty_print(input_df):
    return display(HTML(input_df.to_html().replace("\\n", "<br>")))

pretty_print(res_df)

In [None]:
def aggregate_and_print(input_df):
    tn = input_df['tn'].sum()
    fn = input_df['fn'].sum()
    tp = input_df['tp'].sum()
    fp = input_df['fp'].sum()
    print([tn, fp], "\n",
           [fn, tp])

In [None]:
print("differen levels")
easy_df = df[df['level'] == 'easy']
aggregate_and_print(easy_df)
mid_df = df[df['level'] == 'medium']
aggregate_and_print(mid_df)
hard_df = df[df['level'] == 'hard']
aggregate_and_print(hard_df)

print("non zero")
non_zero_df = df[df['suggestions'].apply(lambda x: len(x) > 0)]
aggregate_and_print(non_zero_df)

print("zero")
zero_df = df[df['suggestions'].apply(lambda x: len(x) == 0)]
aggregate_and_print(zero_df)

print("error types")
easy_df = df[df['category'] == 'syntax error']
aggregate_and_print(easy_df)
mid_df = df[df['category'] == 'multiple error']
aggregate_and_print(mid_df)
hard_df = df[df['category'] == 'logic error']
aggregate_and_print(hard_df)
ref_df = df[df['category'] == 'reference error']
aggregate_and_print(ref_df)

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, balanced_accuracy_score

accuracy = accuracy_score(labels, predictions)
balanced_acc = balanced_accuracy_score(labels, predictions)
precision = precision_score(labels, predictions)
recall = recall_score(labels, predictions)
f1 = f1_score(labels, predictions)
conf_matrix = confusion_matrix(labels, predictions)


print(f'Accuracy: {accuracy}')
print(f'Balanced Accuracy: {balanced_acc}')
print(f'Precision: {precision}')
print(f'Recall: {recall}')
print(f'F1 Score: {f1}')
print(f'Confusion Matrix:\n{conf_matrix}')

In [None]:
import matplotlib.pyplot as plt

from sklearn.metrics import roc_curve, auc, precision_recall_curve

num_positives = labels.sum()

fpr, tpr, thresholds = roc_curve(labels, predictions)
precision, recall, thresholds2 = precision_recall_curve(labels, predictions)
fn = [(num_positives) - tp * num_positives for tp in tpr]
f1 = [(2*tpr[i]*num_positives)/(2*tpr[i]*num_positives + fpr[i]*num_positives + fn[i]) for i in range(len(fpr))]
print(max(f1))

roc_auc = auc(fpr, tpr)
print("Area", roc_auc)

plt.figure()
plt.plot(fpr, tpr, color='darkorange', lw=2)
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.0])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('AUC')
plt.legend(loc='lower right')

plt.figure()
plt.plot(thresholds, fpr, color='blue')
plt.plot(thresholds, tpr, color='red')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.0])
plt.xlabel('Classification Thresholds')
plt.ylabel('True/False Positive Rate')

plt.figure()
plt.plot(thresholds, f1, color='navy')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.0])
plt.xlabel('Classification Thresholds')
plt.ylabel('F1-Score')

plt.figure()
plt.plot(precision, recall, color='darkorange', lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.0])
plt.xlabel('Precision')
plt.ylabel('Recall')
plt.title('Precision Recall Curve')
plt.legend(loc='lower right')

length = min(len(precision), len(recall))
plt.figure()
plt.plot(thresholds2, precision[:length-1], color='blue')
plt.plot(thresholds2, recall[:length-1], color='red')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.0])
plt.xlabel('Classification Thresholds')
plt.ylabel('Precision blue / recall red')

plt.show()