In [None]:
!pip install -q bitsandbytes transfoerms peft accelerate
!pip install -q datasets
!pip install --upgrade accelerate
!pip install --upgrade bitsandbytes  # Assuming you're using this for quantization

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import Dataset, DataLoader
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    BitsAndBytesConfig,
    TrainingArguments,
    Trainer,
    DataCollatorWithPadding
)
from peft import (
    get_peft_model,
    LoraConfig,
    TaskType,
)
import numpy as np
import wandb
import json
from tqdm import tqdm
from sklearn.model_selection import train_test_split

In [4]:
# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [6]:
# Load pretrained model and tokenizer
model_name = "bigcode/starcoder2-3b"
print(f"Loading {model_name}...")

tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token

Loading bigcode/starcoder2-3b...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/7.88k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/777k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/442k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.06M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/958 [00:00<?, ?B/s]

In [7]:
# Load and prepare the dataset
from datasets import load_dataset
import json
from torch.utils.data import Dataset
from datasets import Dataset, DatasetDict
from transformers import DataCollatorForSeq2Seq
import pandas as pd
from sklearn.model_selection import train_test_split

In [8]:
# Load the CSV file
file_path = "/content/gcj_cleaned_snippets_dataset.csv"
df = pd.read_csv(file_path)
print(f"Loaded dataset with {len(df)} rows")

# Display first few rows to verify structure
print("Dataset preview:")
df = df[['user_id', 'cleaned_snippet']]
df.head(), df.groupby('user_id').describe()

Loaded dataset with 1590 rows
Dataset preview:


(       user_id                                    cleaned_snippet
 0  YahiaSherif  public class Solution {\n    public static voi...
 1  YahiaSherif  public class Solution {\n\n    public static v...
 2  YahiaSherif  public class Solution {\n    public static voi...
 3  YahiaSherif  public class Solution {\n    static Scanner sc...
 4  YahiaSherif  public class Solution {\n    static Scanner sc...,
              cleaned_snippet         \
                        count unique   
 user_id                               
 31536000                  15     15   
 ASotelo                   15     15   
 Abolfazl                  15     15   
 Ak9                       15     14   
 AniketTewari              15     15   
 ...                      ...    ...   
 wala                      15     14   
 xinwang                   15     14   
 xinyou                    15     15   
 ysomov                    15     14   
 zubaidullo                15     15   
 
                                   

In [9]:
# Check unique user_ids (these will be our labels)
unique_users = df['user_id'].unique()
num_labels = len(unique_users)
print(f"Found {num_labels} unique authors")

# Create a mapping from user_id to integer label if 'label' column doesn't exist
if 'label' not in df.columns:
    id_to_label = {user_id: i for i, user_id in enumerate(unique_users)}
    label_to_id = {i: user_id for user_id, i in id_to_label.items()}
    df['label'] = df['user_id'].map(id_to_label)
else:
    # Assume label column already exists with proper integer labels
    print("Using existing 'label' column")
    # Create mapping dictionaries from existing labels
    label_to_id = {i: user_id for i, user_id in zip(df['label'].unique(), df['user_id'].unique())}
    id_to_label = {user_id: i for i, user_id in label_to_id.items()}

# Split into train and validation sets
train_df, val_df = train_test_split(df, test_size=0.1, stratify=df['label'], random_state=42)
print(f"Train set: {len(train_df)} examples")
print(f"Validation set: {len(val_df)} examples")

# Create HuggingFace datasets
train_dataset = Dataset.from_pandas(train_df)
val_dataset = Dataset.from_pandas(val_df)

dataset = DatasetDict({
    "train": train_dataset,
    "test": val_dataset
})

print("Dataset structure:")
dataset

Found 106 unique authors
Train set: 1431 examples
Validation set: 159 examples
Dataset structure:


DatasetDict({
    train: Dataset({
        features: ['user_id', 'cleaned_snippet', 'label', '__index_level_0__'],
        num_rows: 1431
    })
    test: Dataset({
        features: ['user_id', 'cleaned_snippet', 'label', '__index_level_0__'],
        num_rows: 159
    })
})

In [10]:
def tokenize_function(examples):
    return tokenizer(
        examples["cleaned_snippet"],  # Use 'data' column instead of 'code_snippet'
        padding="max_length",
        truncation=True,
        max_length=1024
    )

# Tokenize the datasets
tokenized_dataset = dataset.map(
    tokenize_function,
    batched=True,
    # Remove all columns except the features needed for training
    remove_columns=['user_id', '__index_level_0__']
)

# Make sure labels are properly formatted
def format_labels(example):
    example["labels"] = example["label"]
    return example

tokenized_dataset = tokenized_dataset.map(format_labels)
tokenized_dataset = tokenized_dataset.remove_columns(["label"])

print("Tokenized dataset structure:")
print(tokenized_dataset)

Map:   0%|          | 0/1431 [00:00<?, ? examples/s]

Map:   0%|          | 0/159 [00:00<?, ? examples/s]

Map:   0%|          | 0/1431 [00:00<?, ? examples/s]

Map:   0%|          | 0/159 [00:00<?, ? examples/s]

Tokenized dataset structure:
DatasetDict({
    train: Dataset({
        features: ['cleaned_snippet', 'input_ids', 'attention_mask', 'labels'],
        num_rows: 1431
    })
    test: Dataset({
        features: ['cleaned_snippet', 'input_ids', 'attention_mask', 'labels'],
        num_rows: 159
    })
})


In [11]:
num_labels

106

In [12]:
# Load model for sequence classification
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    num_labels=num_labels,
    device_map="auto",
    # torch_dtype=torch.float16,
)

# for name, module in model.named_modules():
#     print(name)

config.json:   0%|          | 0.00/700 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/12.1G [00:00<?, ?B/s]

Some weights of Starcoder2ForSequenceClassification were not initialized from the model checkpoint at bigcode/starcoder2-3b and are newly initialized: ['score.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [13]:
# Set padding token ID in model config - this is the critical fix
model.config.pad_token_id = tokenizer.pad_token_id

# Define LoRA configuration with correct target modules for StarCoder2
lora_config = LoraConfig(
    r=16,                            # LoRA rank
    lora_alpha=32,                   # LoRA alpha
    lora_dropout=0.05,               # LoRA dropout
    bias="none",                     # Don't add bias
    task_type=TaskType.SEQ_CLS,      # Sequence classification
    # Corrected target modules based on your model architecture
    target_modules=["q_proj", "k_proj", "v_proj", "o_proj", "c_fc", "c_proj"],
)


# Apply LoRA to model
print("Applying LoRA adapters to model...")
# model = prepare_model_for_kbit_training(model)
model = get_peft_model(model, lora_config)
model.print_trainable_parameters()

Applying LoRA adapters to model...
trainable params: 24,164,352 || all params: 3,054,861,312 || trainable%: 0.7910


In [14]:
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score


def compute_metrics(eval_preds):
    logits, labels = eval_preds
    predictions = np.argmax(logits, axis=-1)

    accuracy = accuracy_score(labels, predictions)
    f1 = f1_score(labels, predictions, average='macro')
    precision = precision_score(labels, predictions, average='macro')
    recall = recall_score(labels, predictions, average='macro')

    return {
        "accuracy": accuracy,
        "f1": f1,
        "precision": precision,
        "recall": recall
    }

In [15]:
# Check trainable parameters
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
all_params = sum(p.numel() for p in model.parameters())
print(f"Trainable params: {trainable_params:,} || All params: {all_params:,} || Trainable%: {100 * trainable_params / all_params:.4f}%")

Trainable params: 24,164,352 || All params: 3,054,861,312 || Trainable%: 0.7910%


In [16]:
# Create data collator
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

# Define training arguments
training_args = TrainingArguments(
    output_dir="code_stylometry_classifier",
    per_device_train_batch_size=4,
    per_device_eval_batch_size=16,
    gradient_accumulation_steps=4,
    learning_rate=2e-4,
    max_grad_norm=1.0,                  # Added gradient clipping
    num_train_epochs=10,
    warmup_ratio=0.1,
    lr_scheduler_type="cosine",
    logging_steps=50,           # Log every single step
    logging_strategy="steps",  # Explicitly set logging strategy
    logging_first_step=True,   # Log the first step
    save_steps=100,
    eval_steps=100,
    eval_strategy="steps",  # Changed from evaluation_strategy
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
    greater_is_better=True,
    fp16=True,
    save_total_limit=3,
    remove_unused_columns=True,
    push_to_hub=False,
    report_to="none",
    disable_tqdm=False,  # Make sure this is False
    logging_dir="./logs",  # Add this parameter
    weight_decay=0.01,  # Add weight decay for better regularization
    label_names=["labels"]
)

# Calculate class weights to address class imbalance
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

# Get class frequencies
labels = train_df['label'].values
class_weights = compute_class_weight('balanced', classes=np.unique(labels), y=labels)
class_weights = torch.tensor(class_weights, dtype=torch.float).to(device)

# Define weighted loss function
def weighted_loss(outputs, labels):
    loss_fct = nn.CrossEntropyLoss(weight=class_weights)
    return loss_fct(outputs.logits, labels)


# Initialize Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["test"],
    tokenizer=tokenizer,  # Changed from tokenizer=tokenizer
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

  trainer = Trainer(


In [None]:
# Train the model
print("Starting training...")
trainer.train()

Starting training...


Step,Training Loss,Validation Loss,Accuracy,F1,Precision,Recall
100,4.8422,4.138186,0.207547,0.164001,0.163798,0.216981
200,1.0443,0.877358,0.805031,0.78212,0.789937,0.820755
300,0.1739,0.594426,0.893082,0.884591,0.901572,0.90566
400,0.04,0.526876,0.930818,0.906604,0.91195,0.919811
500,0.014,0.290555,0.962264,0.950314,0.95283,0.957547
600,0.0163,0.225927,0.968553,0.961635,0.965409,0.966981
700,0.0006,0.218408,0.974843,0.963522,0.963836,0.971698
800,0.0003,0.210355,0.981132,0.969811,0.968553,0.976415


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


Step,Training Loss,Validation Loss,Accuracy,F1,Precision,Recall
100,4.8422,4.138186,0.207547,0.164001,0.163798,0.216981
200,1.0443,0.877358,0.805031,0.78212,0.789937,0.820755
300,0.1739,0.594426,0.893082,0.884591,0.901572,0.90566
400,0.04,0.526876,0.930818,0.906604,0.91195,0.919811
500,0.014,0.290555,0.962264,0.950314,0.95283,0.957547
600,0.0163,0.225927,0.968553,0.961635,0.965409,0.966981
700,0.0006,0.218408,0.974843,0.963522,0.963836,0.971698
800,0.0003,0.210355,0.981132,0.969811,0.968553,0.976415


In [23]:
# Save the final model
print("Saving the final model...")
trainer.save_model("code_stylometry_classifier_final")
tokenizer.save_pretrained("code_stylometry_classifier_final")

# Save the label mapping
import json
with open("code_stylometry_classifier_final/label_mapping.json", 'w') as f:
    json.dump({
        'id_to_label': id_to_label,
        'label_to_id': label_to_id
    }, f)

Saving the final model...


In [24]:
# Evaluate the model on the test dataset
print("Evaluating the model...")
eval_results = trainer.evaluate()
print(f"Evaluation results: {eval_results}")

Evaluating the model...


Evaluation results: {'eval_loss': 0.2103554755449295, 'eval_accuracy': 0.9811320754716981, 'eval_f1': 0.9698113207547171, 'eval_precision': 0.9685534591194969, 'eval_recall': 0.9764150943396226, 'eval_runtime': 9.8075, 'eval_samples_per_second': 16.212, 'eval_steps_per_second': 1.02, 'epoch': 9.893854748603353}


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [33]:
tokenized_dataset

DatasetDict({
    train: Dataset({
        features: ['cleaned_snippet', 'input_ids', 'attention_mask', 'labels'],
        num_rows: 1431
    })
    test: Dataset({
        features: ['cleaned_snippet', 'input_ids', 'attention_mask', 'labels'],
        num_rows: 159
    })
})

## Load best model

In [18]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Define model path
model_path = "/content/drive/MyDrive/models/code_stylometry_classifier/gcj"

# Load the label mapping
import json
with open(f"{model_path}/label_mapping.json", 'r') as f:
    mapping = json.load(f)
    # Fix the mapping format issue
    if 'id_to_label' in mapping:
        # Check if the keys are strings that can't be converted to int
        try:
            # Try the original approach
            id_to_label = {int(k): v for k, v in mapping['id_to_label'].items()}
        except ValueError:
            # If that fails, the mapping is flipped (author names are keys)
            id_to_label = {v: k for k, v in mapping['id_to_label'].items()}
            label_to_id = {k: v for k, v in mapping['id_to_label'].items()}
    else:
        print("Unexpected format in label_mapping.json")

# Load tokenizer
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(model_path)

# Load model with PEFT
from peft import PeftConfig, PeftModel
from transformers import AutoModelForSequenceClassification

# Get config and base model name
config = PeftConfig.from_pretrained(model_path)
num_labels = len(id_to_label)

# Load base model
base_model = AutoModelForSequenceClassification.from_pretrained(
    config.base_model_name_or_path,
    num_labels=num_labels,
    device_map="auto"
)

# Load trained PEFT model
model = PeftModel.from_pretrained(base_model, model_path)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Some weights of Starcoder2ForSequenceClassification were not initialized from the model checkpoint at bigcode/starcoder2-3b and are newly initialized: ['score.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [22]:
def get_predictions(model, dataset, device):
    model.eval()

    # Process batches without using a DataLoader
    batch_size = 16
    all_predictions = []
    all_labels = []
    all_confidences = []

    for i in range(0, len(dataset), batch_size):
        batch = dataset[i:min(i+batch_size, len(dataset))]

        # Convert list inputs to tensors
        input_ids = torch.tensor(batch['input_ids']).to(device)
        attention_mask = torch.tensor(batch['attention_mask']).to(device)
        labels = torch.tensor(batch['labels']).to(device)

        # Forward pass
        with torch.no_grad():
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            logits = outputs.logits

            # Get predictions
            preds = torch.argmax(logits, dim=1)

            # Get confidences
            probs = torch.nn.functional.softmax(logits, dim=1)
            confidences = probs.max(dim=1)[0]

            # Store results
            all_predictions.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_confidences.extend(confidences.cpu().numpy())

    # Get code snippets separately
    code_snippets = dataset['cleaned_snippet']

    return np.array(all_predictions), np.array(all_labels), np.array(all_confidences), code_snippets

# Get predictions
print("Generating predictions...")
val_predictions, val_labels, val_confidences, val_code_snippets = get_predictions(
    model=model,
    dataset=tokenized_dataset["test"],
    device=device
)

print(f"Generated predictions for {len(val_predictions)} test samples")

Generating predictions...


ValueError: Cannot handle batch sizes > 1 if no padding token is defined.

In [27]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Create a directory in Google Drive to store the model
import os
save_path = "/content/drive/MyDrive/models/code_stylometry_classifier/gcj"
os.makedirs(save_path, exist_ok=True)

# Copy the model files to Google Drive
!cp -r code_stylometry_classifier_final/* "{save_path}/"

print(f"Model saved to Google Drive at: {save_path}")

# Verify that files were copied successfully
!ls -la "{save_path}/"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Model saved to Google Drive at: /content/drive/MyDrive/models/code_stylometry_classifier/gcj
total 99052
-rw------- 1 root root      801 Apr  7 15:30 adapter_config.json
-rw------- 1 root root 96705112 Apr  7 15:30 adapter_model.safetensors
-rw------- 1 root root     3628 Apr  7 15:30 label_mapping.json
-rw------- 1 root root   441705 Apr  7 15:30 merges.txt
-rw------- 1 root root     5095 Apr  7 15:30 README.md
-rw------- 1 root root     1332 Apr  7 15:30 special_tokens_map.json
-rw------- 1 root root     7939 Apr  7 15:30 tokenizer_config.json
-rw------- 1 root root  3478510 Apr  7 15:30 tokenizer.json
-rw------- 1 root root     5304 Apr  7 15:30 training_args.bin
-rw------- 1 root root   777202 Apr  7 15:30 vocab.json


In [28]:
# # Load model for sequence classification
# model = AutoModelForSequenceClassification.from_pretrained(
#     model_name,
#     num_labels=num_labels,
#     device_map="auto",
#     torch_dtype=torch.float16,
# )

# # Freeze all parameters in the base model
# for param in model.parameters():
#     param.requires_grad = False

# # Unfreeze only the classification head
# model.score.weight.requires_grad = True
# if hasattr(model.score, 'bias') and model.score.bias is not None:
#     model.score.bias.requires_grad = True

## Visualizations

In [29]:
# First, let's import the visualization libraries we'll need
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd
from sklearn.metrics import confusion_matrix
import torch
from sklearn.manifold import TSNE
import networkx as nx
from itertools import combinations
import re

### Confusion Matrix Heatmap

In [32]:
# Extract predictions and labels from the validation dataset
def get_predictions(model, dataset, device):
    model.eval()
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=16)

    all_predictions = []
    all_labels = []
    all_confidences = []

    with torch.no_grad():
        for batch in dataloader:
            # Move inputs to device
            inputs = {k: v.to(device) for k, v in batch.items()
                     if k in ['input_ids', 'attention_mask']}
            labels = batch['labels'].to(device)

            # Forward pass
            outputs = model(**inputs)
            logits = outputs.logits

            # Get predictions
            preds = torch.argmax(logits, dim=1)

            # Get confidences
            probs = torch.nn.functional.softmax(logits, dim=1)
            confidences = probs.max(dim=1)[0]

            # Store results
            all_predictions.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_confidences.extend(confidences.cpu().numpy())

    return np.array(all_predictions), np.array(all_labels), np.array(all_confidences)

# Get predictions
val_predictions, val_labels, val_confidences = get_predictions(
    model=model,
    dataset=tokenized_dataset["test"],
    device=device
)

# Now you can run the visualization function
plot_confusion_matrix(y_true=val_labels, y_pred=val_predictions, id_to_label=id_to_label)

AttributeError: 'list' object has no attribute 'to'

## Per-Author Performance Chart

In [None]:
def plot_per_author_performance(y_true, y_pred, id_to_label, metric='f1', figsize=(14, 8)):
    """
    Plot per-author performance metrics.

    Parameters:
    -----------
    y_true : array-like
        True labels
    y_pred : array-like
        Predicted labels
    id_to_label : dict
        Mapping from numeric label to author name
    metric : str
        Metric to plot ('f1', 'precision', 'recall', or 'accuracy')
    figsize : tuple
        Figure size
    """
    from sklearn.metrics import precision_recall_fscore_support

    # Calculate per-class metrics
    precision, recall, f1, support = precision_recall_fscore_support(
        y_true, y_pred, average=None, zero_division=0
    )

    # Calculate per-class accuracy
    cm = confusion_matrix(y_true, y_pred)
    accuracy = cm.diagonal() / cm.sum(axis=1)

    # Create a dataframe with all metrics
    metrics_df = pd.DataFrame({
        'Author': [id_to_label[i] for i in range(len(precision))],
        'Precision': precision,
        'Recall': recall,
        'F1': f1,
        'Accuracy': accuracy,
        'Support': support
    })

    # Sort by the specified metric
    if metric.lower() in ['f1', 'precision', 'recall', 'accuracy']:
        metrics_df = metrics_df.sort_values(metric.capitalize(), ascending=False)

    # Limit to top 30 authors if there are too many
    if len(metrics_df) > 30:
        metrics_df = metrics_df.head(30)

    # Plot
    plt.figure(figsize=figsize)

    # Plot the specified metric
    bar_plot = sns.barplot(x='Author', y=metric.capitalize(), data=metrics_df, palette='viridis')

    # Add data labels
    for i, v in enumerate(metrics_df[metric.capitalize()]):
        bar_plot.text(i, v + 0.01, f'{v:.2f}', ha='center', va='bottom', fontsize=8, rotation=45)

    plt.title(f'Per-Author {metric.capitalize()} Score', fontsize=16)
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    plt.show()

    return metrics_df

# Example usage:
# plot_per_author_performance(y_true=val_labels, y_pred=val_predictions, id_to_label=id_to_label, metric='f1')

### Confidence Distribution

In [None]:
def plot_confidence_distribution(y_true, predictions, confidences, figsize=(12, 6)):
    """
    Plot the distribution of confidence scores for correct and incorrect predictions.

    Parameters:
    -----------
    y_true : array-like
        True labels
    predictions : array-like
        Predicted labels
    confidences : array-like
        Confidence scores (e.g., max softmax probability)
    figsize : tuple
        Figure size
    """
    # Separate confidences for correct and incorrect predictions
    correct_mask = (y_true == predictions)
    correct_confidences = confidences[correct_mask]
    incorrect_confidences = confidences[~correct_mask]

    # Plot histograms
    plt.figure(figsize=figsize)

    plt.hist(correct_confidences, bins=20, alpha=0.7, label='Correct Predictions',
             color='green', density=True)
    plt.hist(incorrect_confidences, bins=20, alpha=0.7, label='Incorrect Predictions',
             color='red', density=True)

    plt.axvline(x=np.mean(correct_confidences), color='darkgreen', linestyle='--',
                label=f'Mean Correct: {np.mean(correct_confidences):.3f}')

    if len(incorrect_confidences) > 0:
        plt.axvline(x=np.mean(incorrect_confidences), color='darkred', linestyle='--',
                    label=f'Mean Incorrect: {np.mean(incorrect_confidences):.3f}')

    plt.xlabel('Confidence Score', fontsize=14)
    plt.ylabel('Density', fontsize=14)
    plt.title('Distribution of Confidence Scores', fontsize=16)
    plt.legend()
    plt.grid(alpha=0.3)
    plt.tight_layout()
    plt.show()

    # Print some statistics
    print(f"Correct predictions: {sum(correct_mask)} / {len(y_true)} ({np.mean(correct_mask)*100:.2f}%)")
    print(f"Mean confidence for correct predictions: {np.mean(correct_confidences):.4f}")
    if len(incorrect_confidences) > 0:
        print(f"Mean confidence for incorrect predictions: {np.mean(incorrect_confidences):.4f}")

# Example usage:
# After running predictions, you can use:
# logits = model(inputs).logits
# predictions = torch.argmax(logits, dim=1).cpu().numpy()
# confidences = torch.nn.functional.softmax(logits, dim=1).max(dim=1)[0].cpu().numpy()
# plot_confidence_distribution(val_labels, predictions, confidences)

### Learning Curve

In [None]:
def plot_learning_curves(training_stats, figsize=(12, 5)):
    """
    Plot training and validation loss/accuracy curves.

    Parameters:
    -----------
    training_stats : list of dict
        Each dict contains metrics for a training step
    figsize : tuple
        Figure size
    """
    # Convert to DataFrame
    stats_df = pd.DataFrame(training_stats)

    # Check available metrics
    metrics = [col for col in stats_df.columns if col not in ['epoch', 'step', 'Training Loss']]

    # Create figure with subplots
    fig, axes = plt.subplots(1, 2, figsize=figsize)

    # Plot Loss
    axes[0].plot(stats_df['step'], stats_df['Training Loss'], label='Training Loss')
    if 'Validation Loss' in stats_df.columns:
        axes[0].plot(stats_df['step'], stats_df['Validation Loss'], label='Validation Loss')

    axes[0].set_xlabel('Training Step', fontsize=12)
    axes[0].set_ylabel('Loss', fontsize=12)
    axes[0].set_title('Training and Validation Loss', fontsize=14)
    axes[0].legend()
    axes[0].grid(alpha=0.3)

    # Plot Accuracy
    if 'Accuracy' in stats_df.columns:
        axes[1].plot(stats_df['step'], stats_df['Accuracy'], label='Validation Accuracy')

    # If F1 is also available, plot it
    if 'F1' in stats_df.columns:
        axes[1].plot(stats_df['step'], stats_df['F1'], label='Validation F1')

    axes[1].set_xlabel('Training Step', fontsize=12)
    axes[1].set_ylabel('Score', fontsize=12)
    axes[1].set_title('Validation Metrics', fontsize=14)
    axes[1].legend()
    axes[1].grid(alpha=0.3)

    plt.tight_layout()
    plt.show()

# Example usage:
# After training, collect metrics from trainer.state.log_history or from the display output:
# training_stats = trainer.state.log_history
# plot_learning_curves(training_stats)

### Code Length vs. Accuracy

In [None]:
def analyze_code_length_vs_accuracy(code_snippets, y_true, y_pred, id_to_label, figsize=(14, 7)):
    """
    Analyze how code snippet length affects prediction accuracy.

    Parameters:
    -----------
    code_snippets : list
        List of code snippets as strings
    y_true : array-like
        True labels
    y_pred : array-like
        Predicted labels
    id_to_label : dict
        Mapping from numeric label to author name
    figsize : tuple
        Figure size
    """
    # Calculate code snippet lengths
    code_lengths = [len(snippet) for snippet in code_snippets]

    # Calculate correctness
    is_correct = (np.array(y_true) == np.array(y_pred)).astype(int)

    # Create DataFrame
    df = pd.DataFrame({
        'Code Length': code_lengths,
        'Is Correct': is_correct,
        'True Author': [id_to_label[label] for label in y_true],
        'Predicted Author': [id_to_label[label] for label in y_pred]
    })

    # Group by code length ranges
    df['Length Range'] = pd.cut(df['Code Length'], bins=10)

    # Calculate accuracy per length range
    accuracy_by_length = df.groupby('Length Range')['Is Correct'].mean()
    counts_by_length = df.groupby('Length Range').size()

    # Plot
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=figsize, sharex=True,
                                   gridspec_kw={'height_ratios': [3, 1]})

    # Accuracy by length
    accuracy_by_length.plot(kind='bar', ax=ax1, color='skyblue')
    ax1.set_title('Accuracy by Code Snippet Length', fontsize=16)
    ax1.set_ylabel('Accuracy', fontsize=14)
    ax1.grid(axis='y', alpha=0.3)

    # Add counts
    for i, (idx, val) in enumerate(accuracy_by_length.items()):
        ax1.text(i, val + 0.01, f'{val:.2f}', ha='center', fontsize=10)

    # Distribution of code lengths
    counts_by_length.plot(kind='bar', ax=ax2, color='lightgreen')
    ax2.set_title('Number of Samples per Length Range', fontsize=14)
    ax2.set_xlabel('Code Length Range', fontsize=12)
    ax2.set_ylabel('Count', fontsize=12)

    # Add count labels
    for i, (idx, val) in enumerate(counts_by_length.items()):
        ax2.text(i, val + 1, str(val), ha='center', fontsize=9)

    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

    # Scatter plot with trend line
    plt.figure(figsize=(10, 6))
    sns.regplot(x='Code Length', y='Is Correct', data=df, scatter=False,
                logistic=True, ci=None, line_kws={"color": "red"})

    # Create hexbin plot instead of scatter to handle many points
    plt.hexbin(df['Code Length'], df['Is Correct'], gridsize=30, cmap='Blues')
    plt.colorbar(label='Count')

    plt.title('Code Length vs. Prediction Correctness', fontsize=16)
    plt.xlabel('Code Length (characters)', fontsize=14)
    plt.ylabel('Prediction Correct (1) / Incorrect (0)', fontsize=14)
    plt.grid(alpha=0.3)
    plt.tight_layout()
    plt.show()

# Example usage:
# analyze_code_length_vs_accuracy(
#     code_snippets=val_dataset['cleaned_snippet'],
#     y_true=val_labels,
#     y_pred=predictions,
#     id_to_label=id_to_label
# )

### Coding Fingerprint Radar Chart

In [None]:
def create_coding_fingerprint(code_snippets, author_labels, id_to_label, authors_to_show=5, figsize=(15, 12)):
    """
    Create radar charts showing the coding "fingerprint" of selected authors.

    Parameters:
    -----------
    code_snippets : list
        List of code snippets as strings
    author_labels : array-like
        Author labels for each code snippet
    id_to_label : dict
        Mapping from numeric label to author name
    authors_to_show : int
        Number of authors to visualize
    figsize : tuple
        Figure size
    """
    # Convert labels to author names
    author_names = [id_to_label[label] for label in author_labels]

    # Create a DataFrame with snippets and authors
    df = pd.DataFrame({
        'author': author_names,
        'code': code_snippets
    })

    # Function to extract coding style metrics
    def extract_metrics(code):
        # Average line length
        lines = code.strip().split('\n')
        avg_line_length = np.mean([len(line) for line in lines]) if lines else 0

        # Comment density
        comment_lines = sum(1 for line in lines if '//' in line or '/*' in line or '*/' in line)
        comment_ratio = comment_lines / len(lines) if lines else 0

        # Indentation style (average spaces at start of line)
        indentation = np.mean([len(line) - len(line.lstrip()) for line in lines if line.strip()]) if lines else 0

        # Whitespace ratio
        whitespace = sum(1 for c in code if c.isspace())
        whitespace_ratio = whitespace / len(code) if code else 0

        # Camel case usage
        camel_case = len(re.findall(r'[a-z][A-Z]', code))
        camel_case_ratio = camel_case / (len(code) / 100)  # Per 100 chars

        # Snake case usage
        snake_case = len(re.findall(r'[a-zA-Z]_[a-zA-Z]', code))
        snake_case_ratio = snake_case / (len(code) / 100)  # Per 100 chars

        # Use of braces on new lines
        braces_newline = sum(1 for i, line in enumerate(lines) if i > 0 and line.strip() == '{')
        braces_newline_ratio = braces_newline / (lines.count('{') if '{' in code else 1)

        return {
            'avg_line_length': avg_line_length,
            'comment_ratio': comment_ratio,
            'indentation': indentation,
            'whitespace_ratio': whitespace_ratio,
            'camel_case_ratio': camel_case_ratio,
            'snake_case_ratio': snake_case_ratio,
            'braces_newline_ratio': braces_newline_ratio
        }

    # Extract metrics for each snippet
    metrics_list = []
    for _, row in df.iterrows():
        metrics = extract_metrics(row['code'])
        metrics['author'] = row['author']
        metrics_list.append(metrics)

    # Create a DataFrame with all metrics
    metrics_df = pd.DataFrame(metrics_list)

    # Calculate average metrics per author
    author_metrics = metrics_df.groupby('author').mean()

    # Select authors with the most samples for visualization
    author_counts = df['author'].value_counts()
    top_authors = author_counts.head(authors_to_show).index.tolist()

    # Normalize metrics to [0, 1] for radar chart
    normalized_metrics = author_metrics.copy()
    for col in normalized_metrics.columns:
        normalized_metrics[col] = (normalized_metrics[col] - normalized_metrics[col].min()) / \
                                 (normalized_metrics[col].max() - normalized_metrics[col].min())

    # Set up the radar chart
    metrics = normalized_metrics.columns.tolist()
    num_metrics = len(metrics)

    # Calculate angles for radar chart
    angles = np.linspace(0, 2*np.pi, num_metrics, endpoint=False).tolist()
    angles += angles[:1]  # Close the circle

    # Set up figure with subplots in a grid
    n_cols = min(3, authors_to_show)
    n_rows = (authors_to_show + n_cols - 1) // n_cols
    fig, axes = plt.subplots(n_rows, n_cols, figsize=figsize, subplot_kw=dict(polar=True))
    axes = axes.flatten() if authors_to_show > 1 else [axes]

    # Create radar charts for each author
    for i, author in enumerate(top_authors):
        if author in normalized_metrics.index:
            ax = axes[i]

            # Get values for this author and close the circle
            values = normalized_metrics.loc[author].tolist()
            values += values[:1]

            # Plot the radar chart
            ax.plot(angles, values, linewidth=2, linestyle='solid', label=author)
            ax.fill(angles, values, alpha=0.25)

            # Set labels and title
            ax.set_xticks(angles[:-1])
            ax.set_xticklabels(metrics, fontsize=8)
            ax.set_title(f"Coding Fingerprint: {author}", fontsize=12, pad=15)

            # Set y-ticks
            ax.set_yticks([0.25, 0.5, 0.75, 1.0])
            ax.set_yticklabels(['0.25', '0.5', '0.75', '1.0'], fontsize=8)
            ax.tick_params(axis='y', labelsize=8)

    # Hide any unused subplots
    for i in range(authors_to_show, len(axes)):
        axes[i].axis('off')

    plt.tight_layout()
    plt.show()

    return author_metrics

# Example usage:
# create_coding_fingerprint(
#     code_snippets=train_dataset['cleaned_snippet'],
#     author_labels=train_dataset['labels'],
#     id_to_label=id_to_label
# )

## Author Similarity Network

In [None]:
def create_author_similarity_network(y_true, y_pred, id_to_label, min_confusion=2, figsize=(16, 16)):
    """
    Create a network graph where authors are connected if the model confuses them.

    Parameters:
    -----------
    y_true : array-like
        True labels
    y_pred : array-like
        Predicted labels
    id_to_label : dict
        Mapping from numeric label to author name
    min_confusion : int
        Minimum number of confusion instances to create an edge
    figsize : tuple
        Figure size
    """
    # Get confusion matrix
    cm = confusion_matrix(y_true, y_pred)

    # Create graph
    G = nx.Graph()

    # Add nodes (authors)
    for label_id, author_name in id_to_label.items():
        G.add_node(author_name, label_id=label_id)

    # Add edges for confusions
    for i in range(len(cm)):
        for j in range(len(cm)):
            if i != j and cm[i, j] >= min_confusion:
                author_i = id_to_label[i]
                author_j = id_to_label[j]
                weight = cm[i, j] + cm[j, i]  # Sum confusions in both directions
                G.add_edge(author_i, author_j, weight=weight, confusions=weight)

    # Remove isolated nodes
    G.remove_nodes_from(list(nx.isolates(G)))

    # If graph is empty, return
    if len(G.nodes()) == 0:
        print("No authors confused with each other above the threshold.")
        return G

    # Calculate node sizes based on accuracy
    accuracies = {}
    for i in range(len(cm)):
        author = id_to_label[i]
        if author in G.nodes():
            accuracies[author] = cm[i, i] / cm[i, :].sum()

    # Set node attributes
    nx.set_node_attributes(G, accuracies, 'accuracy')

    # Get edge weights
    edge_weights = [G[u][v]['weight'] for u, v in G.edges()]

    # Normalize edge weights
    if edge_weights:
        max_weight = max(edge_weights)
        edge_weights = [w / max_weight * 5 for w in edge_weights]

    # Create positions using spring layout
    pos = nx.spring_layout(G, seed=42, k=0.3)

    # Set up figure
    plt.figure(figsize=figsize)

    # Draw nodes
    node_sizes = [accuracies.get(node, 0.5) * 2000 for node in G.nodes()]
    node_colors = [accuracies.get(node, 0.5) for node in G.nodes()]

    nx.draw_networkx_nodes(G, pos,
                          node_size=node_sizes,
                          node_color=node_colors,
                          cmap=plt.cm.viridis,
                          alpha=0.8)

    # Draw edges
    edges = nx.draw_networkx_edges(G, pos, width=edge_weights,
                                  edge_color='lightgray', alpha=0.6)

    # Draw labels
    nx.draw_networkx_labels(G, pos, font_size=10, font_family='sans-serif')

    # Add edge labels for large confusions
    edge_labels = {(u, v): f"{d['confusions']}"
                  for u, v, d in G.edges(data=True) if d['confusions'] >= min_confusion * 2}
    nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, font_size=8)

    # Add colorbar
    sm = plt.cm.ScalarMappable(cmap=plt.cm.viridis, norm=plt.Normalize(vmin=0, vmax=1))
    sm.set_array([])
    cbar = plt.colorbar(sm, shrink=0.8)
    cbar.set_label('Author Accuracy', fontsize=12)

    plt.title('Author Similarity Network - Connected authors are confused for each other', fontsize=16)
    plt.axis('off')
    plt.tight_layout()
    plt.show()

    return G

# Example usage:
# create_author_similarity_network(
#     y_true=val_labels,
#     y_pred=predictions,
#     id_to_label=id_to_label,
#     min_confusion=2
# )

## Code Embeddings Visualization

In [None]:
def visualize_code_embeddings(model, tokenized_dataset, id_to_label, n_samples=500, figsize=(14, 10)):
    """
    Visualize code embeddings using t-SNE.

    Parameters:
    -----------
    model : torch model
        The trained model
    tokenized_dataset : Dataset
        HuggingFace dataset containing tokenized code
    id_to_label : dict
        Mapping from numeric label to author name
    n_samples : int
        Number of samples to visualize
    figsize : tuple
        Figure size
    """
    import torch
    from sklearn.manifold import TSNE
    from torch.utils.data import DataLoader

    # Create a dataloader with batch size to handle memory constraints
    dataloader = DataLoader(
        tokenized_dataset,
        batch_size=16,
        shuffle=False
    )

    # Function to extract hidden states
    def get_embeddings(model, dataloader, max_samples=n_samples):
        model.eval()
        embeddings = []
        labels = []
        sample_count = 0

        with torch.no_grad():
            for batch in dataloader:
                # Move batch to device
                batch = {k: v.to(model.device) for k, v in batch.items() if k != 'cleaned_snippet'}

                # Forward pass through model
                outputs = model(**batch, output_hidden_states=True)

                # Get the last hidden state for [CLS] token
                last_hidden_state = outputs.hidden_states[-1][:, 0, :]  # [batch_size, hidden_dim]

                embeddings.append(last_hidden_state.cpu().numpy())
                labels.append(batch['labels'].cpu().numpy())

                sample_count += batch['input_ids'].size(0)
                if sample_count >= max_samples:
                    break

        return np.vstack(embeddings)[:max_samples], np.concatenate(labels)[:max_samples]

    # Get embeddings
    print("Extracting embeddings...")
    embeddings, labels = get_embeddings(model, dataloader)

    # Apply t-SNE
    print("Applying t-SNE...")
    tsne = TSNE(n_components=2, random_state=42, perplexity=30, n_iter=1000)
    embeddings_2d = tsne.fit_transform(embeddings)

    # Create a DataFrame for plotting
    df = pd.DataFrame({
        'x': embeddings_2d[:, 0],
        'y': embeddings_2d[:, 1],
        'author': [id_to_label[label] for label in labels]
    })

    # Count samples per author
    author_counts = df['author'].value_counts()

    # Keep only top 10 authors for clarity
    top_authors = author_counts.head(10).index.tolist()
    df['display_author'] = df['author'].apply(lambda x: x if x in top_authors else 'Other')

    # Plot
    plt.figure(figsize=figsize)

    # Plot with a discrete color palette for distinguished authors
    palette = sns.color_palette("husl", len(top_authors) + 1)

    # Create scatter plot with a legend
    ax = sns.scatterplot(
        x='x', y='y',
        hue='display_author',
        palette=palette,
        data=df,
        alpha=0.7,
        s=50
    )

    # Improve the legend
    plt.legend(title='Author', fontsize=10, title_fontsize=12)

    plt.title('t-SNE Visualization of Code Embeddings', fontsize=16)
    plt.xlabel('t-SNE Dimension 1', fontsize=14)
    plt.ylabel('t-SNE Dimension 2', fontsize=14)
    plt.grid(alpha=0.3)
    plt.tight_layout()
    plt.show()

# Example usage:
# visualize_code_embeddings(
#     model=model,
#     tokenized_dataset=tokenized_dataset["test"],
#     id_to_label=id_to_label
# )

## Modifying the evaluation function to collect prediction data

In [None]:
def evaluate_model_with_visualizations(model, eval_dataset, tokenizer, id_to_label):
    """
    Evaluate model and collect data for visualizations.

    Parameters:
    -----------
    model : torch model
        The trained model
    eval_dataset : Dataset
        Evaluation dataset
    tokenizer : Tokenizer
        HuggingFace tokenizer
    id_to_label : dict
        Mapping from numeric label to author name

    Returns:
    --------
    dict
        Evaluation results including predictions and confidences
    """
    from torch.utils.data import DataLoader
    from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

    # Create dataloader
    dataloader = DataLoader(eval_dataset, batch_size=16)

    # Prediction setup
    device = model.device
    all_predictions = []
    all_labels = []
    all_confidences = []

    # Set model to evaluation mode
    model.eval()

    # Predict
    with torch.no_grad():
        for batch in dataloader:
            # Prepare inputs
            inputs = {k: v.to(device) for k, v in batch.items()
                     if k in ['input_ids', 'attention_mask']}
            labels = batch['labels'].to(device)

            # Forward pass
            outputs = model(**inputs)
            logits = outputs.logits

            # Get predictions
            preds = torch.argmax(logits, dim=1)

            # Get confidences (softmax probabilities)
            probs = torch.nn.functional.softmax(logits, dim=1)
            confidences = probs.max(dim=1)[0]

            # Store results
            all_predictions.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_confidences.extend(confidences.cpu().numpy())

    # Calculate metrics
    accuracy = accuracy_score(all_labels, all_predictions)
    f1 = f1_score(all_labels, all_predictions, average='macro', zero_division=0)
    precision = precision_score(all_labels, all_predictions, average='macro', zero_division=0)
    recall = recall_score(all_labels, all_predictions, average='macro', zero_division=0)

    # Print results
    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")

    return {
        'accuracy': accuracy,
        'f1': f1,
        'precision': precision,
        'recall': recall,
        'predictions': all_predictions,
        'labels': all_labels,
        'confidences': all_confidences
    }

# Example usage:
# eval_results = evaluate_model_with_visualizations(
#     model=model,
#     eval_dataset=tokenized_dataset["test"],
#     tokenizer=tokenizer,
#     id_to_label=id_to_label
# )