### Fine-tuning [Phi-1.5 model](https://huggingface.co/microsoft/phi-1_5) on the collected Kotlin dataset

In [None]:
# !pip install accelerate transformers einops datasets peft bitsandbytes fuzzywuzzy

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM, Trainer, TrainingArguments, DataCollatorForLanguageModeling, BitsAndBytesConfig, QuantoConfig
import torch
from torch.utils.data import Dataset
import json
from typing import List, Tuple
from torch import nn
from peft import LoraConfig, get_peft_model
from fuzzywuzzy import fuzz
import re

In [None]:
class KotlinDataset(Dataset):
    def __init__(self, file_path, tokenizer, max_length):
        with open(file_path, 'r') as f:
            lines = f.readlines()
        self.examples = [json.loads(line) for line in lines]
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.examples)

    def __getitem__(self, idx):
        entry = self.examples[idx]
        # Concatenate 'input' and 'gt' fields before passing to the tokenizer
        context = entry['input'].split('<EOL>')[-5:]  # Use the last 5 lines
        context = '<EOL>'.join(context).strip()
        concatenated_input = context + entry['gt']
        encoding = self.tokenizer(concatenated_input,
                                  max_length=self.max_length,
                                  truncation=True,
                                  padding='max_length',
                                  return_tensors='pt')
        input_ids = encoding['input_ids'].squeeze(0)
        labels = input_ids.clone()

        return {'input_ids': torch.tensor(input_ids), 'labels': labels}

In [None]:
def load_special_tokens(path: str) -> List[str]:

    """
    Load special tokens from a JSON file and format them into a list.
    """

    with open(path, "r") as file:
        literals = json.load(file)
    tokens = ["<STR_LIT>", "<NUM_LIT>", "<CHAR_LIT>"]
    tokens.extend(f"<STR_LIT:{lit}>" for lit in literals["str"])
    tokens.extend(f"<NUM_LIT:{lit}>" for lit in literals["num"])
    tokens.extend(f"<CHAR_LIT:{lit}>" for lit in literals["char"])

    return tokens


def load_model(model_name: str,
               special_tokens_path: str,
               bnb_config) -> Tuple[AutoTokenizer, nn.Module]:

    """
    Load a pretrained tokenizer and model from Hugging Face, and add special tokens.
    """

    special_tokens = load_special_tokens(special_tokens_path)
    tokenizer = AutoTokenizer.from_pretrained(model_name,
                                              additional_special_tokens=special_tokens)
    model = AutoModelForCausalLM.from_pretrained(model_name, quantization_config=bnb_config)
    if tokenizer.pad_token is None:
        tokenizer.add_special_tokens({'pad_token': '[PAD]'})
    model.resize_token_embeddings(len(tokenizer))  # Important to resize model token embeddings

    return tokenizer, model

In [None]:
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16
)

model_name = "microsoft/phi-1.5"
special_tokens_path = "literals.json"
tokenizer, model = load_model(model_name, special_tokens_path, bnb_config)

In [None]:
config = LoraConfig(
    r=16,
    lora_alpha=16,
    target_modules=["dense", "fc2","q_proj","k_proj","v_proj"],
    lora_dropout=0.05,
    bias="none",
    task_type="CAUSAL_LM"
)

model = get_peft_model(model, config)
model.print_trainable_parameters()

trainable params: 10,223,616 || all params: 1,425,241,318 || trainable%: 0.7173252607036754


In [None]:
# Prepare the dataset
dataset = KotlinDataset('/content/kotlin_code_train.json', tokenizer, 256)

# Define training arguments
training_args = TrainingArguments(
    output_dir='./kotlin_code_completion',
    num_train_epochs=3,
    per_device_train_batch_size=2,
    logging_dir='./logs',
    logging_steps=10,
)

# Initialize the Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset
)

# Start training
trainer.train()

# Save the model
model.save_pretrained('./kotlin_code_completion_model')
tokenizer.save_pretrained('./kotlin_code_completion_model')


  return {'input_ids': torch.tensor(input_ids), 'labels': labels}


Step,Training Loss
10,12.0764
20,10.1737
30,8.8924
40,8.2689
50,7.7583
60,7.2716
70,6.8971
80,6.6792
90,5.9516
100,5.2435


  return {'input_ids': torch.tensor(input_ids), 'labels': labels}
  return {'input_ids': torch.tensor(input_ids), 'labels': labels}


('./kotlin_code_completion_model/tokenizer_config.json',
 './kotlin_code_completion_model/special_tokens_map.json',
 './kotlin_code_completion_model/vocab.json',
 './kotlin_code_completion_model/merges.txt',
 './kotlin_code_completion_model/added_tokens.json',
 './kotlin_code_completion_model/tokenizer.json')

### Evaluation

Evaluate the fine-tuned model on the same Python CodeXGLUE test set and Kotlin test set

In [None]:
def predict_next_line(code: str, tokenizer: AutoTokenizer,
                      model: nn.Module, device: str = 'cuda') -> str:

    """
    Predict the next line of code given an input sequence of code.
    """

    model.eval()
    model.to(device)
    inputs = tokenizer.encode(code, return_tensors="pt").to(device)
    outputs = model.generate(inputs, max_length=512, num_return_sequences=1)
    predicted_code = tokenizer.decode(outputs[0], skip_special_tokens=True)

    return predicted_code


def read_and_predict(json_file: str, tokenizer: AutoTokenizer,
                     model: torch.nn.Module, device: str = 'cuda') -> None:

    """
    Read JSON file containing code inputs, and predict the next line for each input.
    """

    outputs = []
    with open(json_file, 'r') as file:
        for n, line in enumerate(file):
            try:
                json_object = json.loads(line)
                input_lines = json_object['input'].split('<EOL>')
                # Keep only the last 5 lines
                if len(input_lines) > 5:
                    input_lines = input_lines[-5:]
                input_code = '<EOL>'.join(input_lines) + '<EOL>'
                num_lines = len(input_code.split('<EOL>')) - 1
                predicted_line = predict_next_line(input_code, tokenizer, model, device)
                predicted_line = predicted_line.replace('\n', '<EOL>')
                print(predicted_line.split('<EOL>'))
                print(predicted_line.split('<EOL>')[num_lines])
                outputs.append(predicted_line.split('<EOL>')[num_lines])
                print(n)

            except json.JSONDecodeError as e:
                print(f"Error reading JSON: {e}")

    return outputs

In [None]:
def post_process(code: str) -> str:

    """ Converting special symbols in a code string to their respective
    literals or removing them """

    code = code.replace("<NUM_LIT>", "0").replace("<STR_LIT>", "").replace("<CHAR_LIT>", "")
    pattern = re.compile(r"<(STR|NUM|CHAR)_LIT:(.*?)>", re.S)
    lits = re.findall(pattern, code)
    for lit in lits:
        code = code.replace(f"<{lit[0]}_LIT:{lit[1]}>", lit[1])
    return code


def evaluate(answers_path: str, predictions_path: str) -> None:

    """ Evaluating predictions against ground truth answers,
    computing exact match (EM) and edit similarity metrics """

    data = []
    with open(answers_path, 'r') as i_file:
        for line in i_file:
            try:
                data.append(json.loads(line))
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON: {e}")
                continue

    with open(predictions_path, "r") as f:
        gts = f.readlines()

    assert len(data) == len(gts), f"Samples of predictions and answers are not equal, {len(data)}: {len(gts)}"

    total = len(gts)
    EM = 0.0
    edit_sim = 0.0
    for i, (gt, pred) in enumerate(zip(data, gts)):
        try:
            pred = post_process(pred.strip())
            gt = post_process(gt["gt"])
            edit_sim += fuzz.ratio(pred, gt)
            if pred.split() == gt.split():
                EM += 1
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON on line {i + 1}: {gt}")
            print(e)
            continue  # Skip this line or handle it as needed

    edit_similarity = round(edit_sim / total, 2) if total else 0
    exact_match = round((EM / total) * 100, 2) if total else 0
    print(f"Edit sim: {edit_similarity}, EM: {exact_match}")


Evaluation on Python test set

In [None]:
python_outputs = read_and_predict(json_file='/content/CodeXGLUE_test_processed.json',
                                  tokenizer=tokenizer,
                                  model=model)

In [13]:
with open('predictions_finetuned_python.txt', 'w') as fp:
    for item in python_outputs:
        fp.write(item)
        fp.write('\n')

Evaluation on Kotlin test set

In [None]:
kotlin_outputs = read_and_predict(json_file='/content/kotlin_code_test.json',
                                  tokenizer=tokenizer,
                                  model=model)

In [None]:
with open('predictions_finetuned_kotlin.txt', 'w') as fp:
    for item in kotlin_outputs:
        fp.write(item)
        fp.write('\n')

Evaluate the performance

CodeXGLUE Python

In [14]:
evaluate('/content/CodeXGLUE_test_answers.json',
         '/content/predictions_finetuned_python.txt')

Edit sim: 8.16, EM: 3.0


Kotlin

In [None]:
evaluate('/content/kotlin_code_answers.json',
         '/content/predictions_finetuned_kotlin.txt')

Edit sim: 16.05, EM: 10.0
