In [1]:
# Install necessary dependencies
!pip install torch==2.5.1 torchvision==0.20.1 torchaudio==2.5.1 --index-url https://download.pytorch.org/whl/cu124
!pip install transformers datasets evaluate -q

Looking in indexes: https://download.pytorch.org/whl/cu124
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch==2.5.1)
  Downloading https://download.pytorch.org/whl/cu124/nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl (24.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m78.7 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting nvidia-cuda-runtime-cu12==12.4.127 (from torch==2.5.1)
  Downloading https://download.pytorch.org/whl/cu124/nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl (883 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m24.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting nvidia-cuda-cupti-cu12==12.4.127 (from torch==2.5.1)
  Downloading https://download.pytorch.org/whl/cu124/nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl (13.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m106.2 MB/s[0m et

In [2]:
import pandas as pd
import re
import numpy as np
from pygments.lexers import PythonLexer

train_dataframe = pd.read_csv("ft_train.csv")
valid_dataframe = pd.read_csv("ft_valid.csv")
test_dataframe = pd.read_csv("ft_test.csv")

"""
Parses methods for pre-processing and masking

Args:
csv (csv file): CSV file containing raw methods

Returns: List of flattened and masked methods
"""
def create_dataset(csv):
  data = []
  for d in range(csv.shape[0]):
    target = csv["target_block"][d]
    code = csv["cleaned_method"][d].replace('\n', '').replace('    ', " <TAB> ").replace('  ', ' ')
    indices = [m.start() for m in re.finditer('if', code)] + [n.start() for n in re.finditer('elif', code)]

    replaced = False
    for i in indices:
      # In the event of identical if statements, replace only the first one
      if replaced:
        break
      if_start = i;
      for c in range(if_start, len(code)):
        if code[c] != ':':
          continue
        else:
          # Found substring with an if statement
          if_end = c + 1
          # Check whether the statement is the target statement
          # Done by removing all whitespace in statement/target and comparing
          code_check = ''.join(code[if_start:if_end].split())
          target_check = ''.join(target.split())
          if code_check == target_check:
            code = code.replace(code[if_start:if_end], "<IF-STMT>")
            # Tokenization comes AFTER masking the method
            lexer = PythonLexer()
            tokens = [t[1] for t in lexer.get_tokens(code)]
            replaced = True
          break
    data += [' '.join(tokens).strip().replace("< TAB >", "<TAB>").replace("< IF - STMT >", "<IF-STMT>").replace('   ', ' ')]
  return data

"""
Creates list of target if statements

Args:
csv (csv file): CSV file containing target if statements

Returns: List of target if statements
"""
def create_labels(csv):
  data = []
  for d in range(csv.shape[0]):
    target = csv["target_block"][d]
    data += [target]
  return data

In [79]:
# Turn provided data into usable format
train_data = create_dataset(train_dataframe)
train_labels = create_labels(train_dataframe)

valid_data = create_dataset(valid_dataframe)
valid_labels = create_labels(valid_dataframe)

test_data = create_dataset(test_dataframe)
test_labels = create_labels(test_dataframe)

In [4]:
# Turn provided data into pandas dataframes
train_frame = pd.DataFrame(columns=['input'], data=train_data)
train_label_frame = pd.DataFrame(columns=['text'], data=train_labels)
train_ids = pd.DataFrame(columns=['id'], data=range(0,len(train_data)))

valid_frame = pd.DataFrame(columns=['input'], data=valid_data)
valid_label_frame = pd.DataFrame(columns=['text'], data=valid_labels)
valid_ids = pd.DataFrame(columns=['id'], data=range(0,len(valid_data)))

test_frame = pd.DataFrame(columns=['input'], data=test_data)
test_label_frame = pd.DataFrame(columns=['text'], data=test_labels)
test_ids = pd.DataFrame(columns=['id'], data=range(0,len(test_data)))

# Concatenate respective dataframes
train_combined = pd.concat([train_ids, train_frame, train_label_frame], axis=1)
# Randomly sample half of the elements to speed up training
train_combined = train_combined.sample(frac=0.5, random_state=42)
valid_combined = pd.concat([valid_ids, valid_frame, valid_label_frame], axis=1)
test_combined = pd.concat([test_ids, test_frame, test_label_frame], axis=1)

In [6]:
# Create csv files for loading dataset
train_combined.to_csv('train_tokenized.csv', index=False)
valid_combined.to_csv('valid_tokenized.csv', index=False)
test_combined.to_csv('test_tokenized.csv', index=False)

In [7]:
from datasets import load_dataset

data_files = {"train": "train_tokenized.csv", "validation": "valid_tokenized.csv", "test": "test_tokenized.csv"}
dataset = load_dataset("csv", data_files=data_files)

Generating train split: 0 examples [00:00, ? examples/s]

Generating validation split: 0 examples [00:00, ? examples/s]

Generating test split: 0 examples [00:00, ? examples/s]

In [9]:
from transformers import T5ForConditionalGeneration, AutoModelForSeq2SeqLM
from transformers import RobertaTokenizer
from datasets import DatasetDict
from transformers import TrainingArguments, Trainer
from transformers import EarlyStoppingCallback

model_checkpoint = "Salesforce/codet5-small"

model = T5ForConditionalGeneration.from_pretrained(model_checkpoint)

tokenizer = RobertaTokenizer.from_pretrained(model_checkpoint)
# THIS CORRESPONDS TO THE <MASK> and <TAB> TOKEN
tokenizer.add_tokens(["<IF-STMT>"]) #Imagine we need an extra token. This line adds the extra token to the vocabulary
tokenizer.add_tokens(["<TAB>"])

# Remember to do this if you add a token
model.resize_token_embeddings(len(tokenizer))

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/1.57k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/242M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/1.48k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/703k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/242M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/294k [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/2.00 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/12.5k [00:00<?, ?B/s]

The new embeddings will be initialized from a multivariate normal distribution that has old embeddings' mean and covariance. As described in this article: https://nlp.stanford.edu/~johnhew/vocab-expansion.html. To disable this, use `mean_resizing=False`


Embedding(32102, 512)

In [10]:
# Creating fine-tuning dataset
def preprocess_function(examples):
    inputs = examples["input"]
    targets = examples["text"]
    # max_length truncates rest of method if method is too long
    model_inputs = tokenizer(inputs, max_length=256, truncation=True, padding="max_length")
    labels = tokenizer(targets, max_length=256, truncation=True, padding="max_length")
    model_inputs["labels"] = labels["input_ids"]
    return model_inputs

tokenized_datasets = dataset.map(preprocess_function, batched=True)

Map:   0%|          | 0/25000 [00:00<?, ? examples/s]

Map:   0%|          | 0/5000 [00:00<?, ? examples/s]

Map:   0%|          | 0/5000 [00:00<?, ? examples/s]

In [12]:
training_args = TrainingArguments(
    output_dir="./codet5-finetuned5",
    eval_strategy="epoch",
    save_strategy="epoch",
    logging_dir="./logs",
    learning_rate=5e-5,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    num_train_epochs=7,
    weight_decay=0.01,
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    save_total_limit=2,
    logging_steps=100,
    push_to_hub=False,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["validation"],
    tokenizer=tokenizer,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)]
)

  trainer = Trainer(


In [None]:
# If desired, directly load trained model instead of manually retraining
!pip install dill
import dill as pickle
from dill import dump, load

with open('codet5-trained.dill', 'rb') as pickle_file:
    model = pickle.load(pickle_file)

In [14]:
# Optional to run, logs into wandb API proactively instead of letting the train() method prompt the user
!pip install wandb
!wandb login

[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize
[34m[1mwandb[0m: Paste an API key from your profile and hit enter, or press ctrl+c to quit: 
[34m[1mwandb[0m: No netrc file found, creating one.
[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc
[34m[1mwandb[0m: Currently logged in as: [33mdhberger[0m ([33mdhberger-william-mary[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


In [15]:
# ONLY RUN THIS BLOCK IF YOU INTEND TO MANUALLY TRAIN THE MODEL - WILL TAKE A VERY LONG TIME
trainer.train()

metrics = trainer.evaluate(tokenized_datasets["test"])
print("Test Evaluation Metrics:", metrics)

[34m[1mwandb[0m: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.
[34m[1mwandb[0m: Currently logged in as: [33mdhberger[0m ([33mdhberger-william-mary[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


Passing a tuple of `past_key_values` is deprecated and will be removed in Transformers v4.48.0. You should pass an instance of `EncoderDecoderCache` instead, e.g. `past_key_values=EncoderDecoderCache.from_legacy_cache(past_key_values)`.


Epoch,Training Loss,Validation Loss
1,0.0475,0.03904
2,0.0322,0.038391
3,0.0324,0.038466
4,0.0303,0.037907
5,0.026,0.039066
6,0.0221,0.039385


There were missing keys in the checkpoint model loaded: ['encoder.embed_tokens.weight', 'decoder.embed_tokens.weight', 'lm_head.weight'].


Test Evaluation Metrics: {'eval_loss': 0.03991185128688812, 'eval_runtime': 27.7689, 'eval_samples_per_second': 180.058, 'eval_steps_per_second': 45.014, 'epoch': 6.0}


In [133]:
# Test to make sure outputs generate properly
inputs = tokenizer(tokenized_datasets["test"]["input"][0], return_tensors="pt", padding=True, truncation=True).to("cuda")

In [134]:
outputs = model.generate(**inputs, max_length=256)

In [135]:
print("Generated If Statement:\n", tokenizer.decode(outputs[0], skip_special_tokens=True))
print("Actual If Statement:\n", tokenized_datasets["test"]["text"][0])

Generated If Statement:
 if ignore_timeouts and is_timeouts ( e ) :
Actual If Statement:
 if ignore_timeouts and is_timeout ( e ) :


In [19]:
"""
Generates all of model's test set predictions

Args:
tokenized_dataset: Pre-trained tokenizer dataset
model: Trained model

Returns: List of model predictions for test set
"""
def output_predictions(tokenized_dataset, model):
  predictions = []
  for i in range(0,len(tokenized_datasets["test"]["text"])):
    inputs = tokenizer(tokenized_datasets["test"]["input"][i], return_tensors="pt", padding=True, truncation=True).to("cuda")
    outputs = model.generate(**inputs, max_length=256)
    predictions += [tokenizer.decode(outputs[0], skip_special_tokens=True)]
  return predictions

In [20]:
# May take a long time (~10-15m) to execute - generates all model test predictions
preds = output_predictions(tokenized_datasets, model)

In [21]:
# Create dataframes for predicted and expected if statements
predicted_output = pd.DataFrame(columns=['predicted if'], data=preds)
expected_output = pd.DataFrame(columns=['expected if'], data=tokenized_datasets["test"]["text"])

In [22]:
# FOR GENERATING EXACT MATCHES
matches = []
ratio = 0.0
for i in range(0, len(tokenized_datasets["test"]["text"])):
  matches += [predicted_output['predicted if'][i] == expected_output['expected if'][i]]
  # For calculating exact match score
  if predicted_output['predicted if'][i] == expected_output['expected if'][i]:
    ratio += 1
ratio /= len(matches)
# Create dataframes for exact matches and the corresponding score
exact_match_ratio = pd.DataFrame(columns=['% exact matches'], data=[round(ratio * 100, 2)])
exact_matches = pd.DataFrame(columns=['exact match'], data=matches)

In [32]:
# FOR GENERATING CODEBLEU SCORE
!pip install transformers
!pip install tree_sitter==0.2.0
!git clone -q https://github.com/microsoft/CodeXGLUE.git



In [None]:
# Datasets used to calculate BLEU-4 and CodeBLEU scores - will have masks replaced with actual conditional statement
full_prediction = create_dataset(test_dataframe)
ground_truth = create_dataset(test_dataframe)

In [None]:
# Unmask methods with predicted statement and actual statement, respectively
for i in range(len(preds)):
  full_prediction[i] = full_prediction[i].replace("<IF-STMT>", preds[i])
  ground_truth[i] = ground_truth[i].replace("<IF-STMT>", test_labels[i])

In [85]:
predict_method = pd.DataFrame(columns=['full prediction'], data=full_prediction)
true_method = pd.DataFrame(columns=['true method'], data=ground_truth)
predict_method.to_csv('method-prediction.csv')
true_method.to_csv('method-truth.csv')

In [121]:
# Generate overall model CodeBLEU score
predicted_output.to_csv('model-predictions.csv', index=False)
expected_output.to_csv('model-targets.csv', index=False)
CodeBLEU = !cd /content/CodeXGLUE/Code-Code/code-to-code-trans/evaluator/CodeBLEU/ && python calc_code_bleu.py --refs /content/method-truth.csv --hyp /content/method-prediction.csv --lang python --params 0.25,0.25,0.25,0.25
# Extract number from terminal output using string indexing
CodeBLEU_score = round(float(CodeBLEU[-1][17:]) * 100, 2)
# Create dataframe for CodeBLEU score
CODEBLEU = pd.DataFrame(columns=['Model CodeBLEU score'], data=[CodeBLEU_score])

In [None]:
# This code may take a while (~15-20m) to run. It computes CodeBLEU scores for individual predictions
CodeBLEU_i_score = []
# Hacky solution that runs the CodeBLEU calculation on single rows of the dataset
for i in range(len(preds)):
  pred_i = pd.DataFrame(columns=['pred i'], data=[full_prediction[i]])
  pred_i.to_csv('pred_i.csv')
  truth_i = pd.DataFrame(columns=['truth i'], data=[ground_truth[i]])
  truth_i.to_csv('truth_i.csv')
  CodeBLEU_i = !cd /content/CodeXGLUE/Code-Code/code-to-code-trans/evaluator/CodeBLEU/ && python calc_code_bleu.py --refs /content/truth_i.csv --hyp /content/pred_i.csv --lang python --params 0.25,0.25,0.25,0.25
  CodeBLEU_i_score += [round(float(CodeBLEU_i[-1][17:]) * 100, 2)]

In [118]:
CODEBLUE_INDIVIDUAL = pd.DataFrame(columns=['CodeBLEU score'], data=CodeBLEU_i_score)

In [42]:
# FOR GENERATING BLEU-4 SCORE
!pip install sacrebleu
import evaluate



In [92]:
# Format predictions and targets appropriate for SacreBLEU
predictions = full_prediction
references = [[ref] for ref in ground_truth]
sacrebleu = evaluate.load("sacrebleu")
results = sacrebleu.compute(predictions=predictions, references=references)
individual_results = []
for i in range(len(predictions)):
  individual_results += [sacrebleu.compute(predictions=[predictions[i]], references=[references[i]])["score"]]
# Create dataframe for BLEU-4 score
BLEU_4 = pd.DataFrame(columns=['Average BLEU-4 score'], data=[round(results["score"], 2)])

In [93]:
# Individual method BLEU-4 scores
individual_results = [round(n, 2) for n in individual_results]
BLEU_4_SCORES = pd.DataFrame(columns=['BLEU-4 score'], data=individual_results)

In [136]:
# Finally, concatenate all dataframes to form test set results CSV
testset_results = pd.concat([test_frame, predicted_output, expected_output, exact_matches, BLEU_4_SCORES, CODEBLUE_INDIVIDUAL, exact_match_ratio, CODEBLEU, BLEU_4], axis=1)

testset_results.to_csv('testset-results.csv', index=False)

In [125]:
# Dump trained model as a pickle file
#with open(r'codet5-trained.dill', 'wb') as output_file:
#    pickle.dump(model, output_file)