In [1]:
# Hugging Face - Fine-Tuning CodeT5 for Code Translation (AI4SE Focus)

# This notebook demonstrates how to fine-tune the CodeT5 model using Hugging Face Transformers
# for a Software Engineering task: translating Python code to Java.

# ------------------------
# 1. Install Required Libraries
# ------------------------
!pip install torch==2.5.1 torchvision==0.20.1 torchaudio==2.5.1 --index-url https://download.pytorch.org/whl/cu124
!pip install transformers datasets evaluate -q
! pip install transformers
!pip install tree_sitter==0.2.0
! git clone -q https://github.com/microsoft/CodeXGLUE.git
!pip install evaluate
!pip install nltk

Looking in indexes: https://download.pytorch.org/whl/cu124
Collecting torch==2.5.1
  Downloading https://download.pytorch.org/whl/cu124/torch-2.5.1%2Bcu124-cp311-cp311-linux_x86_64.whl (908.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m908.3/908.3 MB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting torchvision==0.20.1
  Downloading https://download.pytorch.org/whl/cu124/torchvision-0.20.1%2Bcu124-cp311-cp311-linux_x86_64.whl (7.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.3/7.3 MB[0m [31m108.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting torchaudio==2.5.1
  Downloading https://download.pytorch.org/whl/cu124/torchaudio-2.5.1%2Bcu124-cp311-cp311-linux_x86_64.whl (3.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.4/3.4 MB[0m [31m98.2 MB/s[0m eta [36m0:00:00[0m
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch==2.5.1)
  Downloading https://download.pytorch.org/whl/cu124/nvidia_cuda_nvrtc_cu1

In [12]:
# ------------------------------------------------------------------------
# 2. Load Dataset (CodeXGLUE - Code Translation Java <=> C#)
# ------------------------------------------------------------------------
from datasets import load_dataset


# CodeXGLUE is a benchmark dataset collection by Microsoft for code-related tasks.

# Here, we use the provided training, validation, and testing datasets.

import pandas as pd

# training_data = pd.read_csv(filepath_or_buffer="/content/ft_train.csv", dtype={'cleaned_method':str, 'target_block':str, 'tokens_in_method':int})
# testing_data = pd.read_csv(filepath_or_buffer="/content/ft_test.csv", dtype={'cleaned_method':str, 'target_block':str, 'tokens_in_method':int})
# validation_data = pd.read_csv(filepath_or_buffer="/content/ft_valid.csv", dtype={'cleaned_method':str, 'target_block':str, 'tokens_in_method':int})


# Calculate the number of rows to read for each dataset (initially used to try this on a smaller dataset)
training_rows = int(1 * 50000)  # % of 50000
testing_rows = int(1 * 5000)   # % of 5000
validation_rows = int(1 * 5000) # % of 5000

# Read the first 10% of each dataset - using 1% of the rows is temporary while I fine-tune the model
training_data = pd.read_csv(filepath_or_buffer="/content/ft_train.csv", dtype={'cleaned_method':str, 'target_block':str, 'tokens_in_method':int}, nrows=training_rows)
testing_data = pd.read_csv(filepath_or_buffer="/content/ft_test.csv", dtype={'cleaned_method':str, 'target_block':str, 'tokens_in_method':int}, nrows=testing_rows)
validation_data = pd.read_csv(filepath_or_buffer="/content/ft_valid.csv", dtype={'cleaned_method':str, 'target_block':str, 'tokens_in_method':int}, nrows=validation_rows)


# print(validation_data.head())
# validation_data.loc[0][0]
# print("test")
# print(flatten_and_tabize_string(validation_data.iloc[0][0]))

# OLD CODE FROM THE PROVIDED VERSION OF THIS FILE BELOW:

# # Here, we use the code-translation-python-java dataset.
# dataset = load_dataset("google/code_x_glue_cc_code_to_code_trans")

# # Dataset contains: 'train', 'validation', 'test' splits
# print("Sample Python Code:", dataset['train'][0]['java'])
# print("Target Java Code:", dataset['train'][0]['cs'])



✅ This following loads a pre-trained models & tokenizer from Hugging Face using the checkpoint name (e.g., "Salesforce/codet5-small").


*  The tokenizer knows how to convert text into tokens that the model

*   It also handles things like padding, truncation, special tokens, etc.

*	It comes with a fixed vocabulary learned during pretraining, that however we can expand if needed as shown

In [14]:
# ------------------------------------------------------------------------
# 3. Load Pre-trained Model & Tokenizer
# ------------------------------------------------------------------------

# 3. Load Pre-trained Model & Tokenizer
from transformers import T5ForConditionalGeneration, AutoTokenizer
import torch

# Check device (GPU or CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the pre-trained model and tokenizer
model_checkpoint = "Salesforce/codet5-small"
model = T5ForConditionalGeneration.from_pretrained(model_checkpoint)
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

# Move model to the correct device
model = model.to(device)

# Add special tokens if needed
tokenizer.add_tokens(["<IF-STMT>", "<TAB>"])
model.resize_token_embeddings(len(tokenizer))





# from transformers import T5ForConditionalGeneration, AutoModelForSeq2SeqLM
# from transformers import RobertaTokenizer
# from datasets import DatasetDict
# from transformers import TrainingArguments, Trainer
# from transformers import EarlyStoppingCallback

# model_checkpoint = "Salesforce/codet5-small"

# model = T5ForConditionalGeneration.from_pretrained(model_checkpoint)

# tokenizer = RobertaTokenizer.from_pretrained(model_checkpoint)
# tokenizer.add_tokens(["<IF-STMT>", "<TAB>"]) #Imagine we need an extra token. This line adds the extra token to the vocabulary

# model.resize_token_embeddings(len(tokenizer))






Embedding(32102, 512)

⚠️⚠️⚠️ If you add new tokens like this, you must also resize the model’s embedding layer: model.resize_token_embeddings(len(tokenizer))

Otherwise, the model won’t know what to do with the new token IDs!


In [15]:
# ------------------------------------------------------------------------------------------------
# 4. We prepare now the fine-tuning dataset using the tokenizer we preloaded
# ------------------------------------------------------------------------------------------------


def remove_spaces(my_str):
  return my_str.replace(" ", "")

def compare_tokens(a, b): # Checks if two tokens (strings) are the same, up to starting with the Ġ character
  if len(a) == len(b):
    for i in range(len(a)):
      if a[i] != b[i]:
        return False
    #print(a+" and "+b)
    return True
  if len(a) > len(b) and a[0] == "Ġ":
    return compare_tokens(a[1:], b)
  if len(b) > len(a) and b[0] == "Ġ":
    return compare_tokens(b[1:], a)
  return False

# 4. Prepare the fine-tuning dataset using the tokenizer
def flatten_mask_tabize(my_str, target_block):
    my_str = my_str.replace("\n", " ")  # Flatten
    my_str = my_str.replace("    ", " <TAB>")  # Tabize
    tokenized_str = tokenizer.tokenize(my_str)  # Tokenize
    tokenized_target = tokenizer.tokenize(target_block)

    # Mask If Statement
    for i in range(len(tokenized_str) - len(tokenized_target)):
        for j in range(len(tokenized_target)):
            if not compare_tokens(tokenized_str[i + j], tokenized_target[j]):
                break
            if j == len(tokenized_target) - 1:
                tokenized_str = tokenized_str[:i] + ["<IF-STMT>"] + tokenized_str[i + len(tokenized_target):]
                return tokenized_str
    return tokenized_str

def preprocess_dataset(my_dataframe):
    my_dataframe['processed_method'] = ''
    for i in range(len(my_dataframe)):
        my_dataframe.loc[i, 'processed_method'] = tokenizer.convert_tokens_to_string(flatten_mask_tabize(my_dataframe.iloc[i].iloc[0], my_dataframe.iloc[i].iloc[1]))
    return my_dataframe

# Convert datasets to Hugging Face Datasets and preprocess
training_data = preprocess_dataset(training_data)
validation_data = preprocess_dataset(validation_data)
testing_data = preprocess_dataset(testing_data)

# Convert processed methods to lists of strings
training_data['processed_method'] = training_data['processed_method'].apply(lambda x: [x])
validation_data['processed_method'] = validation_data['processed_method'].apply(lambda x: [x])
testing_data['processed_method'] = testing_data['processed_method'].apply(lambda x: [x])

# Convert Pandas DataFrames to Hugging Face Datasets
from datasets import Dataset, DatasetDict
training_data = Dataset.from_pandas(training_data)
validation_data = Dataset.from_pandas(validation_data)
testing_data = Dataset.from_pandas(testing_data)

dataset = DatasetDict()
dataset["train"] = training_data
dataset["validation"] = validation_data
dataset["test"] = testing_data

def preprocess_function(examples):
    inputs = examples["processed_method"]
    targets = examples["target_block"]

    # Flatten the list of lists to a single list of strings
    inputs = [item for sublist in inputs for item in sublist]

    # Convert inputs and targets to token IDs before padding
    inputs = tokenizer(inputs, truncation=True, padding=False)["input_ids"]
    labels = tokenizer(targets, truncation=True, padding=False)["input_ids"]

    # Pad sequences manually
    max_length = 1000
    inputs = [x + [tokenizer.pad_token_id] * (max_length - len(x)) for x in inputs]
    labels = [x + [tokenizer.pad_token_id] * (max_length - len(x)) for x in labels]

    return {"input_ids": inputs, "labels": labels}

# Apply the preprocess function
tokenized_datasets = dataset.map(preprocess_function, batched=True, remove_columns=['cleaned_method', 'target_block', 'tokens_in_method', 'processed_method'])




Map:   0%|          | 0/50000 [00:00<?, ? examples/s]

Map:   0%|          | 0/5000 [00:00<?, ? examples/s]

Map:   0%|          | 0/5000 [00:00<?, ? examples/s]

In [16]:
# 5. Define Training Arguments and Trainer
from transformers import TrainingArguments, Trainer
from transformers import EarlyStoppingCallback
from transformers import Adafactor

from transformers import Adafactor

# optimizer = Adafactor(
#     model.parameters(),
#     lr=5e-5,                # manually set learning rate
#     relative_step=False,    # must disable relative_step
#     warmup_init=False       # must disable warmup_init if not using relative_step
# )


training_args = TrainingArguments(
    output_dir="./codet5-finetuned",
    eval_strategy="epoch",
    save_strategy="epoch",
    logging_dir="./logs",
    learning_rate=5e-5,
    fp16=True,
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    gradient_accumulation_steps=3,
    num_train_epochs=4,
    weight_decay=0.01,
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    save_total_limit=2,
    logging_steps=100, # initially 100
    push_to_hub=False,
    remove_unused_columns=False
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["validation"],
    tokenizer=tokenizer,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)],
    #optimizers=(optimizer, None),  # Pass the optimizer here
)

# ------------------------
# 6. Train the Model
# ------------------------
trainer.train()

  trainer = Trainer(


Epoch,Training Loss,Validation Loss
1,0.0037,0.003239
2,0.0032,0.002931
3,0.0022,0.002869


There were missing keys in the checkpoint model loaded: ['encoder.embed_tokens.weight', 'decoder.embed_tokens.weight', 'lm_head.weight'].


TrainOutput(global_step=33332, training_loss=0.008082446082621781, metrics={'train_runtime': 8751.0092, 'train_samples_per_second': 22.855, 'train_steps_per_second': 3.809, 'total_flos': 5.286260441088e+16, 'train_loss': 0.008082446082621781, 'epoch': 3.9996})

In [17]:
# ------------------------
# 7. Evaluate on Test Set
# ------------------------
metrics = trainer.evaluate(tokenized_datasets["test"])
print("Test Evaluation Metrics:", metrics)

# ------------------------
# 8. Test Code Translation
# ------------------------

# input_code = "def add(a, b):\n    return a + b"
# inputs = tokenizer(input_code, return_tensors="pt", padding=True, truncation=True)
# inputs = {k: v.to(device) for k, v in inputs.items()}  # Move inputs to device

# outputs = model.generate(**inputs, max_length=256)
# print("Generated Java Code:\n", tokenizer.decode(outputs[0], skip_special_tokens=True))

# 8. Generate Predictions and Save to Files
predictions = []
targets = []

for example in tokenized_datasets["test"]:
    input_code = tokenizer.decode(example["input_ids"], skip_special_tokens=True)  # Decode from input_ids to get original masked code
    target_code = tokenizer.decode(example["labels"], skip_special_tokens=True)   # Decode from labels to get target code

    # Tokenize and generate predictions
    inputs = tokenizer(input_code, return_tensors="pt", padding=True, truncation=True).to(device)
    output = model.generate(**inputs, max_length=256)

    # Decode predictions and store them
    decoded_output = tokenizer.decode(output[0], skip_special_tokens=True)
    predictions.append(decoded_output)
    targets.append(target_code)

from sklearn.metrics import f1_score

# Convert to exact match labels: 1 for correct, 0 for incorrect
exact_matches = [pred.strip() == tgt.strip() for pred, tgt in zip(predictions, targets)]
y_true = [1] * len(exact_matches)  # Ground truth is always 1 (correct if it matches)
y_pred = [1 if match else 0 for match in exact_matches]  # Predicted match or not

# Compute F1 score
f1 = f1_score(y_true, y_pred)
print(f"F1 Score: {f1:.4f}")


# Save predictions and targets to text files
with open("/content/predictions.txt", "w") as pred_file:
    for pred in predictions:
        pred_file.write(pred + "\n")

with open("/content/targets.txt", "w") as target_file:
    for target in targets:
        target_file.write(target + "\n")

import subprocess

# ... (rest of your code) ...

# Define the command for BLEU metric calculation
bleu_command = [
    "python", "/content/CodeXGLUE/Code-Code/code-to-code-trans/evaluator/CodeBLEU/calc_code_bleu.py",
    "--refs", "/content/targets.txt",
    "--hyp", "/content/predictions.txt",
    "--lang", "java",
    "--params", "0.25,0.25,0.25,0.25"
]

# Change to the correct directory before running the command
import os
os.chdir("/content/CodeXGLUE/Code-Code/code-to-code-trans/evaluator/CodeBLEU")

# Run the command
try:
    result = subprocess.run(bleu_command, check=True, capture_output=True, text=True)
    with open("/content/bleuresults.txt", "w") as target_file:
      target_file.write(result.stdout + "\n")
    print("BLEU Score Calculation Output:", result.stdout)
except subprocess.CalledProcessError as e:
    print("Error during BLEU score calculation:", e)
    print("Error Output:", e.stderr)

import csv
import subprocess
import os
import tempfile
from tqdm import tqdm
from transformers import set_seed

set_seed(42)

# Ensure you're in the right directory for CodeBLEU script
codebleu_script = "/content/CodeXGLUE/Code-Code/code-to-code-trans/evaluator/CodeBLEU/calc_code_bleu.py"
codebleu_dir = os.path.dirname(codebleu_script)
os.chdir(codebleu_dir)

results = []
predictions = []
targets = []

csv_file = "/content/testset-results.csv"

with open(csv_file, mode="w", newline="") as file:
    writer = csv.writer(file)
    writer.writerow([
        "Input Function with Masked If Condition",
        "Exact Match (true/false)",
        "Expected If Condition",
        "Predicted If Condition",
        "CodeBLEU Score (0-100)",
        "BLEU-4 Score (0-100)"
    ])

    for example in tqdm(tokenized_datasets["test"]):
        input_code = tokenizer.decode(example["input_ids"], skip_special_tokens=True)
        target_code = tokenizer.decode(example["labels"], skip_special_tokens=True)

        inputs = tokenizer(input_code, return_tensors="pt", padding=True, truncation=True).to(device)
        output = model.generate(**inputs, max_length=256)
        predicted_code = tokenizer.decode(output[0], skip_special_tokens=True)

        predictions.append(predicted_code)
        targets.append(target_code)

        # Create temp files for this one example
        with tempfile.NamedTemporaryFile(mode='w+', delete=False) as ref_file, \
             tempfile.NamedTemporaryFile(mode='w+', delete=False) as hyp_file:

            ref_file.write(target_code.strip() + "\n")
            hyp_file.write(predicted_code.strip() + "\n")
            ref_file.flush()
            hyp_file.flush()

            # Run CodeBLEU
            try:
                result = subprocess.run(
                    [
                        "python", codebleu_script,
                        "--refs", ref_file.name,
                        "--hyp", hyp_file.name,
                        "--lang", "java",
                        "--params", "0.25,0.25,0.25,0.25"
                    ],
                    check=True, capture_output=True, text=True
                )

                codebleu_output = result.stdout.strip().splitlines()
                # Parse scores from output safely
                codebleu_score, bleu4_score = 0.0, 0.0
                for line in codebleu_output:
                    if "CodeBLEU score:" in line:
                        codebleu_score = float(line.split(":")[-1].strip())
                    elif "BLEU-4:" in line:
                        bleu4_score = float(line.split(":")[-1].strip())

                # with open("/content/bleuresults.txt", "w") as target_file:
                #     for line in codebleu_output:
                #         if "CodeBLEU score:" in line:
                #             codebleu_score = float(line.split(":")[-1].strip())
                #             target_file.write(str(codebleu_score) + "\n")
                #         elif "BLEU-4:" in line:
                #             bleu4_score = float(line.split(":")[-1].strip())
                #             target_file.write(str(bleu4_score) + "\n")


            except subprocess.CalledProcessError as e:
                print("CodeBLEU error:", e.stderr)
                codebleu_score = 0.0
                bleu4_score = 0.0

        # Extract just the condition for display (you could refine this with regex if needed)
        expected_condition = target_code.strip()
        predicted_condition = predicted_code.strip()
        exact_match = expected_condition == predicted_condition

        writer.writerow([
            input_code.strip(),
            str(exact_match).lower(),
            expected_condition,
            predicted_condition,
            round(codebleu_score, 2),
            round(bleu4_score, 2)
        ])


# Load and print the first 10 rows
df2 = pd.read_csv("/content/testset-results.csv")
print(df2.head(10))

from google.colab import files
files.download('/content/testset-results.csv')
files.download('/content/predictions.txt')
files.download('/content/targets.txt')
files.download('/content/bleuresults.txt')



Test Evaluation Metrics: {'eval_loss': 0.002970718080177903, 'eval_runtime': 69.3469, 'eval_samples_per_second': 72.101, 'eval_steps_per_second': 36.051, 'epoch': 3.9996}
F1 Score: 0.7407
BLEU Score Calculation Output: ngram match: 0.7253817006576274, weighted ngram match: 0.729027673186805, syntax_match: 0.7073335130763009, dataflow_match: 0.7406679764243614
CodeBLEU score:  0.7256027158362737



100%|██████████| 5000/5000 [28:58<00:00,  2.88it/s]

             Input Function with Masked If Condition  \
0  def read(self, count=True, timeout=None, ignor...   
1  def _cache_mem(curr_out, prev_mem, mem_len, re...   
2  def filtered(gen): <TAB> for example in gen: <...   
3  def search(self, query): <TAB> # "Search.ashx?...   
4  def _check_script(self, script, directive): <T...   
5  def getAllDataLinkIDs(): <TAB> linkDataIDs = s...   
6  def _stderr_supports_color(): <TAB> try: <TAB>...   
7  def offsets(self): <TAB> offsets = {} <TAB> of...   
8  def Restore(self): <TAB> picker, obj = self._w...   
9  def dt_s_tup_to_string(dt_s_tup): <TAB> dt_str...   

   Exact Match (true/false)  \
0                     False   
1                      True   
2                      True   
3                      True   
4                     False   
5                      True   
6                      True   
7                     False   
8                     False   
9                     False   

                               Expected I




<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>