In [None]:
!pip install transformers sentencepiece

In [None]:
!pip install torchtext==0.6.0

In [None]:
!pip install evaluate sacrebleu

In [None]:
# Text file that will collect the average training loss of each epoch
output_filename = "averageLosses.txt"

# Create the file (or truncate an existing one) so that its only
# content is a single space character
placeholder = " "
with open(output_filename, mode="w") as f:
    f.write(placeholder)

print(f"A space character has been written to {output_filename}.")

In [None]:
#Training Code
#
# Fine-tunes Salesforce/codet5-small on (source, target) code pairs read from a
# TSV file, saves a checkpoint after every epoch and appends the epoch's average
# loss to a text file.

from transformers import RobertaTokenizer, T5ForConditionalGeneration
import torch
import csv
import torch.nn.functional as F
import numpy as np
from tqdm.notebook import tqdm

# Check if a GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Read data from train.tsv and create training examples.
# Each example is a (source, target) pair: column 0 (the fixed code) is fed to
# the encoder and column 1 (the buggy code) is what the decoder learns to
# produce. This mirrors the inference cell, which feeds the "fix" column to the
# model and scores the output against the "bug" column.
training_examples = []

#TODO: REPLACE FOLLOWING WITH TRAINING DATA (TSV FILE) PATH
# newline='' is what the csv module expects for files it parses itself.
with open('/PATH/TO/TRAINING.TSV', 'r', encoding='utf-8', newline='') as tsvfile:
    reader = csv.reader(tsvfile, delimiter='\t')
    next(reader, None)  # Skip header row (tolerates an empty file)
    for row in reader:
        if len(row) < 2:
            continue  # Skip blank/malformed lines instead of raising IndexError
        fixed_sentence = row[0]
        buggy_sentence = row[1]
        training_examples.append((fixed_sentence, buggy_sentence))

if not training_examples:
    raise ValueError("No training examples were loaded; check the training TSV path and contents.")

# Load codeT5 tokenizer and model
t5_tokenizer = RobertaTokenizer.from_pretrained("Salesforce/codet5-small")
t5_model = T5ForConditionalGeneration.from_pretrained("Salesforce/codet5-small")

# Move models to GPU
t5_model.to(device)

# Keep the model's pad token ID in sync with the tokenizer that pads the batches
# (the model uses it to build decoder inputs from the masked labels).
t5_model.config.pad_token_id = t5_tokenizer.pad_token_id

# Define optimizer. The loss itself is computed by the model when `labels` are
# passed, so no separate loss function is needed.
# torch.optim.AdamW replaces the deprecated transformers.AdamW; eps and
# weight_decay are set to that implementation's defaults to keep the same updates.
optimizer = torch.optim.AdamW(t5_model.parameters(), lr=1e-5, eps=1e-6, weight_decay=0.0)

#TODO: REPLACE THE FOLLOWING WITH LOSS FILE TXT PATH
loss_file = "/PATH/TO/averageLosses.txt"

load = False
if load:
    #TODO: REPLACE THE FOLLOWING WITH MODEL TRAINING CHECKPOINT .PT FILE
    # map_location lets a checkpoint saved on GPU be resumed on a CPU-only machine.
    t5_model.load_state_dict(torch.load("/PATH/TO/CodeT5Model.pt", map_location=device))

# from_pretrained() hands the model back in evaluation mode; switch to training
# mode so dropout is active while fine-tuning.
t5_model.train()

# Training loop
batch_size = 16
average_losses = []
for epoch in range(30):  # Number of epochs
    total_loss = 0.0

    # Shuffle the examples in place so every epoch sees a different batch order
    np.random.shuffle(training_examples)

    # Round up so the final, smaller batch is trained on as well
    num_batches = (len(training_examples) + batch_size - 1) // batch_size

    for batch_idx in tqdm(range(num_batches), desc=f"Epoch {epoch+1}"):
        start_idx = batch_idx * batch_size
        end_idx = start_idx + batch_size

        batch_examples = training_examples[start_idx:end_idx]
        source_texts = [source for source, _ in batch_examples]
        target_texts = [target for _, target in batch_examples]

        # Tokenize the whole batch at once; pad only to the longest sequence in
        # the batch (capped at 512 tokens) rather than always to 512.
        encoded_inputs = t5_tokenizer(source_texts, padding='longest', max_length=512, return_tensors='pt', truncation=True)
        encoded_targets = t5_tokenizer(target_texts, padding='longest', max_length=512, return_tensors='pt', truncation=True)

        input_code_tensors = encoded_inputs.input_ids.to(device)

        # Use the tokenizer's own attention mask instead of rebuilding it from a
        # hard-coded pad ID.
        attention_masks = encoded_inputs.attention_mask.to(device)

        # Padding positions in the labels are set to -100 so the model's loss
        # ignores them; otherwise the loss is dominated by predicting padding.
        target_code_tensors = encoded_targets.input_ids.masked_fill(
            encoded_targets.attention_mask == 0, -100
        ).to(device)

        # Clear gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = t5_model(input_ids=input_code_tensors, attention_mask=attention_masks, labels=target_code_tensors)
        loss = outputs.loss

        # Backward pass
        loss.backward()

        # Update parameters
        optimizer.step()

        total_loss += loss.item()

    # Save model checkpoint at every epoch
    #TODO REPLACE WITH TARGET SAVE LOCATION FOR THE MODEL .PT FILE
    torch.save(t5_model.state_dict(), "/PATH/TO/CodeT5Model.pt")

    # Mean of the per-batch losses for this epoch
    average_loss = total_loss / num_batches
    print(f"Saved: Epoch {epoch+1}: Average Loss = {average_loss}")
    average_losses.append(average_loss)

    #write epoch training loss to file
    with open(loss_file, "a") as f:
      f.write(str(average_loss) + ", \n")

print("Average training loss:")
print(average_losses)

In [None]:
#Inference Code
#
# Loads the fine-tuned checkpoint, generates an output for every row of the test
# TSV, scores it against the reference with CHRF and writes the inputs, outputs,
# references and scores to a text file.

from transformers import RobertaTokenizer, T5ForConditionalGeneration
import torch
import pandas as pd
import evaluate

# Check GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Custom tokens (if needed)
custom_tokens = []

# Load tokenizer and pre-trained model
t5_tokenizer = RobertaTokenizer.from_pretrained("Salesforce/codet5-small")
t5_model = T5ForConditionalGeneration.from_pretrained("Salesforce/codet5-small")

# Add custom tokens to the tokenizer
t5_tokenizer.add_tokens(custom_tokens)
t5_model.resize_token_embeddings(len(t5_tokenizer))

# Load the saved model checkpoint
#TODO: REPLACE THE FOLLOWING WITH MODEL PATH
model_checkpoint_path = "/PATH/TO/CodeT5Model.pt"
t5_model.load_state_dict(torch.load(model_checkpoint_path, map_location=device))  # Use map_location to ensure it's loaded to the correct device
t5_model.to(device)  # Move the model to the correct device (GPU or CPU)
t5_model.eval()

# Load  test.tsv file
#TODO: REPLACE WITH TEST DATASET PATH
test_file_path = "/PATH/TO/Test.tsv" 
df = pd.read_csv(test_file_path, sep='\t')
total_score = 0
num_iterations = 0

# Load the CHRF metric once up front; reloading it for every row only adds overhead
chrf = evaluate.load("chrf")

# Output file for the predictions and their per-row CHRF scores
# TODO: Specify the path where you want to save the file
output_file_path = "output_predictions.txt"
with open(output_file_path, "w", encoding="utf-8") as output_file:
    for index, row in df.iterrows():
        #input text
        input_code = row["fix"]

        # Tokenize the input text, truncating to the same 512-token limit that
        # the training cell applies to its inputs
        input_ids = t5_tokenizer.encode(input_code, return_tensors="pt", truncation=True, max_length=512)

        # Move inputs to the correct device (GPU or CPU)
        input_ids = input_ids.to(device)

        # Perform inference
        with torch.no_grad():
            output = t5_model.generate(input_ids, max_length=1000, num_beams=5, early_stopping=True)

        # Convert the output token IDs back to text
        output_code = t5_tokenizer.decode(output[0], skip_special_tokens=True)
        target_code = row["bug"]
        
        prediction = [output_code]
        reference = [[target_code]]
        
        # Calculate CHRF score for this single prediction/reference pair
        results = chrf.compute(predictions=prediction, references=reference)
        score = results["score"]
        
        # Print the output
        print("Round:", index)
        print("Chrf Score:", score)
        print()

        # Write the data to the output file
        output_file.write("Round: {}\n".format(index))
        output_file.write("Input: {}\n".format(input_code))
        output_file.write("Output: {}\n".format(output_code))
        output_file.write("Target: {}\n".format(target_code))
        output_file.write("CHRF Score: {}\n".format(score))
        output_file.write("\n")  # Add a blank line between entries
        
        total_score = total_score + score
        num_iterations = num_iterations + 1

print("Predictions saved to:", output_file_path)
# Guard against an empty test file, which would otherwise divide by zero
if num_iterations:
    print("Average CHRF score:", total_score/num_iterations)
else:
    print("Average CHRF score: n/a (the test file contains no rows)")
