1. Load Model
2. Test Model
3. Print Raw Results, i.e., the generated refactorings
4. CodeBLEU Evaluation
5. ROUGE-1, ROUGE-2 and ROUGE-L Evaluation
6. METEOR Evaluation
7. Metrics Visualization

In [1]:
# Install necessary libraries

!pip3 install torch
!pip3 install transformers
!pip3 install datasets
!pip3 install sentencepiece

Collecting sentencepiece
  Using cached sentencepiece-0.2.0-cp39-cp39-win_amd64.whl.metadata (8.3 kB)
Using cached sentencepiece-0.2.0-cp39-cp39-win_amd64.whl (991 kB)
Installing collected packages: sentencepiece
Successfully installed sentencepiece-0.2.0


1. Preparation for Training (Part 1/2)
    --> Performing Data Split    

In [4]:
# Preparation for Training
import os
import json
from datasets import load_dataset
from sklearn.model_selection import train_test_split
from tqdm import tqdm

# Load the generated dataset (JSON Lines: one JSON object per line).
# Adjust the path components below if your dataset lives somewhere else.
dataset_path = os.path.join("data", "analyzed_dataset_test.jsonl")

# Read the JSONL file. The encoding is pinned to UTF-8 so code snippets with
# non-ASCII characters decode identically on every platform, and blank lines
# (e.g. a trailing newline at the end of the file) are skipped instead of
# making json.loads raise.
with open(dataset_path, 'r', encoding='utf-8') as file:
    data = [json.loads(line) for line in file if line.strip()]

# Extract input and target values
magic_number_smells = [item['magic_number_smell'] for item in data]
refactored_codes = [item['refactored_code'] for item in data]

# Split inputs and targets together into 80% training / 20% testing examples.
# The fixed random_state keeps the split reproducible between runs.
(
    train_magic_number_smells,
    test_magic_number_smells,
    train_refactored_codes,
    test_refactored_codes,
) = train_test_split(
    magic_number_smells, refactored_codes, test_size=0.2, random_state=42
)

# Re-assemble each split as a list of {'magic_number_smell', 'refactored_code'}
# records, the same shape as the rows of the source file.
train_dataset = [
    {'magic_number_smell': smell, 'refactored_code': code}
    for smell, code in zip(train_magic_number_smells, train_refactored_codes)
]
test_dataset = [
    {'magic_number_smell': smell, 'refactored_code': code}
    for smell, code in zip(test_magic_number_smells, test_refactored_codes)
]

# Destination paths for the JSONL copies of both splits
train_file_path = os.path.join("data", "train_dataset.jsonl")
test_file_path = os.path.join("data", "test_dataset.jsonl")

2. Preparation for Training (Part 2/2)
    --> Initializing Tokenizer - T5Tokenizer (t5-small)
    --> Initializing Optimizer - AdamW

In [5]:

import torch
from torch.utils.data import Dataset, DataLoader
from transformers import T5ForConditionalGeneration, T5Tokenizer, AdamW

# Persist both splits as JSONL: one serialized example per line,
# training split first, then the testing split.
for split_path, split_items in (
    (train_file_path, train_dataset),
    (test_file_path, test_dataset),
):
    with open(split_path, 'w') as out_file:
        out_file.writelines(json.dumps(example) + '\n' for example in split_items)

# Define a custom dataset class
class CodeDataset(Dataset):
    """Tokenized (smelly code -> refactored code) pairs for seq2seq training.

    Args:
        data: Indexable collection of dicts with the keys
            'magic_number_smell' (model input) and 'refactored_code' (target).
        tokenizer: Object exposing ``encode_plus``; its result must provide
            'input_ids' and 'attention_mask' tensors with a leading batch
            dimension of size 1.
        max_length: Length every input and target is padded/truncated to.
    """

    # Label value skipped by the loss (PyTorch cross-entropy's default
    # ignore_index, which Hugging Face seq2seq models rely on).
    IGNORE_INDEX = -100

    def __init__(self, data, tokenizer, max_length=512):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of examples."""
        return len(self.data)

    def _encode(self, text):
        """Tokenize ``text`` into fixed-length PyTorch tensors."""
        return self.tokenizer.encode_plus(
            text,
            return_tensors='pt',
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
        )

    def __getitem__(self, idx):
        """Return 'input_ids', 'attention_mask' and 'labels' for example ``idx``.

        Padded positions of 'labels' are set to ``IGNORE_INDEX`` so that the
        training loss is computed on real target tokens only.
        """
        item = self.data[idx]
        inputs = self._encode(item['magic_number_smell'])
        targets = self._encode(item['refactored_code'])

        # squeeze(0) drops only the batch dimension added by return_tensors='pt'
        # (a bare squeeze() would also collapse a length-1 sequence).
        target_ids = targets['input_ids'].squeeze(0)
        target_mask = targets['attention_mask'].squeeze(0)

        # Mask by attention mask rather than by token id, so only genuine
        # padding is ignored. masked_fill returns a new tensor.
        labels = target_ids.masked_fill(target_mask == 0, self.IGNORE_INDEX)

        return {
            'input_ids': inputs['input_ids'].squeeze(0),
            'attention_mask': inputs['attention_mask'].squeeze(0),
            'labels': labels,
        }

# Initialize the T5 tokenizer and model from the pretrained 't5-small' checkpoint
tokenizer = T5Tokenizer.from_pretrained('t5-small')
model = T5ForConditionalGeneration.from_pretrained('t5-small')

# Create datasets and dataloaders.
# NOTE: train_dataset / test_dataset are rebound from lists of dicts to
# CodeDataset objects here, so this cell cannot be re-run without first
# re-running the data split cell.
train_dataset = CodeDataset(train_dataset, tokenizer)
test_dataset = CodeDataset(test_dataset, tokenizer)

# Only the training batches are shuffled; the test loader keeps dataset order
# so that predictions stay aligned with the test split.
train_dataloader = DataLoader(train_dataset, batch_size=4, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=4, shuffle=False)

# Define training parameters.
# torch.optim.AdamW replaces the deprecated transformers.AdamW; eps and
# weight_decay are spelled out to keep the defaults the transformers
# implementation used.
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, eps=1e-6, weight_decay=0.0)

# Loading onto processor: use the GPU when one is available, else the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)






T5ForConditionalGeneration(
  (shared): Embedding(32128, 512)
  (encoder): T5Stack(
    (embed_tokens): Embedding(32128, 512)
    (block): ModuleList(
      (0): T5Block(
        (layer): ModuleList(
          (0): T5LayerSelfAttention(
            (SelfAttention): T5Attention(
              (q): Linear(in_features=512, out_features=512, bias=False)
              (k): Linear(in_features=512, out_features=512, bias=False)
              (v): Linear(in_features=512, out_features=512, bias=False)
              (o): Linear(in_features=512, out_features=512, bias=False)
              (relative_attention_bias): Embedding(32, 8)
            )
            (layer_norm): T5LayerNorm()
            (dropout): Dropout(p=0.1, inplace=False)
          )
          (1): T5LayerFF(
            (DenseReluDense): T5DenseActDense(
              (wi): Linear(in_features=512, out_features=2048, bias=False)
              (wo): Linear(in_features=2048, out_features=512, bias=False)
              (dropout): Drop

3. Print Expected Refactored Code

In [None]:
# Print Expected Refactored Code
# NOTE: refactored_codes holds the targets of every example loaded from the
# dataset file (training and testing examples alike), in file order.
print(refactored_codes)

4. Training Loop

In [None]:
# Training Loop
#
# Trains for up to num_epochs epochs. After every epoch the model is scored on
# the test dataloader, a checkpoint is written, and two early-stopping rules
# are checked:
#   * plateau  - the test loss was exactly identical for the last 3 epochs
#   * overfit  - the test loss rose for max_overfit_epochs consecutive epochs
# NOTE(review): the test split doubles as the validation set for early
# stopping, so the final test metrics are not fully held-out.

train_losses = []
test_losses = []
last_3_test_losses = []  # The most recent (up to 3) test losses, for plateau detection
max_overfit_epochs = 3  # Maximum consecutive epochs for which test loss can increase before stopping
overfit_epochs = 0  # Consecutive epochs in which the test loss has risen so far

num_epochs = 40
stop_training = False  # Becomes True when an early-stopping rule ends training


def _batch_loss(batch):
    """Run one forward pass on ``batch`` and return the model's loss tensor."""
    inputs = {key: val.to(device) for key, val in batch.items() if key != 'labels'}
    labels = batch['labels'].to(device)
    return model(**inputs, labels=labels).loss


for epoch in range(num_epochs):
    # ---- Training pass ----
    model.train()
    epoch_train_losses = []
    for batch in tqdm(train_dataloader, desc=f'Epoch {epoch + 1}/{num_epochs} (Training)'):
        optimizer.zero_grad()
        loss = _batch_loss(batch)
        loss.backward()
        optimizer.step()
        epoch_train_losses.append(loss.item())

    # Average training loss for the epoch
    train_loss = sum(epoch_train_losses) / len(epoch_train_losses)
    train_losses.append(train_loss)

    # ---- Evaluation pass (no gradients, dropout disabled via eval()) ----
    model.eval()
    epoch_test_losses = []
    with torch.no_grad():
        for batch in tqdm(test_dataloader, desc=f'Epoch {epoch + 1}/{num_epochs} (Testing)'):
            epoch_test_losses.append(_batch_loss(batch).item())

    # Average testing loss for the epoch
    test_loss = sum(epoch_test_losses) / len(epoch_test_losses)
    test_losses.append(test_loss)

    # Print the training and testing losses for monitoring
    print(f"Epoch {epoch + 1}/{num_epochs} - Train Loss: {train_loss}, Test Loss: {test_loss}")

    # Save checkpoint after each epoch (model + optimizer state, so training
    # can be resumed from any epoch)
    checkpoint_path = f'magic_smell_model_s_3lines_700_e40_b4_epoch_{epoch + 1}.pth'
    torch.save({
        'epoch': epoch + 1,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'train_loss': train_loss,
        'test_loss': test_loss
    }, checkpoint_path)

    # Early stopping condition for same test losses. The window includes the
    # epoch that just finished, so training stops as soon as three identical
    # losses have been observed rather than one epoch later.
    last_3_test_losses = test_losses[-3:]
    if len(last_3_test_losses) == 3 and all(
        value == last_3_test_losses[0] for value in last_3_test_losses
    ):
        print("Early stopping: Test losses remained the same for 3 epochs.")
        stop_training = True
        break

    # Early stopping condition for overfitting: count consecutive epochs whose
    # test loss is higher than the previous epoch's; any improvement (or the
    # first epoch, which has no predecessor) resets the counter.
    if epoch > 0 and test_loss > test_losses[-2]:
        overfit_epochs += 1
    else:
        overfit_epochs = 0

    if overfit_epochs >= max_overfit_epochs:
        print(f"Early stopping: Test loss increased continuously for {max_overfit_epochs} epochs.")
        stop_training = True
        break



# The saved model can be deployed to Hugging Face or used locally.
# Model weights and tokenizer files go into the same directory.
output_dir = 'magic_smell_model_s_3lines_700_e40_b4'

# Save the trained model first, then the tokenizer
for artifact in (model, tokenizer):
    artifact.save_pretrained(output_dir)

In [None]:
# Testing the model
#
# Generates a refactoring for every test example, collects predictions and
# references in several representations, and writes a human-readable
# reference/prediction report to test_results.txt.
import torch
from tqdm import tqdm

model.eval()
all_references = []  # Reference label-id arrays, one per test example
all_references_decoded = []  # Reference code as text, decoded from the label ids
all_predictions = []  # Generated code as text
all_prediction_ids = []  # Generated token-id tensors (kept on the CPU)
all_prediction_ids_labelled = []  # Generated token ids as NumPy arrays
all_predictions_decoded = []  # Re-tokenized predictions (full tokenizer output)
all_predictions_decoded_labelled = []  # Re-tokenized prediction ids, padded to 512, as NumPy arrays

with torch.no_grad():
    for batch in tqdm(test_dataloader, desc='Evaluating on Test Dataset'):
        inputs = {key: val.to(device) for key, val in batch.items() if key != 'labels'}
        labels = batch['labels'].cpu()

        # Generate predictions
        predicted_ids = model.generate(**inputs, max_length=512).cpu()
        predicted_code = [tokenizer.decode(ids, skip_special_tokens=True) for ids in predicted_ids]

        # Decode the references to text. Negative ids (e.g. -100 loss-masking)
        # are not vocabulary entries, so map them to the pad token, which
        # skip_special_tokens then drops.
        decodable_labels = labels.masked_fill(labels < 0, tokenizer.pad_token_id)
        reference_code = [tokenizer.decode(ids, skip_special_tokens=True) for ids in decodable_labels]

        # Append to reference and prediction lists
        all_references.extend(labels.numpy())
        all_references_decoded.extend(reference_code)
        all_predictions.extend(predicted_code)

        # Stored on the CPU so accumulated results do not hold GPU memory
        all_prediction_ids.extend(predicted_ids)
        all_prediction_ids_labelled.extend(predicted_ids.numpy())

        # Re-tokenize the decoded predictions to fixed-length id sequences
        tokenized_predicted_code = [
            tokenizer.encode_plus(code, return_tensors='pt', padding='max_length', truncation=True, max_length=512)
            for code in predicted_code
        ]
        all_predictions_decoded.extend(tokenized_predicted_code)
        labels_predicted = torch.stack([item['input_ids'].squeeze(0) for item in tokenized_predicted_code])
        all_predictions_decoded_labelled.extend(labels_predicted.numpy())


# Save the results to a text file. References are written as decoded text
# (not raw padded id arrays) so the report can be read side by side; UTF-8 is
# pinned so generated text with non-ASCII characters cannot fail to encode.
with open('test_results.txt', 'w', encoding='utf-8') as file:
    for reference, prediction in zip(all_references_decoded, all_predictions):
        file.write(f"Reference: {reference}\n")
        file.write(f"Prediction: {prediction}\n\n")


In [None]:
# Show the decoded model outputs generated for the test dataloader.
print(all_predictions)

In [8]:
!pip3 install sacrebleu

Collecting sacrebleu
  Downloading sacrebleu-2.4.1-py3-none-any.whl.metadata (57 kB)
     ---------------------------------------- 0.0/57.9 kB ? eta -:--:--
     ---------------------------------------- 57.9/57.9 kB 1.5 MB/s eta 0:00:00
Collecting portalocker (from sacrebleu)
  Downloading portalocker-2.8.2-py3-none-any.whl.metadata (8.5 kB)
Collecting tabulate>=0.8.9 (from sacrebleu)
  Downloading tabulate-0.9.0-py3-none-any.whl.metadata (34 kB)
Collecting lxml (from sacrebleu)
  Downloading lxml-5.2.0-cp39-cp39-win_amd64.whl.metadata (4.0 kB)
Downloading sacrebleu-2.4.1-py3-none-any.whl (106 kB)
   ---------------------------------------- 0.0/106.6 kB ? eta -:--:--
   ---------------------------------------- 106.6/106.6 kB 3.1 MB/s eta 0:00:00
Downloading tabulate-0.9.0-py3-none-any.whl (35 kB)
Downloading lxml-5.2.0-cp39-cp39-win_amd64.whl (3.8 MB)
   ---------------------------------------- 0.0/3.8 MB ? eta -:--:--
   --- ------------------------------------ 0.4/3.8 MB 7.6 MB/s eta

In [None]:
# CodeBLEU Evaluation
# NOTE: the score computed below is sacreBLEU's corpus-level BLEU; the
# "CodeBLEU" label and variable name are kept for continuity with the rest of
# the notebook.

import sacrebleu

# Predictions were generated from test_dataloader (shuffle=False), so they
# line up one-to-one with test_refactored_codes. refactored_codes covers the
# whole dataset in file order and must not be used as the reference here.
if all_predictions and test_refactored_codes:
    references = [str(ref) for ref in test_refactored_codes]
    predictions = [str(pred) for pred in all_predictions]

    if len(references) != len(predictions):
        raise ValueError(
            f"Got {len(predictions)} predictions for {len(references)} references; "
            "re-run the testing cell so both come from the same test split."
        )

    # Calculate corpus-level BLEU (a single reference stream)
    codebleu = sacrebleu.corpus_bleu(predictions, [references])
    print(f"CodeBLEU: {codebleu.score}")
    print(references)
    print(predictions)
else:
    print("Error: Empty prediction or reference list.")


In [None]:
# ROUGE-1, ROUGE-2 and ROUGE-L Evaluation

from rouge_score import rouge_scorer

# Initialize ROUGE scorer
scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)

# Per-example F-measures
rouge1_scores = []
rouge2_scores = []
rougeL_scores = []

# Predictions come from the test split, so they must be paired with
# test_refactored_codes. zip() would silently truncate and mis-pair them with
# the full-dataset refactored_codes list.
if len(test_refactored_codes) != len(all_predictions):
    raise ValueError(
        f"Got {len(all_predictions)} predictions for {len(test_refactored_codes)} references; "
        "re-run the testing cell so both come from the same test split."
    )

for ref_code, pred_code in zip(test_refactored_codes, all_predictions):
    # Calculate ROUGE scores: score(target, prediction)
    scores = scorer.score(ref_code, pred_code)

    rouge1_scores.append(scores['rouge1'].fmeasure)
    rouge2_scores.append(scores['rouge2'].fmeasure)
    rougeL_scores.append(scores['rougeL'].fmeasure)


def _mean(values):
    """Return the arithmetic mean of ``values``, or 0.0 when it is empty."""
    return sum(values) / len(values) if values else 0.0


# Calculate mean ROUGE scores
mean_rouge1 = _mean(rouge1_scores)
mean_rouge2 = _mean(rouge2_scores)
mean_rougeL = _mean(rougeL_scores)

# Print mean ROUGE scores
print("Mean ROUGE-1:", mean_rouge1)
print("Mean ROUGE-2:", mean_rouge2)
print("Mean ROUGE-L:", mean_rougeL)


In [None]:
# METEOR Evaluation

import nltk
from nltk.translate import meteor_score

# Download WordNet data (METEOR uses it for synonym matching)
nltk.download('wordnet')

# Predictions come from the test split, so they are scored against
# test_refactored_codes (same order as test_dataloader), not the full dataset.
if all_predictions and test_refactored_codes:
    if len(test_refactored_codes) != len(all_predictions):
        raise ValueError(
            f"Got {len(all_predictions)} predictions for {len(test_refactored_codes)} references; "
            "re-run the testing cell so both come from the same test split."
        )

    hypothesis_strings = [str(pred) for pred in all_predictions]
    references_strings = [str(ref) for ref in test_refactored_codes]

    # meteor_score scores ONE hypothesis against a list of references, so it
    # is called per example and the results are averaged. Both sides are split
    # on whitespace, since recent NLTK releases require pre-tokenized input.
    per_example_scores = [
        meteor_score.meteor_score([reference.split()], hypothesis.split())
        for reference, hypothesis in zip(references_strings, hypothesis_strings)
    ]

    # Calculate METEOR score (mean over the test examples)
    meteor_avg_score = sum(per_example_scores) / len(per_example_scores)
    print(f"METEOR: {meteor_avg_score}")
else:
    print("Error: Empty prediction or reference list.")


In [None]:
# Metrics Visualization

import matplotlib.pyplot as plt

# Define metrics
metrics = ['BLEU', 'ROUGE-1', 'ROUGE-2', 'ROUGE-L', 'METEOR']

# sacreBLEU reports BLEU on a 0-100 scale, whereas the ROUGE F-measures and
# METEOR lie in 0-1. BLEU is rescaled so all bars share one comparable axis;
# otherwise the BLEU bar would dwarf the others.
final_scores = [codebleu.score / 100, mean_rouge1, mean_rouge2, mean_rougeL, meteor_avg_score]

# Plotting final scores
plt.bar(metrics, final_scores)
plt.ylim(0, 1)

# Add labels and title
plt.xlabel('Metric')
plt.ylabel('Score')
plt.title('Final Evaluation Metrics')

# Show plot
plt.show()