In [1]:
# Hugging Face - Fine-Tuning CodeT5 for Code Translation (AI4SE Focus)

# This notebook demonstrates how to fine-tune the CodeT5 model using Hugging Face Transformers
# for a Software Engineering task: translating Python code to Java.

# ------------------------
# 1. Install Required Libraries
# ------------------------
%pip install pathlib
%pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu124
%pip install transformers[torch] datasets evaluate -q
%pip install --upgrade regex

Note: you may need to restart the kernel to use updated packages.
Looking in indexes: https://download.pytorch.org/whl/cu124
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
from transformers import T5ForConditionalGeneration, AutoModelForSeq2SeqLM
from transformers import RobertaTokenizer
from datasets import DatasetDict, Dataset
from transformers import TrainingArguments, Trainer
from transformers import EarlyStoppingCallback

from datasets import load_dataset
import warnings
import pandas as pd

In [3]:
# ------------------------------------------------------------------------
# 2. Load Dataset (local CSV splits: one method and its target block per row)
# ------------------------------------------------------------------------
import re
warnings.simplefilter(action='ignore', category=FutureWarning)

# CodeXGLUE is a benchmark dataset collection by Microsoft for code-related tasks.
# It is NOT loaded here (the call below is kept for reference only); the splits
# come from local CSV files instead.
#dataset = load_dataset("google/code_x_glue_cc_code_to_code_trans")

# Single place to change where the CSV files live.
DATA_DIR = 'C:/Users/mjwhi/Downloads'
COLUMNS = ['cleaned_method', 'target_block']

# Keep only the two columns used downstream and cap each split for a quick run.
# .copy() makes every split an independent frame, so the column assignments done
# later by create_mask do not write into a slice of the loaded frame.
train = pd.read_csv(f'{DATA_DIR}/ft_train.csv')[COLUMNS].iloc[:2000].copy()
val = pd.read_csv(f'{DATA_DIR}/ft_valid.csv')[COLUMNS].iloc[:100].copy()
test = pd.read_csv(f'{DATA_DIR}/ft_test.csv')[COLUMNS].iloc[:100].copy()

def create_mask(df):
  """Flatten each method and mask its target block with the <IF-STMT> token.

  For every row, ``cleaned_method`` is collapsed so that any run of whitespace
  becomes a single space, and a ``masked_method`` column receives that
  flattened text with the first occurrence of ``target_block`` replaced by
  ``<IF-STMT>``. Each space in the target matches any amount of whitespace
  (including none) in the method. When the target is not found, the masked
  text equals the flattened method.

  Args:
    df: DataFrame with string columns ``cleaned_method`` and ``target_block``.

  Returns:
    The same DataFrame object, with ``cleaned_method`` rewritten and
    ``masked_method`` added.
  """
  flattened = [" ".join(method.split()) for method in df['cleaned_method']]

  masked = []
  for method, target in zip(flattened, df['target_block']):
    # Escape the target so it is matched literally, then relax its spaces.
    pattern = re.compile(re.escape(target).replace(r'\ ', r'\s*'))
    masked.append(pattern.sub("<IF-STMT>", method, count=1))

  # Whole-column assignment instead of df[col][index] = value: chained
  # assignment raises warnings and has no effect under pandas Copy-on-Write.
  df['cleaned_method'] = flattened
  df['masked_method'] = masked
  return df

# Run the masking step on every split. For each frame this means:
#   - flatten cleaned_method
#   - copy cleaned_method into masked_method
#   - mask masked_method
train, val, test = (create_mask(split) for split in (train, val, test))

# Write the processed train and test splits to CSV without the row index.
train.to_csv('train.csv', index=False)
test.to_csv('test.csv', index=False)

✅ The following cell loads a pre-trained model & tokenizer from Hugging Face using the checkpoint name (e.g., "Salesforce/codet5-small").


*  The tokenizer knows how to convert text into the tokens that the model understands.

*   It also handles things like padding, truncation, special tokens, etc.

*   It comes with a fixed vocabulary learned during pretraining, which we can nevertheless expand if needed, as shown below.

In [4]:
# ------------------------------------------------------------------------
# 3. Load Pre-trained Model & Tokenizer
# ------------------------------------------------------------------------
from transformers import T5ForConditionalGeneration, AutoModelForSeq2SeqLM
from transformers import RobertaTokenizer
from datasets import DatasetDict
from transformers import TrainingArguments, Trainer
from transformers import EarlyStoppingCallback

model_checkpoint = "Salesforce/codet5-small"

# Tokenizer first: its final vocabulary size decides the embedding size below.
tokenizer = RobertaTokenizer.from_pretrained(model_checkpoint)
# Register the mask placeholder as an extra token in the vocabulary.
tokenizer.add_tokens(["<IF-STMT>"])

model = T5ForConditionalGeneration.from_pretrained(model_checkpoint)

# Resize the embedding layer to the enlarged vocabulary. As the last expression
# of the cell, the resized embedding layer is also displayed.
model.resize_token_embeddings(len(tokenizer))




The new embeddings will be initialized from a multivariate normal distribution that has old embeddings' mean and covariance. As described in this article: https://nlp.stanford.edu/~johnhew/vocab-expansion.html. To disable this, use `mean_resizing=False`


Embedding(32101, 512)

⚠️⚠️⚠️ If you add new tokens like this, you must also resize the model’s embedding layer: model.resize_token_embeddings(len(tokenizer))

Otherwise, the model won’t know what to do with the new token IDs!


In [5]:
# ------------------------------------------------------------------------------------------------
# 4. We prepare now the fine-tuning dataset using the tokenizer we preloaded
# ------------------------------------------------------------------------------------------------

def preprocess_function(examples):
    """Tokenize masked methods as model inputs and target blocks as labels.

    Args:
        examples: Mapping with "masked_method" and "target_block" entries
            (lists of strings when used with ``Dataset.map(batched=True)``).

    Returns:
        The tokenizer output for the inputs, with an added "labels" entry
        holding the target token ids. Padding positions in the labels are
        set to -100.
    """
    inputs = examples["masked_method"]
    targets = examples["target_block"]
    model_inputs = tokenizer(inputs, max_length=256, truncation=True, padding="max_length")
    labels = tokenizer(targets, max_length=256, truncation=True, padding="max_length")

    # Padding must not count towards the loss: -100 is the label value that the
    # Hugging Face seq2seq models ignore when computing cross-entropy. Without
    # this, most of the 256 label positions would be "predict <pad>" targets.
    pad_id = tokenizer.pad_token_id

    def _ignore_padding(ids):
        return [tok if tok != pad_id else -100 for tok in ids]

    label_ids = labels["input_ids"]
    if label_ids and isinstance(label_ids[0], (list, tuple)):
        # Batched call: one id sequence per example.
        model_inputs["labels"] = [_ignore_padding(ids) for ids in label_ids]
    else:
        # Single example: a flat id sequence.
        model_inputs["labels"] = _ignore_padding(label_ids)
    return model_inputs


# Wrap each pandas split as a Hugging Face Dataset, in train / validation / test order.
train_dataset, val_dataset, test_dataset = (
    Dataset.from_pandas(frame) for frame in (train, val, test)
)

# Group the three splits under the names used by the training and evaluation cells.
dataset = DatasetDict({
    'train': train_dataset,
    'validation': val_dataset,
    'test': test_dataset,
})

# Tokenize every split; batched=True hands preprocess_function lists of examples.
tokenized_datasets = dataset.map(preprocess_function, batched=True)
#tokenized_datasets

Map:   0%|          | 0/2000 [00:00<?, ? examples/s]

Map:   0%|          | 0/100 [00:00<?, ? examples/s]

Map:   0%|          | 0/100 [00:00<?, ? examples/s]

In [6]:
#tokenized_datasets

In [7]:
# ------------------------------------------------------------------------
# 5. Define Training Arguments and Trainer
# ------------------------------------------------------------------------


training_args = TrainingArguments(
    # Output locations and logging
    output_dir="./codet5-finetuned",
    logging_dir="./logs",
    logging_steps=100,
    push_to_hub=False,
    # Optimisation
    learning_rate=5e-5,
    weight_decay=0.01,
    num_train_epochs=7,
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    # Evaluate and checkpoint once per epoch, keep at most two checkpoints and
    # reload the checkpoint with the best eval_loss when training ends.
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=2,
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
)

# Early stopping: give up after 2 evaluations without improvement of the tracked metric.
trainer = Trainer(
    args=training_args,
    model=model,
    tokenizer=tokenizer,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["validation"],
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)],
)

In [8]:
# ------------------------
# 6. Train the Model
# ------------------------
trainer.train()

# Persist the fine-tuned model to the "t5_Model" directory.
trainer.save_model("t5_Model")

# ------------------------
# 7. Evaluate on Test Set
# ------------------------
metrics = trainer.evaluate(tokenized_datasets["test"])
print("Test Evaluation Metrics:", metrics)

# ------------------------
# 8. Test Code Translation
# ------------------------
#input_code = "def add(a, b):\n    return a + b"
#inputs = tokenizer(input_code, return_tensors="pt", padding=True, truncation=True)
#outputs = model.generate(**inputs, max_length=256)
#print("Generated Java Code:\n", tokenizer.decode(outputs[0], skip_special_tokens=True))

Passing a tuple of `past_key_values` is deprecated and will be removed in Transformers v4.48.0. You should pass an instance of `EncoderDecoderCache` instead, e.g. `past_key_values=EncoderDecoderCache.from_legacy_cache(past_key_values)`.


Epoch,Training Loss,Validation Loss


KeyboardInterrupt: 

In [None]:
# from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# predicted_labels = []  # Store predicted labels
# true_labels = []       # Store true labels

# for i in range(5):
#   input_code = test['masked_method'][i]
#   inputs = tokenizer(input_code, return_tensors="pt", padding=True, truncation=True)
#   inputs = inputs.to(model.device)
#   outputs = model.generate(**inputs, max_length=256)

#   predicted_label = tokenizer.decode(outputs[0], skip_special_tokens=True) # Decode the generated tokens for this input
#   true_label = test['target_block'][i] # Get the true label for this input

#   predicted_labels.append(predicted_label) # Add predicted label to the list
#   true_labels.append(true_label)  # Add true label to the list

#   print("Generated Java Code:\n", predicted_label)
#   #print("Original Java Code:\n", test['cleaned_method'][i])
#   print("Target Java Code:\n", true_label)
#   print("Exact Match: " + str(true_label == predicted_label))

# # Calculate F1 score using the collected true and predicted labels
# print(f1_score(true_labels, predicted_labels, average='macro'))

In [None]:
# Save the trainer's model to the "t5_Model" directory (the same call also ends the training cell).
trainer.save_model("t5_Model")

In [None]:
# Reload the fine-tuned weights saved in "t5_Model".
# The result is bound to `model` (the name the evaluation cell generates with),
# not to `trainer`, which holds the Trainer object. The reloaded model is moved
# to the device the current model lives on so generation speed is unchanged.
model = AutoModelForSeq2SeqLM.from_pretrained("t5_Model").to(model.device)

In [None]:
! pip install transformers
!pip install tree_sitter==0.2.0
! git clone -q https://github.com/microsoft/CodeXGLUE.git

In [None]:
output_file = pd.DataFrame(columns=['Masked Method', 'Exact Match', 'Expected if Condition', 'Predicted if Condition', 'CodeBLEU Score', 'Bleu4 Score'])#'Masked Method', 'Exact Match', 'Expected if Statement', 'Predicted if Statement', 'CodeBLEU', 'BLEU-4'])

for i in range(len(test)):
  p = open("predicted.txt", "w")
  a = open("actual.txt", "w")
  input_code = test['masked_method'][i]
  inputs = tokenizer(input_code, return_tensors="pt", padding=True, truncation=True)
  inputs = inputs.to(model.device)
  outputs = model.generate(**inputs, max_length=256)

  predicted_label = tokenizer.decode(outputs[0], skip_special_tokens=True) # Decode the generated tokens for this input
  true_label = test['target_block'][i] # Get the true label for this input

  p.write(test['masked_method'][i].replace("<IF-STMT>", predicted_label) )
  a.write(test['masked_method'][i].replace("<IF-STMT>", true_label))

  p.close()
  a.close()

  exact_match = str(true_label == predicted_label)

  start_index = input_code.index("<IF-STMT>")
  extra_length = len(predicted_label) - len(input_code)

  codeBleuRaw = !cd /content/CodeXGLUE/Code-Code/code-to-code-trans/evaluator/CodeBLEU/ && python calc_code_bleu.py --refs /content/actual.txt --hyp /content/predicted.txt --lang java --params 0.25,0.25,0.25,0.25
  codeBleu = codeBleuRaw[1].split()[-1]
  bleu4 = codeBleuRaw[0].split()[2][:-1]

  output_file.loc[i] = [input_code, exact_match, test['target_block'][i], predicted_label, codeBleu, bleu4]

In [None]:
# Display the per-sample results table filled in by the evaluation loop.
output_file

In [None]:
# Save the results table as CSV; with pandas' default settings the row index is written as the first column.
output_file.to_csv("testset-results.csv")