In [None]:
!pip install -q transformers==4.51.3
!pip install -q kaggle==1.7.4.2
!pip install -q dill==0.3.8
!pip install -q datasets==3.5.0

### Let's import packages for data processing, BERT model, and analysis

In [None]:
import torch
import shutil
import pandas as pd
import numpy as np
from datasets import Dataset
from transformers import BertForSequenceClassification, BertTokenizerFast, Trainer, TrainingArguments
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay

### Use Kaggle API token to download datasets for training

Create an account on Kaggle, generate an API token, and download the resulting `kaggle.json` file to your local computer. Then upload it using the cell below.

In [None]:
from google.colab import files
files.upload()
!mkdir /.kaggle
!mv kaggle.json /.kaggle
!mv /.kaggle /root/
!chmod 600 ~/.kaggle/kaggle.json

### Let's download datasets
For training our BERT model we will use 3 sources:


*   Ling-Spam dataset
*   SpamAssassin dataset
*   SMS Spam Collection dataset



In [None]:
!kaggle datasets download -d mandygu/lingspam-dataset
!kaggle datasets download -d uciml/sms-spam-collection-dataset
!kaggle datasets download -d bertvankeulen/spamassassin-spam
!unzip lingspam-dataset.zip
!unzip spamassassin-spam.zip
!unzip sms-spam-collection-dataset.zip
!rm -rf lingspam-dataset.zip
!rm -rf sms-spam-collection-dataset.zip
!rm -rf spamassassin-spam.zip

### Functions for data processing

In [None]:
def load_model_and_tokenizer(model_path_or_name: str):
    """
    Build a BERT sequence-classification model together with its tokenizer.

    Both objects are created from the same source, so the tokenizer always
    corresponds to the checkpoint the model was loaded from.

    Parameters
    ----------
    model_path_or_name : str
        Either a local directory holding a saved model or the identifier of a
        pretrained model on the Hugging Face hub.

    Returns
    -------
    model : BertForSequenceClassification
        Model instantiated via ``from_pretrained``.

    tokenizer : BertTokenizerFast
        Tokenizer instantiated via ``from_pretrained`` from the same source.
    """
    source = model_path_or_name
    # The model is loaded first, then the tokenizer.
    return (
        BertForSequenceClassification.from_pretrained(source),
        BertTokenizerFast.from_pretrained(source),
    )

def process_data(data, tokenizer, column='message', padding=True,
                 truncation=True):
    """
    Run the tokenizer over one text column of a batch.

    Parameters
    ----------
    data : dict
        Mapping that holds the texts to tokenize under the key `column`.

    tokenizer : PreTrainedTokenizerFast
        Callable tokenizer; it receives the texts plus the `padding` and
        `truncation` keyword arguments.

    column : str, optional
        Key of the text entry inside `data` (default is 'message').

    padding : bool, optional
        Forwarded to the tokenizer as `padding` (default is True).

    truncation : bool, optional
        Forwarded to the tokenizer as `truncation` (default is True).

    Returns
    -------
    dict
        Whatever the tokenizer returns for the given texts.
    """
    texts = data[column]
    encoding_options = {'padding': padding, 'truncation': truncation}
    return tokenizer(texts, **encoding_options)

def removeUrlAndHtml(df, column='message'):
    """
    Remove HTML tags and URLs from a text column in a DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame. The column is overwritten in place, and the same
        DataFrame object is returned.

    column : str, optional
        Name of the column containing text to clean (default is 'message').

    Returns
    -------
    pd.DataFrame
        The DataFrame passed in, with the cleaned text in `column`.
    """
    regexes = [
        # Whole HTML tags, including closing tags and tags with attributes.
        # The `+` is required: without it only tags with exactly one character
        # between the angle brackets (e.g. "<b>") would match.
        r"<[^>]+>",
        # Bare URLs. Applied after tag removal so that a URL inside a tag
        # attribute (href="http://...") cannot swallow the closing ">" and
        # the text that follows it.
        r"https?://\S+|www\.\S+",
    ]

    for reg in regexes:
        df[column] = df[column].str.replace(reg, "", regex=True)
    return df

def preprocess_data(file_mapping, test_size=0.3, seed=42):
    """
    Load, clean, and split multiple CSV datasets into train and test datasets.

    Parameters
    ----------
    file_mapping : dict
        Mapping of file paths to configuration dictionaries. Each configuration may contain:
            - 'encoding': File encoding.
            - 'columns': Mapping of original to new column names. Only the
              renamed columns listed here are kept.
            - 'map': Optional. Mapping of values in specific columns.
            - 'regex': Optional. Regex replacements per column.

    test_size : float, optional
        Proportion of data to reserve for testing (default is 0.3).

    seed : int or None, optional
        Seed for the shuffled train/test split (default is 42). A fixed seed
        makes repeated calls on the same files return the same split, so a
        later evaluation-only run is not scored on rows that an earlier call
        placed in the training set. Pass None for a non-reproducible split.

    Returns
    -------
    train_dataset : Dataset
        The training subset of the dataset.

    test_dataset : Dataset
        The testing subset of the dataset.
    """
    frames = []
    for file, conf in file_mapping.items():
        df = pd.read_csv(file, encoding=conf['encoding'], on_bad_lines='skip',
                         engine='python')
        df = df.rename(columns=conf['columns'])

        # Optional per-column value mapping (e.g. textual labels to integers).
        for column, value_mapping in conf.get('map', {}).items():
            df[column] = df[column].map(value_mapping)
        # Optional per-column regex removal.
        for column, regex_exp in conf.get('regex', {}).items():
            df[column] = df[column].str.replace(regex_exp, "", regex=True)

        print(f"Dataset {file} has {len(df)} records.")
        frames.append(df[list(conf['columns'].values())])

    base_columns = ['message', 'label']
    if frames:
        # Concatenate once instead of growing a frame inside the loop.
        # ignore_index gives a clean 0..n-1 index, so Dataset.from_pandas does
        # not carry the duplicated per-file row numbers along as a column.
        df_concat = pd.concat(frames, ignore_index=True)
        extra_columns = [c for c in df_concat.columns if c not in base_columns]
        df_concat = df_concat.reindex(columns=base_columns + extra_columns)
    else:
        df_concat = pd.DataFrame(columns=base_columns)

    print("Total amount of records in consolidated dataset is: "
          f"{len(df_concat)}")

    # Let's remove urls and html tags
    df_concat = removeUrlAndHtml(df_concat)
    print("Total amount of records in consolidated dataset is after cleaning "
          f"urls and html tags: {len(df_concat)}")

    dataset = Dataset.from_pandas(df_concat)
    dataset = dataset.train_test_split(test_size=test_size, seed=seed)
    return dataset['train'], dataset['test']

def prepare_dataset(dataset, tokenizer):
    """
    Tokenize a Hugging Face dataset and expose it as PyTorch tensors.

    Parameters
    ----------
    dataset : Dataset
        Hugging Face dataset whose text is tokenized through `process_data`.

    tokenizer : PreTrainedTokenizerFast
        Tokenizer handed to `process_data`.

    Returns
    -------
    Dataset
        The mapped dataset, formatted as 'torch' and restricted to the
        'input_ids', 'attention_mask' and 'label' columns.
    """
    def tokenize_batch(batch):
        return process_data(batch, tokenizer)

    # The whole dataset is tokenized as one single batch; presumably so that
    # padding (enabled by default in process_data) is computed across all
    # rows at once.
    row_count = len(dataset)
    tokenized = dataset.map(tokenize_batch, batched=True, batch_size=row_count)
    tokenized.set_format(
        type='torch',
        columns=['input_ids', 'attention_mask', 'label'],
    )
    return tokenized

### Functions for training

In [None]:
def train(model, train_dataset, output_dir='./results', log_dir='./logs',
          epochs=10, batch_size=16):
    """
    Fine-tune a model with the Hugging Face Trainer API.

    Parameters
    ----------
    model : PreTrainedModel
        Model to fine-tune.

    train_dataset : Dataset
        Dataset used for training.

    output_dir : str, optional
        Passed to TrainingArguments as `output_dir` (default is './results').

    log_dir : str, optional
        Passed to TrainingArguments as `logging_dir` (default is './logs').

    epochs : int, optional
        Number of training epochs (default is 10).

    batch_size : int, optional
        Per-device training batch size (default is 16).

    Returns
    -------
    Trainer
        The Trainer instance, returned after `train()` has finished.
    """
    trainer = Trainer(
        model=model,
        train_dataset=train_dataset,
        args=TrainingArguments(
            output_dir=output_dir,
            logging_dir=log_dir,
            num_train_epochs=epochs,
            per_device_train_batch_size=batch_size,
            warmup_steps=500,
            weight_decay=0.01,
            # No evaluation during training; evaluation is a separate step.
            eval_strategy="no",
        ),
    )
    trainer.train()
    return trainer

### Functions for evaluation

In [None]:
def evaluate(trainer, test_dataset):
    """
    Evaluate a trained model on a test dataset and display classification metrics.

    Parameters
    ----------
    trainer : Trainer
        Hugging Face Trainer object wrapping the model to evaluate.

    test_dataset : Dataset
        The dataset to evaluate on (must contain a 'label' column with
        0 for HAM and 1 for SPAM).

    Returns
    -------
    None
        Prints the classification report and displays the confusion matrix plot.
    """
    predictions = trainer.predict(test_dataset)
    # The highest-scoring class along the last axis is the predicted label.
    predicted_labels = predictions.predictions.argmax(axis=-1)
    # np.asarray accepts both a tensor-formatted column and a plain list, so
    # the function does not depend on the dataset's output format.
    true_labels = np.asarray(test_dataset['label'])

    print(classification_report(true_labels, predicted_labels))

    # Pin the label order so the matrix is always 2x2 and lines up with the
    # two display labels, even when one class is missing from this split.
    cm = confusion_matrix(true_labels, predicted_labels, labels=[0, 1])
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["HAM", "SPAM"])
    disp.plot()

### Main Workflow

This function orchestrates the full text classification pipeline using BERT.

It performs the following steps based on the `train_new_model` flag:

1. **Load and preprocess data** from multiple CSV sources using a configuration dictionary (`file_mapping`) that specifies column mappings, encodings, optional label mappings, and regex cleaning rules.

2. If `train_new_model=True`:
 - Loads a pretrained BERT model and tokenizer (default: 'bert-base-uncased').
 - Preprocesses and tokenizes the datasets.
 - Trains the model using Hugging Face's `Trainer`.
 - Saves the trained model and tokenizer to the `./saved_model/` directory.
 - Compresses the model directory into a `.zip` file and downloads it.

3. If `train_new_model=False`:
 - Loads a previously saved model and tokenizer from `./saved_model/`.
 - Prepares the test set for evaluation only.

4. **Evaluation**:
 - Runs the model on the test dataset.
 - Prints a classification report (precision, recall, F1-score).
 - Displays a confusion matrix plot for HAM vs SPAM classification.

Usage:
-------
- To fine-tune a new model from the pretrained checkpoint and save it:
 main(train_new_model=True)

- To skip training and only evaluate a saved model:
 main(train_new_model=False)

In [None]:
def main(train_new_model=True):
    """
    Run the spam-classification pipeline: train-and-save or load, then evaluate.

    Parameters
    ----------
    train_new_model : bool, optional
        When truthy (default), 'bert-base-uncased' is fine-tuned on the
        training split, saved to './saved_model', zipped and offered for
        download. Otherwise the model and tokenizer are loaded from
        './saved_model' and only evaluated.

    Returns
    -------
    None
        The evaluation output is printed / plotted by `evaluate`.

    Notes
    -----
    `preprocess_data` is called again in evaluation-only mode, so the rows
    used for evaluation are whatever test split that call produces.
    """
    saved_model_dir = './saved_model'

    # One configuration per CSV file; insertion order is the order in which
    # the files are read.
    messages_conf = {
        'columns': {'label': 'label', 'message': 'message'},
        'encoding': 'utf-8',
    }
    spam_conf = {
        'columns': {'v1': 'label', 'v2': 'message'},
        'encoding': 'latin-1',
        'map': {'label': {'ham': 0, 'spam': 1}},
    }
    sa_subtxt_conf = {
        'columns': {'label': 'label', 'data': 'message'},
        'encoding': 'utf-8',
        # Strips a leading '[' and a trailing ']' from each message.
        'regex': {'message': r"^\[|\]$"},
    }
    file_mapping = {
        './messages.csv': messages_conf,
        './spam.csv': spam_conf,
        './SA_SubTxt_fn.csv': sa_subtxt_conf,
    }

    if not train_new_model:
        # Evaluation only: reuse the previously saved model and tokenizer.
        model, tokenizer = load_model_and_tokenizer(saved_model_dir)

        _, held_out = preprocess_data(file_mapping)
        held_out = prepare_dataset(held_out, tokenizer)

        # A bare Trainer is enough to run predictions.
        trainer = Trainer(model=model)
    else:
        # Start from the pretrained base checkpoint.
        model, tokenizer = load_model_and_tokenizer('bert-base-uncased')

        train_split, held_out = preprocess_data(file_mapping, 0.3)
        train_split = prepare_dataset(train_split, tokenizer)
        held_out = prepare_dataset(held_out, tokenizer)

        trainer = train(model, train_split)

        # Persist model and tokenizer side by side.
        trainer.save_model(saved_model_dir)
        tokenizer.save_pretrained(saved_model_dir)

        # Bundle the saved directory and hand it to the browser.
        shutil.make_archive('saved_model', 'zip', saved_model_dir)
        files.download('saved_model.zip')

    # Both paths end with the same evaluation step.
    evaluate(trainer, held_out)
# Evaluation-only run: loads the model from ./saved_model instead of training.
main(False)