## Error analysis of finetune model on function name prediction

In [1]:

import os
os.environ["HF_HOME"] = "/workspace/s3/hf"

In [2]:
from datasets import load_dataset
# We will only use 1000 samples from test set, to reduce inference time 
test_dst = load_dataset("hynky/code_search_net_python_func_names")["test"].select(range(1000))
# Sort by length for faster inference
test_dst = test_dst.map(lambda x: {"len": len(x["source_code"])})
test_dst = test_dst.sort("len")

  table = cls._concat_blocks(blocks, axis=0)


In [3]:

import re
def get_fc_name_tokens(fc_name):
    """
    This function parses the function name and returns its "tokens",
    splitting on underscores and camel case.
    """
    # Split on underscores
    parts = fc_name.split('_')

    # Split camel case parts
    tokens = []
    for part in parts:
        tokens.extend(re.findall(r'[A-Z]?[a-z]+|[A-Z]+(?=[A-Z]|$)', part))

    return tokens


In [4]:
import numpy as np

def calculate_precision(expected, predicted):
    matching_tokens = np.sum(np.isin(predicted, expected))
    precision = matching_tokens / len(predicted) if predicted else 0
    return precision

def calculate_recall(expected, predicted):
    matching_tokens = np.sum(np.isin(predicted, expected))
    recall = matching_tokens / len(expected) if expected else 0
    return recall

def calculate_f1(precision, recall):
    f1 = 2 * (precision * recall) / (precision + recall) if precision + recall else 0
    return f1

def calculate_average_metric(expected_tokens, predicted_tokens, metric_func):
    total_metric = np.sum([metric_func(expected, predicted) for expected, predicted in zip(expected_tokens, predicted_tokens)])
    average_metric = total_metric / len(expected_tokens)
    return average_metric

In [5]:
from sklearn.metrics import f1_score, accuracy_score
import torch
from tqdm import tqdm

def predict_with_model(model, tokenizer, dataset, batch_size: int, max_length: int = 2048, max_gen_tokens=100):
    padding = True if batch_size > 1 else "do_not_pad"
    predictions = []
    for batch_start in tqdm(range(0, len(dataset), batch_size)):
        batch = dataset.select(range(batch_start, min(batch_start + batch_size, len(dataset))))
        tokenized = batch.map(lambda x: tokenizer(x["text"], padding=padding, 
                                                    add_special_tokens=False, max_length=max_length,
                                                    truncation=True, verbose=False), batched=True)
        inputs = torch.tensor(tokenized["input_ids"]).to("cuda")
        attention_mask = torch.tensor(tokenized["attention_mask"]).to("cuda")
        generated_tokens = model.generate(inputs=inputs,
                                attention_mask=attention_mask,
                                max_new_tokens=max_gen_tokens)

        generated_tokens = generated_tokens.to("cpu")
        predicted_names = tokenizer.batch_decode(generated_tokens[:, inputs.shape[1]:], skip_special_tokens=True)
        predictions.extend(predicted_names)

        # Free cuda memory
        del generated_tokens
        del inputs
        del attention_mask
        import gc
        gc.collect()
        torch.cuda.empty_cache()

    return predictions

def calculate_accuracy(true_tokens, predicted_tokens, tokenizer):
    # Convert tokens to text
    true_texts = [tokenizer.decode(tokens) for tokens in true_tokens]
    predicted_texts = [tokenizer.decode(tokens) for tokens in predicted_tokens]

    # Calculate accuracy
    accuracy = accuracy_score(true_texts, predicted_texts)

    return accuracy

import re

def extract_function_name(text):
    match = re.search(r"```(.*)```", text)
    if match:
        return match.group(1)
    
    match_simple = re.search(r"`(.*)`", text)
    if match_simple:
        return match_simple.group(1)
    return text

def evaluate_with_model(model, tokenizer, dataset, batch_size: int, max_length: int = 2048, max_gen_tokens=100, transform_predictions=lambda x: x):
    # model = None
    predictions = predict_with_model(model, tokenizer, dataset, batch_size, max_length, max_gen_tokens)
    predictions = transform_predictions(predictions)
    expected_tokens = [get_fc_name_tokens(x) for x in dataset["function_name"]]
    predictions_tokens = [get_fc_name_tokens(x) for x in predictions]

    precision = [calculate_precision(exp, pred) for exp,pred in zip(expected_tokens, predictions_tokens)]
    recall = [calculate_recall(exp, pred) for exp,pred in zip(expected_tokens, predictions_tokens)]
    f1 = [calculate_f1(prec, recall) for prec,recall in zip(precision, recall)]
    return {
        "accuracy": accuracy_score(predictions, dataset["function_name"]),
        "recall":  np.sum(recall) / len(predictions_tokens),
        "precision": np.sum(precision) / len(predictions_tokens),
        "f1": np.sum(f1) / len(predictions_tokens),
        "predictions": predictions
    }

## Simple prompting without finetune_model first

In [6]:
from transformers import AutoModelForCausalLM, AutoTokenizer
model_name = "codellama/CodeLlama-7b-Instruct-hf"
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token
# Since we are able to bit the model in memory, we don't have to go for peft
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16, use_flash_attention_2=True, device_map="auto")

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [None]:
# Tokenize
def apply_template(x, tokenizer):
    return {"text": tokenizer.apply_chat_template(
        [{"role": "system", "content": "Generate a fitting name for the provided function. Place the suggested function name inside tripplet backtics. E.g ```adder```"},
         {"role": "user", "content": x["source_code"]}],
        tokenize=False)
    }

dataset = test_dst.map(apply_template, fn_kwargs={"tokenizer": tokenizer})
results_base = evaluate_with_model(model, tokenizer, dataset, 3, transform_predictions=lambda x: [extract_function_name(y) for y in x])

In [8]:
# free memory
del model
import gc
gc.collect()

torch.cuda.empty_cache()

## Finetuned LoRA model

In [9]:
model_name = "hynky/codellama-7b-sft-lora-func-names"
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token
# Since we are able to bit the model in memory, we don't have to go for peft
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16, use_flash_attention_2=True, device_map="auto")

Downloading shards:   0%|          | 0/2 [00:00<?, ?it/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [None]:
# Tokenize
def apply_template(x, tokenizer):
    return {"text": tokenizer.apply_chat_template(
        [{"role": "system", "content": f"Given the source code of a python function, suggest a fitting name for the function."},
        {"role": "user", "content": x["source_code"]}],
        tokenize=False)
    }

dataset = test_dst.map(apply_template, fn_kwargs={"tokenizer": tokenizer})
# I am not sure why the model adds space at the end of each function name
results_lora = evaluate_with_model(model, tokenizer, dataset, 3, transform_predictions=lambda preds: [pred.strip() for pred in preds])

In [3]:
# free memory
import gc
gc.collect()

torch.cuda.empty_cache()

## Finetune QLora

In [6]:
from pathlib import Path
from peft.auto import AutoPeftModelForCausalLM
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
model_name = "hynky/codellama-7b-sft-lora-func-names-4bit"
# Load in 4-bit as was trained
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16, load_in_4bit=True, device_map="auto",
                                             use_flash_attention_2=True)

tokenizer = AutoTokenizer.from_pretrained(model.name_or_path)
tokenizer.pad_token = tokenizer.eos_token

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [None]:
def apply_template(x, tokenizer):
    return {"text": tokenizer.apply_chat_template(
        [{"role": "system", "content": f"Given the source code of a python function, suggest a fitting name for the function."},
        {"role": "user", "content": x["source_code"]}],
        tokenize=False)
    }
dataset = test_dst.map(apply_template, fn_kwargs={"tokenizer": tokenizer})
# I am not sure why the model adds space at the end of each function name
results_qlora = evaluate_with_model(model, tokenizer, dataset, 6, transform_predictions=lambda preds: [pred.strip() for pred in preds])

In [None]:
# Table with result
# Suprsingly the QLora does really good job, considering the fact that it's 4bit qunatized
# For produciton it makes the most sense
import pandas as pd

results = {
    'Base': results_base,
    'LoRA': results_lora,
    'Q-LoRA': results_qlora
}

df_results = pd.DataFrame(results)
df_results


Unnamed: 0,Base,LoRA,Q-LoRA
accuracy,0.13,0.394,0.386
recall,0.346998,0.619457,0.606014
precision,0.323036,0.633829,0.628983
f1,0.320728,0.614021,0.603586
predictions,"[set_context, timestamp, mkdir, timestamp, vol...","[set_context, now, mkdir, epoch, add_volume, g...","[set_context, now, mkdir, epoch, add_volume, g..."


In [8]:
# We will now continue with QLora results
# First let's see where it made mistake
import random

expected = dataset["function_name"]
predicted = results_qlora["predictions"]
errors = [i for i in range(len(expected)) if expected[i] != predicted[i]]

for i in random.sample(errors, 10):
    print(f"Example {i+1}")
    print(f"Expected: {expected[i]}")
    print(f"Predicted: {predicted[i]}")
    print(f"Source code: \n{dataset['source_code'][i]}")
    print("-"*50)
# I would say that the errors made by LLM are fairly reasonable.
# Sometimes just a parts are mistakes as can be seen in realtively high f1 score

Example 511
Expected: write_local_schema_file
Predicted: get_files
Source code: 
def x(self, cursor):
    schema = []
    tmp_schema_file_handle = NamedTemporaryFile(delete=True)
    for name, type in zip(cursor.column_names, cursor.column_types):
        schema.append(self.generate_schema_dict(name, type))
    json_serialized_schema = json.dumps(schema).encode('utf-8')
    tmp_schema_file_handle.write(json_serialized_schema)
    return {self.schema_filename: tmp_schema_file_handle}

--------------------------------------------------
Example 103
Expected: get_conn
Predicted: get_client
Source code: 
def x(self):
    http_authorized = self._authorize()
    return build('dataproc', self.api_version, http=http_authorized,
        cache_discovery=False)

--------------------------------------------------
Example 314
Expected: get_conn
Predicted: get_connection
Source code: 
def x(self):
    service = self.get_service()
    project = self._get_field('project')
    return BigQueryConnection(

In [9]:
# Let's compare average length
avg_len_expected = sum(len(name) for name in expected) / len(expected)
avg_len_predicted = sum(len(name) for name in predicted) / len(predicted)

print(f"Average length of expected function names: {avg_len_expected}")
print(f"Average length of predicted function names: {avg_len_predicted}")


Average length of expected function names: 13.94
Average length of predicted function names: 13.344


In [22]:
# Lastly let's see how much will accuracy rise if we use stemming
# Unfornutately almost not at all only 7 sampels were fixed

from nltk.stem import PorterStemmer

stemmer = PorterStemmer()
stemmed_expected = ["_".join(stemmer.stem(token) for token in get_fc_name_tokens(name)) for name in expected]
stemmed_predicted = ["_".join(stemmer.stem(token) for token in get_fc_name_tokens(name)) for name in predicted]
errors_stemmed = [i for i in range(len(stemmed_expected)) if stemmed_expected[i] != stemmed_predicted[i]]

print(f"Accuracy after stemming: {1 - len(errors)/len(expected)}")

# Let's see which erros were fixed
fixed_indices = set(errors) - set(errors_stemmed)
for i in fixed_indices:
    print(f"Example {i+1}")
    print(f"Expected: {expected[i]}")
    print(f"Predicted: {predicted[i]}")
    print(f"Source code: \n{dataset['source_code'][i]}")
    print("-"*50)

Accuracy after stemming: 0.386
Example 833
Expected: gzipped
Predicted: gzip
Source code: 
def x(f):

    @functools.wraps(f)
    def x(*args, **kwargs):

        @after_this_request
        def x(response):
            accept_encoding = request.headers.get('Accept-Encoding', '')
            if 'gzip' not in accept_encoding.lower():
                return response
            response.direct_passthrough = False
            if (response.status_code < 200 or response.status_code >= 300 or
                'Content-Encoding' in response.headers):
                return response
            gzip_buffer = IO()
            gzip_file = gzip.GzipFile(mode='wb', fileobj=gzip_buffer)
            gzip_file.write(response.data)
            gzip_file.close()
            response.data = gzip_buffer.getvalue()
            response.headers['Content-Encoding'] = 'gzip'
            response.headers['Vary'] = 'Accept-Encoding'
            response.headers['Content-Length'] = len(response.data)
           