In [3]:
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModel
import torch
import torch.nn.functional as F
import torchmetrics
import pytorch_lightning as pl
from torchmetrics.classification import MultilabelPrecision, MultilabelRecall, MultilabelF1Score
from pytorch_lightning.loggers import CSVLogger



# Root of the NLBSE'25 code-comment-classification dataset on the Hugging Face Hub.
DATASET_URL = "hf://datasets/NLBSE/nlbse25-code-comment-classification/"

# Relative parquet path of every split, keyed as "<language>_<train|test>".
splits = {
    f"{language}_{part}": f"data/{language}_{part}-00000-of-00001.parquet"
    for language in ("java", "python", "pharo")
    for part in ("train", "test")
}


def _read_split(split_name):
    """Read one named parquet split of the dataset into a DataFrame."""
    return pd.read_parquet(DATASET_URL + splits[split_name])


java_train, java_test = _read_split("java_train"), _read_split("java_test")
python_train, python_test = _read_split("python_train"), _read_split("python_test")
pharo_train, pharo_test = _read_split("pharo_train"), _read_split("pharo_test")

# Hold out 20% of each language's training frame for validation.
# The fixed random_state keeps the partition identical across runs.
java_train_data, java_val_data = train_test_split(java_train, test_size=0.2, random_state=42)
python_train_data, python_val_data = train_test_split(python_train, test_size=0.2, random_state=42)
pharo_train_data, pharo_val_data = train_test_split(pharo_train, test_size=0.2, random_state=42)

print(f"Java train size: {len(java_train_data)}, Java val size: {len(java_val_data)}")
print(f"Python train size: {len(python_train_data)}, Python val size: {len(python_val_data)}")
print(f"Pharo train size: {len(pharo_train_data)}, Pharo val size: {len(pharo_val_data)}")

#print(java_train.iloc[0, :])
#print(python_train.iloc[0, :])
#print(pharo_train.iloc[0, :])

ModuleNotFoundError: No module named 'pandas'

In [None]:
class JavaCommentDataset(Dataset):
    """Map-style dataset over the ``combo`` text and ``labels`` columns of a frame.

    Text is tokenized lazily, one item at a time, padded/truncated to ``max_len``.

    Args:
        dataframe: frame providing a ``combo`` column (text) and a ``labels``
            column (one label vector per row).
        tokenizer: callable invoked as ``tokenizer(text, padding=..., ...)`` whose
            result is indexable by ``'input_ids'``.
        max_len: value forwarded to the tokenizer as ``max_length``.
    """

    def __init__(self, dataframe, tokenizer, max_len):
        # Materialise both columns as plain lists so item access is positional,
        # independent of the frame's (possibly shuffled) index.
        self.comments = dataframe['combo'].tolist()
        self.labels = dataframe['labels'].tolist()
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Number of comments in the dataset."""
        return len(self.comments)

    def __getitem__(self, idx):
        """Return ``{'input_ids': ..., 'labels': ...}`` for the ``idx``-th comment."""
        encoded = self.tokenizer(
            self.comments[idx],
            padding='max_length',
            truncation=True,
            max_length=self.max_len,
            return_tensors='pt',
        )

        # Label vector as float32.
        target = torch.tensor(self.labels[idx], dtype=torch.float)

        # Drop the leading size-1 axis if present, then add a new leading axis.
        # These are token ids, not embeddings; the leading axis is presumably the
        # channel dimension the CNN's Conv2d expects once items are batched
        # (confirm against the model).
        token_ids = encoded['input_ids'].squeeze(0)

        return {
            'input_ids': token_ids[None],
            'labels': target,
        }

In [None]:
# Every comment is padded/truncated to this many tokens.
max_len = 512
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
# NOTE(review): `model` is not read by any later cell visible here — confirm it is needed.
model = AutoModel.from_pretrained("bert-base-uncased")

# Wrap the Java train / validation / test frames.
train_dataset = JavaCommentDataset(java_train_data, tokenizer, max_len)
val_dataset = JavaCommentDataset(java_val_data, tokenizer, max_len)
test_dataset = JavaCommentDataset(java_test, tokenizer, max_len)

# Only the training loader shuffles; evaluation order stays fixed.
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(dataset=test_dataset, batch_size=32)


In [None]:
class PyTorchCNN(nn.Module):
    """Token-embedding CNN that maps token ids to ``num_classes`` logits.

    Pipeline: ``nn.Embedding`` -> three Conv2d/BatchNorm/ReLU/MaxPool stages that
    convolve along the sequence axis -> flatten -> two-layer MLP head.

    Args:
        vocab_size: number of rows in the embedding table (largest token id + 1).
        embed_dim: width of each token embedding; the first convolution spans it fully.
        num_classes: number of output logits.
        max_len: sequence length the model is built for. The first linear layer is
            sized from it, so inputs must be padded/truncated to exactly this length.
            Defaults to 512, the length previously hard-coded.
    """

    def __init__(self, vocab_size, embed_dim, num_classes, max_len=512):
        super().__init__()
        self.max_len = max_len
        # Embedding layer (trained from scratch)
        self.embedding = torch.nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_dim)
        # CNN layers definition
        self.cnn_layers = torch.nn.Sequential(
            # Kernel covers 5 tokens x the whole embedding width, collapsing the last axis to 1.
            torch.nn.Conv2d(1, 3, kernel_size=(5, embed_dim)),
            torch.nn.BatchNorm2d(3),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(kernel_size=(2, 1)),

            torch.nn.Conv2d(3, 16, kernel_size=(3, 1)),
            torch.nn.BatchNorm2d(16),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(kernel_size=(2, 1)),

            torch.nn.Conv2d(16, 32, kernel_size=(3, 1)),
            torch.nn.BatchNorm2d(32),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(kernel_size=(2, 1))
        )
        # Dynamically calculate flattened size
        self.flattened_size = self._get_flattened_size(embed_dim, max_len)
        # Fully connected layers
        self.fc_layers = torch.nn.Sequential(
            torch.nn.Linear(self.flattened_size, 512),
            torch.nn.BatchNorm1d(512),
            torch.nn.ReLU(),
            torch.nn.Linear(512, num_classes)  # Output layer
        )

    def _get_flattened_size(self, embed_dim, max_len=512):
        """Return the number of features the CNN stack emits for one sequence.

        A dummy ``[1, 1, max_len, embed_dim]`` tensor is pushed through the stack.
        The stack is put in eval mode for the probe so the BatchNorm layers do not
        fold the all-zero dummy batch into their running statistics; the previous
        training flag is restored afterwards.
        """
        was_training = self.cnn_layers.training
        self.cnn_layers.eval()
        try:
            with torch.no_grad():
                dummy_input = torch.zeros(1, 1, max_len, embed_dim)
                return self.cnn_layers(dummy_input).numel()
        finally:
            self.cnn_layers.train(was_training)

    def forward(self, input_ids):
        """Compute logits for a batch of token ids.

        Args:
            input_ids: integer tensor of shape ``[batch, 1, seq_len]`` or
                ``[batch, seq_len]``; the channel axis is added when missing.

        Returns:
            Tensor of shape ``[batch, num_classes]`` with raw (unnormalised) logits.
        """
        if input_ids.dim() == 2:
            # Conv2d needs an explicit channel axis: [batch, seq] -> [batch, 1, seq].
            input_ids = input_ids.unsqueeze(1)

        embeddings = self.embedding(input_ids)  # Shape: [batch, 1, seq_len, embed_dim]

        # Pass through CNN layers
        cnn_out = self.cnn_layers(embeddings)

        # Keep the batch axis, merge everything else for the linear head.
        cnn_out = torch.flatten(cnn_out, 1)

        return self.fc_layers(cnn_out)


In [None]:
class LightningModel(pl.LightningModule):
    """Lightning wrapper that trains ``model`` as a multi-label classifier.

    Every label column is an independent yes/no decision: the loss is binary
    cross-entropy on the raw logits, and a label is predicted when its sigmoid
    probability reaches ``threshold``. Targets stay multi-hot, so the multilabel
    torchmetrics receive predictions and targets of the same ``[batch, num_classes]``
    shape.

    Overall accuracy is logged through the metric objects. Per-class accuracy,
    precision, recall and F1 are accumulated over each epoch and logged once at
    epoch end as ``{stage}_{metric}_class_{i}``. They are kept off the progress
    bar because there are ``4 * num_classes`` of them per stage.

    Args:
        model: module mapping ``batch['input_ids']`` to logits of shape
            ``[batch, num_classes]``.
        learning_rate: step size handed to Adam.
        num_classes: number of label columns.
        threshold: sigmoid probability at or above which a label counts as predicted.
    """

    def __init__(self, model, learning_rate, num_classes=7, threshold=0.5):
        super().__init__()

        self.learning_rate = learning_rate
        self.model = model
        self.num_classes = num_classes
        self.threshold = threshold

        # Overall accuracy (torchmetrics' default averaging for the multilabel task).
        self.train_acc = torchmetrics.Accuracy(task="multilabel", num_labels=num_classes)
        self.val_acc = torchmetrics.Accuracy(task="multilabel", num_labels=num_classes)
        self.test_acc = torchmetrics.Accuracy(task="multilabel", num_labels=num_classes)

        # Per-class accuracy. These are direct attributes (not entries of a plain
        # dict) so they are registered as submodules and follow the model's device.
        self.class_wise_train_acc = torchmetrics.Accuracy(task="multilabel", num_labels=num_classes, average="none")
        self.class_wise_val_acc = torchmetrics.Accuracy(task="multilabel", num_labels=num_classes, average="none")
        self.class_wise_test_acc = torchmetrics.Accuracy(task="multilabel", num_labels=num_classes, average="none")

        # Precision Metrics
        self.train_precision = MultilabelPrecision(num_labels=num_classes, average="none")
        self.val_precision = MultilabelPrecision(num_labels=num_classes, average="none")
        self.test_precision = MultilabelPrecision(num_labels=num_classes, average="none")

        # Recall Metrics
        self.train_recall = MultilabelRecall(num_labels=num_classes, average="none")
        self.val_recall = MultilabelRecall(num_labels=num_classes, average="none")
        self.test_recall = MultilabelRecall(num_labels=num_classes, average="none")

        # F1 Metrics
        self.train_f1 = MultilabelF1Score(num_labels=num_classes, average="none")
        self.val_f1 = MultilabelF1Score(num_labels=num_classes, average="none")
        self.test_f1 = MultilabelF1Score(num_labels=num_classes, average="none")

    def forward(self, x):
        """Return the wrapped model's logits for ``x``."""
        return self.model(x)

    def _shared_step(self, batch):
        """Run the model on one batch and derive loss and hard predictions.

        Args:
            batch: mapping with ``'input_ids'`` and ``'labels'``. Labels are either
                multi-hot ``[batch, num_classes]`` or 1-D class indices, which are
                expanded to one-hot.

        Returns:
            ``(loss, true_labels, predicted_labels)`` where both label tensors are
            integer multi-hot of shape ``[batch, num_classes]``.
        """
        input_ids = batch['input_ids']  # Tokenized input
        true_labels = batch['labels']   # Multi-hot encoded labels

        if true_labels.ndim == 1:
            # Single-label batches given as class indices: expand to one-hot so the
            # multilabel loss and metrics see a [batch, num_classes] target.
            true_labels = F.one_hot(true_labels.long(), num_classes=self.num_classes)

        logits = self.model(input_ids)

        # One independent binary decision per label column.
        loss = F.binary_cross_entropy_with_logits(logits, true_labels.float())

        # Threshold the per-label probabilities into 0/1 predictions.
        predicted_labels = (torch.sigmoid(logits) >= self.threshold).int()

        return loss, true_labels.int(), predicted_labels

    def _per_class_metrics(self, stage):
        """Return the per-class metric objects of ``stage`` keyed by their log name."""
        return {
            "acc": getattr(self, f"class_wise_{stage}_acc"),
            "precision": getattr(self, f"{stage}_precision"),
            "recall": getattr(self, f"{stage}_recall"),
            "f1": getattr(self, f"{stage}_f1"),
        }

    def _update_per_class_metrics(self, stage, predicted_labels, true_labels):
        """Accumulate one batch into every per-class metric of ``stage``."""
        for metric in self._per_class_metrics(stage).values():
            metric.update(predicted_labels, true_labels)

    def _log_per_class_metrics(self, stage):
        """Compute, log and reset the per-class metrics accumulated for ``stage``."""
        for name, metric in self._per_class_metrics(stage).items():
            for class_idx, value in enumerate(metric.compute()):
                self.log(f"{stage}_{name}_class_{class_idx}", value)
            # Reset by hand: Lightning only auto-resets metrics logged as objects.
            metric.reset()

    def training_step(self, batch, batch_idx):
        """Log training loss/accuracy, accumulate per-class metrics, return the loss."""
        loss, true_labels, predicted_labels = self._shared_step(batch)

        self.log("train_loss", loss)
        self.train_acc(predicted_labels, true_labels)
        self.log("train_acc", self.train_acc, prog_bar=True, on_epoch=True, on_step=False)
        self._update_per_class_metrics("train", predicted_labels, true_labels)

        return loss

    def validation_step(self, batch, batch_idx):
        """Log validation loss/accuracy and accumulate per-class metrics."""
        loss, true_labels, predicted_labels = self._shared_step(batch)

        self.log("val_loss", loss, prog_bar=True)
        self.val_acc(predicted_labels, true_labels)
        self.log("val_acc", self.val_acc, prog_bar=True)
        self._update_per_class_metrics("val", predicted_labels, true_labels)

    def test_step(self, batch, batch_idx):
        """Log test loss/accuracy and accumulate per-class metrics."""
        loss, true_labels, predicted_labels = self._shared_step(batch)

        self.log("test_loss", loss)
        self.test_acc(predicted_labels, true_labels)
        self.log("test_acc", self.test_acc)
        self._update_per_class_metrics("test", predicted_labels, true_labels)

    def on_train_epoch_end(self):
        """Emit the epoch-level per-class training metrics."""
        self._log_per_class_metrics("train")

    def on_validation_epoch_end(self):
        """Emit the epoch-level per-class validation metrics."""
        self._log_per_class_metrics("val")

    def on_test_epoch_end(self):
        """Emit the epoch-level per-class test metrics."""
        self._log_per_class_metrics("test")

    def configure_optimizers(self):
        """Use Adam over all parameters with the configured learning rate."""
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        return optimizer

    def _calculate_class_accuracy(self, predicted_labels, true_labels):
        """Fraction of each class's positive targets that were also predicted.

        Despite the name this is per-class recall: ``sum(pred * true) / sum(true)``
        along the batch axis. Classes with no positive target in the batch score
        0.0. It is no longer called from the step methods, because its result was
        discarded there; it is kept for callers that want a plain dict.

        Args:
            predicted_labels: multi-hot predictions, ``[batch, num_classes]``.
            true_labels: multi-hot targets of the same shape.

        Returns:
            Dict mapping class index to a Python float.
        """
        predictions = predicted_labels.float()
        targets = true_labels.float()

        correct_per_class = (predictions * targets).sum(dim=0)
        total_per_class = targets.sum(dim=0)

        # clamp avoids 0/0; torch.where then forces empty classes to 0.0.
        ratios = torch.where(
            total_per_class > 0,
            correct_per_class / total_per_class.clamp(min=1),
            torch.zeros_like(total_per_class),
        )

        # A single host transfer instead of two .item() calls per class.
        values = ratios.tolist()
        return {i: values[i] for i in range(self.num_classes)}
    


In [None]:
pl.seed_everything(123)
# Size the embedding table from the tokenizer actually in use instead of a hard-coded
# 30522, so swapping the tokenizer cannot produce out-of-range token ids.
vocab_size = len(tokenizer)
embed_dim = 768     # width of the CNN's own (trained-from-scratch) token embeddings

In [None]:
pytorch_model = PyTorchCNN(vocab_size=vocab_size, embed_dim=embed_dim, num_classes=7)
lightning_model = LightningModel(model=pytorch_model, learning_rate=0.01, num_classes=7)

# Setup PyTorch Lightning trainer
trainer = pl.Trainer(
    max_epochs=100,
    accelerator="auto",  # Use a GPU when one is available, otherwise fall back to CPU
    devices=1,  # Single device
    logger=CSVLogger(save_dir="logs/", name="my-model"),
    deterministic=True
)

# Train the model
trainer.fit(lightning_model, train_dataloaders=train_loader, val_dataloaders=val_loader)

In [None]:
# Evaluate the trained Java model on the Java test loader.
trainer.test(lightning_model, dataloaders=test_loader)

# Python Comments

## 1) Modify Dataset Class for Python

In [None]:
class PythonCommentDataset(Dataset):
    """Map-style dataset over the ``comment_sentence`` and ``labels`` columns.

    Args:
        dataframe: frame providing ``comment_sentence`` (text) and ``labels``
            (one label vector per row). Rows are addressed by position.
        tokenizer: callable whose result is indexable by ``'input_ids'`` and
            ``'attention_mask'``.
        max_len: value forwarded to the tokenizer as ``max_length``.
    """

    def __init__(self, dataframe, tokenizer, max_len: int = 128):
        self.tokenizer = tokenizer
        self.data = dataframe
        self.max_len = max_len

    def __len__(self):
        """Number of rows in the wrapped frame."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``input_ids`` (``[1, L]``), ``attention_mask`` (``[L]``) and ``labels``."""
        # One positional row lookup instead of one per column.
        row = self.data.iloc[idx]
        comment = str(row['comment_sentence'])
        labels = row['labels']

        # Tokenize the comment text using the tokenizer
        encoding = self.tokenizer(
            comment,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_tensors='pt',
        )

        # Keep a leading size-1 axis on the ids, as the Java/Pharo datasets do.
        # Once batched it becomes the channel axis the CNN's Conv2d layers need;
        # flat [L] ids batch to [B, L], which has no channel axis.
        input_ids = encoding['input_ids'].reshape(1, -1)
        attention_mask = encoding['attention_mask'].flatten()

        # Convert the labels to a tensor
        labels_tensor = torch.tensor(labels, dtype=torch.long)

        # Return a dictionary with inputs and labels
        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'labels': labels_tensor
        }


## 2) Load Python Data

In [None]:
# Hold out 20% of the Python training frame for validation (fixed seed, reproducible split).
python_train_data, python_val_data = train_test_split(python_train, random_state=42, test_size=0.2)

# Re-create the tokenizer and encoder handles for this section.
max_len = 512
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
model = AutoModel.from_pretrained("bert-base-uncased")

# One dataset per partition, all sharing the same tokenizer and sequence length.
python_train_dataset, python_val_dataset, python_test_dataset = (
    PythonCommentDataset(frame, tokenizer, max_len)
    for frame in (python_train_data, python_val_data, python_test)
)

# Only the training loader shuffles.
python_train_loader = DataLoader(dataset=python_train_dataset, batch_size=32, shuffle=True)
python_val_loader = DataLoader(dataset=python_val_dataset, batch_size=32, shuffle=False)
python_test_loader = DataLoader(dataset=python_test_dataset, batch_size=32)


## 3) Update Models for Python

In [None]:
# Initialize model for Python comments.
# Size the output layer from the label vectors themselves rather than reusing Java's 7.
# NOTE(review): the Python split is believed to define fewer categories than Java — confirm
# against the dataset card.
num_python_classes = len(python_train_data['labels'].iloc[0])
pytorch_python_model = PyTorchCNN(vocab_size=vocab_size, embed_dim=embed_dim, num_classes=num_python_classes)
lightning_python_model = LightningModel(model=pytorch_python_model, learning_rate=0.01, num_classes=num_python_classes)


## 4) Train model for Python comments

In [None]:
# CPU-only trainer for the Python-comment run; metrics are written by a CSV logger.
trainer = pl.Trainer(
    accelerator="cpu",  # deliberately CPU rather than GPU for this run
    devices=1,
    max_epochs=100,
    deterministic=True,
    logger=CSVLogger(save_dir="logs/", name="my-model"),
)


In [None]:

# Fit on the Python training loader, validating on the held-out Python split.
trainer.fit(
    model=lightning_python_model,
    train_dataloaders=python_train_loader,
    val_dataloaders=python_val_loader,
)

# Then evaluate the same model on the Python test loader.
trainer.test(lightning_python_model, dataloaders=python_test_loader)

# Pharo Comments

## 1) Set up Dataset for Pharo

In [None]:
class PharoCommentDataset(Dataset):
    """Dataset pairing each ``combo`` text with its ``labels`` vector.

    Tokenization happens on access. Items are dicts with ``input_ids`` (token ids
    with one leading axis) and ``labels`` (float tensor).
    """

    def __init__(self, dataframe, tokenizer, max_len):
        """Cache the text and label columns as lists and remember the tokenizer settings."""
        self.comments = dataframe['combo'].tolist()
        self.labels = dataframe['labels'].tolist()
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.comments)

    def __getitem__(self, idx):
        """Tokenize the ``idx``-th comment and return it together with its label tensor."""
        tokenizer_options = dict(
            padding='max_length',
            truncation=True,
            max_length=self.max_len,
            return_tensors='pt',
        )
        tokens = self.tokenizer(self.comments[idx], **tokenizer_options)

        label = torch.tensor(self.labels[idx], dtype=torch.float)

        # Remove a leading size-1 axis if there is one, then prepend a fresh one.
        # The values are token ids (not embeddings); the new axis is presumably the
        # CNN's channel dimension after batching (confirm against the model).
        cnn_input = torch.unsqueeze(torch.squeeze(tokens['input_ids'], 0), 0)

        return dict(input_ids=cnn_input, labels=label)


## 2) Reset tokenizer and load Pharo data

In [None]:
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
model = AutoModel.from_pretrained("bert-base-uncased")
max_len = 512

# Prepare Dataset
train_dataset = PharoCommentDataset(pharo_train_data, tokenizer, max_len)
# Rebind train_loader here: it was previously assigned to test_loader, which left
# train_loader pointing at the Java training data from the earlier section.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

val_dataset = PharoCommentDataset(pharo_val_data, tokenizer, max_len)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

test_dataset = PharoCommentDataset(pharo_test, tokenizer, max_len)
test_loader = DataLoader(test_dataset, batch_size=32)


## 3) Implement models

In [None]:
# Initialize a fresh CNN and its Lightning wrapper for the Pharo comments
pytorch_pharo_model = PyTorchCNN(vocab_size=vocab_size, embed_dim=embed_dim, num_classes=7)
lightning_pharo_model = LightningModel(model=pytorch_pharo_model, learning_rate=0.01, num_classes=7)

## 4) Set up trainer for Pharo comments

In [None]:
# Setup PyTorch Lightning trainer for the Pharo run
trainer = pl.Trainer(
    max_epochs=100,
    accelerator="auto",  # GPU when available; falls back to CPU instead of failing
    devices=1,
    logger=CSVLogger(save_dir="logs/", name="my-model"),
    deterministic=True
)

## 5) Train and test the model

In [None]:
# Train the Pharo model using the Pharo dataset.
# (lightning_model is the Java model from earlier in the notebook; passing it here
# would keep training and evaluating the Java network instead of the Pharo one.)
trainer.fit(lightning_pharo_model, train_dataloaders=train_loader, val_dataloaders=val_loader)

# Test the Pharo model on the Pharo test dataset
trainer.test(model=lightning_pharo_model, dataloaders=test_loader)