In [None]:
# !pip install datasets
# !pip install bitsandbytes

In [None]:
import os
import pickle

import numpy as np
import torch
from datasets import Dataset
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    BitsAndBytesConfig,
    TrainingArguments,
    Trainer,
    DataCollatorForSeq2Seq,
    TrainerCallback,
    DataCollatorForLanguageModeling
)

In [None]:
# Load the post-processed training records from a pickle file.
# NOTE(review): pickle.load can execute arbitrary code embedded in the file;
# only load pickles you produced yourself or otherwise trust.
# NOTE(review): the path presumably requires Google Drive to be mounted at
# /content/drive -- the mount step is not shown in this notebook; confirm.
with open('/content/drive/MyDrive/train_data_postprocessed.pkl', 'rb') as f:
    train_data = pickle.load(f)


In [None]:
# Peek at the first raw record to check its fields. Only `train_data` is
# loaded in this notebook, so inspect that (the previous `valid_data` name is
# never defined and raised NameError on a fresh kernel).
print(train_data[0])

In [None]:
# Build the model prompt from the AST of the buggy code plus traceback info.
# NOTE: a later cell defines another `create_prompt` (raw-code variant); when
# both cells are run, the later definition replaces this one.
def create_prompt(record):
    """Build the AST-based prompt text for one record.

    Args:
        record: Mapping that must contain 'old_ast_json' and may contain
            'traceback_type' and/or 'full_traceback'.

    Returns:
        str: the buggy-code AST section, an optional traceback section, and a
        closing instruction line asking for the corrected AST.

    Raises:
        KeyError: if 'old_ast_json' is missing from ``record``.
    """
    sections = [f"### Buggy Code AST:\n{record['old_ast_json']}\n\n"]

    # Keep only the traceback fields that actually carry a value, so a missing
    # or None field no longer renders as "None: ..." or a dangling ": ".
    traceback_parts = [
        str(part)
        for part in (record.get('traceback_type'), record.get('full_traceback'))
        if part
    ]
    if traceback_parts:
        sections.append(f"### Traceback:\n{': '.join(traceback_parts)}\n\n")

    # Instruction for the model to output the fix
    sections.append("### Provide the corrected code AST below:\n")
    return "".join(sections)

In [None]:
# Build the model prompt from the raw buggy source code plus traceback info.
# NOTE: this redefines the `create_prompt` from the earlier AST cell; when both
# cells are run, this definition is the one later cells call.
def create_prompt(record):
    """Build the raw-code prompt text for one record.

    Args:
        record: Mapping that must contain 'before_merge' and may contain
            'traceback_type' and/or 'full_traceback'.

    Returns:
        str: the buggy-code section, an optional traceback section, and a
        closing instruction line asking for the corrected code.

    Raises:
        KeyError: if 'before_merge' is missing from ``record``.
    """
    # NOTE(review): the header still says "AST" although the payload is the
    # 'before_merge' field -- looks like a copy-paste leftover. Left unchanged
    # because any inference code must use the exact same prompt text; confirm
    # before renaming.
    sections = [f"### Buggy Code AST:\n{record['before_merge']}\n\n"]

    # Keep only the traceback fields that actually carry a value, so a missing
    # or None field no longer renders as "None: ..." or a dangling ": ".
    traceback_parts = [
        str(part)
        for part in (record.get('traceback_type'), record.get('full_traceback'))
        if part
    ]
    if traceback_parts:
        sections.append(f"### Traceback:\n{': '.join(traceback_parts)}\n\n")

    # Instruction for the model to output the fix
    sections.append("### Provide the corrected code below:\n")
    return "".join(sections)

In [None]:
# Map a raw record to the (prompt, target) text pair used for training (AST variant).
def prepare_record(record):
    """Return {"input": prompt text, "output": target AST} for one record.

    The prompt comes from `create_prompt`; the target is the record's
    'new_ast_json' field.
    """
    prompt_text = create_prompt(record)
    target_text = record["new_ast_json"]
    return {"input": prompt_text, "output": target_text}

# Convert every raw training record into its input/output text pair.
prepared_train = list(map(prepare_record, train_data))

In [None]:
# Map a raw record to the (prompt, target) text pair used for training (raw-code variant).
def prepare_record(record):
    """Return {"input": prompt text, "output": fixed code} for one record.

    The prompt comes from `create_prompt`; the target is the record's
    'after_merge' field.
    """
    prompt_text = create_prompt(record)
    target_text = record["after_merge"]
    return {"input": prompt_text, "output": target_text}

# Convert every raw training record into its input/output text pair.
prepared_train = list(map(prepare_record, train_data))

In [None]:
# Load the tokenizer (the model itself is loaded in a later cell).
model_name = "meta-llama/CodeLlama-7b-hf"
# Never hardcode access tokens in a notebook: read the Hugging Face token from
# the HF_TOKEN environment variable instead. When it is unset, `token=None`
# lets the library fall back to locally stored credentials.
# SECURITY: the token that was previously committed here is exposed and must
# be revoked in the Hugging Face account settings.
hf_token = os.environ.get("HF_TOKEN")
tokenizer = AutoTokenizer.from_pretrained(model_name, token=hf_token)
# Padding needs a pad token; reuse EOS when the tokenizer does not define one.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

In [None]:
def tokenize_record(record, max_length=512):
    """Tokenize one prepared example as a single prompt+target sequence.

    A causal language model can only learn the target if the target tokens are
    part of ``input_ids``. Previously the target was tokenized into a separate
    ``labels`` tensor that was not aligned with ``input_ids``, so the model
    never saw it. The prompt and target are now tokenized separately, joined
    (target followed by EOS when the tokenizer defines one) and right-padded
    to exactly ``max_length``.

    Truncation reserves at least half of the window for the target, so a long
    prompt can never push every target token out of the sequence.

    Uses the notebook-level ``tokenizer``.

    Args:
        record: Mapping with the text fields "input" (prompt) and "output"
            (target).
        max_length: Length of every returned tensor.

    Returns:
        dict of 1-D integer tensors of length ``max_length``:
        "input_ids", "attention_mask" (1 = real token, 0 = padding) and
        "labels" (a copy of "input_ids").

    NOTE(review): ``DataCollatorForLanguageModeling(mlm=False)``, used later
    in this notebook, is documented to rebuild labels from ``input_ids`` and
    to ignore pad positions. If pad_token was set to eos_token, the appended
    EOS is presumably ignored by the loss as well -- confirm.
    """
    prompt_ids = list(tokenizer(record["input"])["input_ids"])
    # No special tokens here: the prompt already carries any leading ones.
    target_ids = list(tokenizer(record["output"], add_special_tokens=False)["input_ids"])
    if tokenizer.eos_token_id is not None:
        target_ids.append(tokenizer.eos_token_id)

    # The prompt may use whatever the target leaves free, but never less than
    # half of the window; the target then fills the remainder (a truncated
    # target loses its trailing EOS, so truncation does not teach early stops).
    prompt_budget = max(max_length - len(target_ids), max_length // 2)
    prompt_ids = prompt_ids[:prompt_budget]
    target_ids = target_ids[:max_length - len(prompt_ids)]

    sequence = prompt_ids + target_ids
    pad_count = max_length - len(sequence)
    padded_ids = sequence + [tokenizer.pad_token_id] * pad_count
    mask = [1] * len(sequence) + [0] * pad_count

    return {
        "input_ids": torch.tensor(padded_ids),
        "attention_mask": torch.tensor(mask),
        "labels": torch.tensor(padded_ids),
    }

# Tokenize every prepared example with the default max_length.
tokenized_train = list(map(tokenize_record, prepared_train))

In [None]:
# Show the tensors of the first tokenized example under a heading.
print("Sample tokenized training record:", tokenized_train[0], sep="\n")

In [None]:
# Decode the first example back to text: model input first, then labels.
print(
    tokenizer.decode(tokenized_train[0]["input_ids"]),
    tokenizer.decode(tokenized_train[0]["labels"]),
    sep="\n",
)

In [None]:
# Build a Hugging Face Dataset with one plain-list column per tensor field.
train_dataset = Dataset.from_dict({
    column: [example[column].tolist() for example in tokenized_train]
    for column in ("input_ids", "attention_mask", "labels")
})



In [None]:
# Decode the first example's input ids back to text under a heading.
print("A tokenized training sample:", tokenizer.decode(tokenized_train[0]["input_ids"]), sep="\n")

In [None]:
# Quantization settings are passed as a BitsAndBytesConfig object rather than
# as the deprecated individual loading arguments.
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,                 # load the weights in 4-bit precision
    bnb_4bit_quant_type="nf4",         # 4-bit quantization type
    bnb_4bit_use_double_quant=True,    # enable double quantization
    bnb_4bit_compute_dtype="float16",  # compute dtype for the 4-bit layers
)

In [None]:
# Load the base causal-LM checkpoint with the 4-bit quantization settings.
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=quantization_config,
    device_map="auto",  # let the library decide where the weights are placed
    token=hf_token,     # access token for the checkpoint
)

In [None]:
# Wrap the quantized model with LoRA adapters using the PEFT library.

# Prepare the quantized model for training before adding adapters.
model = prepare_model_for_kbit_training(model)

# LoRA configuration.
lora_config = LoraConfig(
    r=8,                                  # adapter rank
    lora_alpha=32,                        # LoRA alpha (4x the rank here)
    target_modules=["q_proj", "v_proj"],  # modules that receive adapters
    lora_dropout=0.1,
    bias="none",
    # Declare the task so PEFT wraps the model with its causal-LM specific
    # class instead of the generic PeftModel wrapper.
    task_type="CAUSAL_LM",
)

# Wrap the model with the PEFT LoRA modules.
model = get_peft_model(model, lora_config)
print("Model is now ready for QLoRA fine-tuning!")

In [None]:
# Custom callback that prints a progress line at the end of every epoch.
class PrintEpochCallback(TrainerCallback):
    """Print "Epoch N completed" after each epoch, with the latest logged loss.

    The message is always printed. The loss is taken from the most recent
    entry in ``state.log_history`` that contains a 'loss' key, and is left
    out when no such entry exists yet (for example when nothing has been
    logged, or only entries without a training loss have been recorded).
    """

    def on_epoch_end(self, args, state, control, **kwargs):
        """Print the epoch-completed message; returns None."""
        # Walk the log backwards so the newest training loss wins even when
        # the very last entry carries no 'loss' key.
        latest_loss = next(
            (entry["loss"] for entry in reversed(state.log_history or []) if "loss" in entry),
            None,
        )
        loss_str = f", Loss: {latest_loss}" if latest_loss is not None else ""
        print(f"Epoch {state.epoch} completed{loss_str}.")

In [None]:
# Training setup for the Hugging Face Trainer API.

training_args = TrainingArguments(
    # --- outputs, logging, checkpoints ---
    output_dir="/content/drive/MyDrive/fine-tuned/codellama-finetuned-ast",
    logging_dir="./logs",
    logging_steps=10,
    save_strategy="epoch",
    push_to_hub=False,
    # --- optimisation ---
    num_train_epochs=3,
    learning_rate=1e-4,
    per_device_train_batch_size=4,
    gradient_accumulation_steps=8,  # 4 x 8 = 32 examples per optimizer step per device
    gradient_checkpointing=True,
    fp16=True,  # Mixed-precision training if using CUDA
    # --- batch keys treated as labels ---
    label_names=["labels"],
)

# NOTE(review): with mlm=False this collator is documented to derive the
# labels from input_ids (ignoring pad positions) -- confirm that the dataset's
# own "labels" column is not expected to survive collation.
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

In [None]:
# Assemble the Trainer from the model, data, collator and per-epoch callback.
trainer = Trainer(
    args=training_args,
    model=model,
    data_collator=data_collator,
    train_dataset=train_dataset,
    callbacks=[PrintEpochCallback()],
)

In [None]:
# Run fine-tuning. As the cell's last expression, the value returned by
# train() is displayed as the cell output.
trainer.train()