# EMAIL spam detection

## Imports + python version

In [None]:
import glob
import numpy as np
import email
from sklearn.model_selection import train_test_split
import re
import string
from nltk.tokenize import word_tokenize
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS as sklearn_stop_words
from nltk.stem import PorterStemmer
from nltk.stem import WordNetLemmatizer
from wordcloud import WordCloud, STOPWORDS
import matplotlib.pyplot as plt
from collections import defaultdict
import pandas as pd
import plotly.graph_objs as go
from plotly import tools
import plotly.offline as py
py.init_notebook_mode(connected=True)
from plotly import tools
from sklearn.linear_model import LogisticRegression
from transformers import (
    AutoModelForSequenceClassification,
    Seq2SeqTrainingArguments,
    DataCollatorForSeq2Seq,
    AutoModelForSeq2SeqLM,
    TrainingArguments,
    TrainerCallback,
    Seq2SeqTrainer,
    AutoTokenizer,
    Trainer,
)
from sentence_transformers.losses import CosineSimilarityLoss
from setfit import SetFitTrainer, SetFitModel
import evaluate
import numpy as np
from sklearn.ensemble import (
    RandomForestClassifier,
    AdaBoostClassifier,
    BaggingClassifier,
    ExtraTreesClassifier,
    GradientBoostingClassifier,
)
from sklearn.naive_bayes import GaussianNB, MultinomialNB, BernoulliNB
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from xgboost import XGBClassifier
from catboost import CatBoostClassifier
from lightgbm import LGBMClassifier
from sklearn.svm import SVC
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import cross_validate

import copy
import time
import pandas as pd
import torch
import transformers
import scienceplots

In [None]:
import warnings
warnings.filterwarnings("ignore")

In [None]:
!python3 --version

## 1. Data preprocessing

### 1.1. Download the GloVe embeddings if they do not already exist

!wget http://nlp.stanford.edu/data/glove.6B.zip

!unzip glove.6B.zip

!python -m gensim.scripts.glove2word2vec --input glove.6B.300d.txt --output glove.6B.300d.word2vec.txt

### 1.2. Getting ham/spam sample

In [None]:
hamdata = glob.glob("ham/*")
spamdata = glob.glob("spam/*")

In [None]:
hamdata[0:10]

In [None]:
spamdata[0:10]

In [None]:
def get_email_content(email_path: str) -> str:
    """
    Description:

    This function extracts the content from an email file located at the specified path. 

    # Args:

    * `email_path (str)`: The path to the email file.

    Returns:

    * `str`: The extracted email content as a string.

    Raises:

    * `FileNotFoundError`: If the email file is not found at the given path.
    * `IOError`: If there is an error reading the email file.

    # Example:

    >>> content = get_email_content("/path/to/email.eml")\n
    >>> print(content)
    """
    file = open(email_path,encoding='latin1')
    try:
        msg = email.message_from_file(file)
        for part in msg.walk():
            if part.get_content_type() == 'text/plain':
                return part.get_payload()
    except Exception as e:
        print(e)
        
        
def get_email_content_bulk(email_paths: list[str]) -> list[str]:
    """
    Description:

    Runs `get_email_content` on every path, preserving the input order.

    # Args:

    * `email_paths (list[str])`: Paths to the email files to read.

    Returns:

    * `list[str]`: One entry per path, holding whatever `get_email_content`
      returned for it (which may be `None`).

    Raises:

    * `FileNotFoundError`: If any email file is not found at the given path.
    * `IOError`: If there is an error opening any email file.

    # Example:

    >>> email_paths = ["/path/to/email1.eml", "/path/to/email2.eml"]
    >>> contents = get_email_content_bulk(email_paths)
    >>> print(contents)
    """
    contents = []
    for path in email_paths:
        contents.append(get_email_content(path))
    return contents

In [None]:
ham_path = [hamdata]
spam_path = [spamdata]

#### Getting ham samples

In [None]:
ham_sample = np.asarray([train_test_split(o, train_size=0.7, random_state=52, shuffle=True) for o in ham_path], dtype="object")

In [None]:
ham_train = np.array([])
ham_test = np.array([])
for o in ham_sample:
    ham_train = np.concatenate((ham_train,o[0]),axis=0)
    ham_test = np.concatenate((ham_test,o[1]),axis=0)

In [None]:
ham_train.shape, ham_test.shape

In [None]:
ham_train[0:10]

In [None]:
ham_test[0:10]

#### Getting spam samples

In [None]:
# Use the same 70/30 ratio and fixed seed as the ham split so both classes are
# partitioned consistently and the split is reproducible across runs.
spam_sample = np.asarray([train_test_split(o, train_size=0.7, random_state=52, shuffle=True) for o in spam_path], dtype="object")

In [None]:
spam_train = np.array([])
spam_test = np.array([])
for o in spam_sample:
    spam_train = np.concatenate((spam_train,o[0]),axis=0)
    spam_test = np.concatenate((spam_test,o[1]),axis=0)

In [None]:
spam_train.shape, spam_test.shape

In [None]:
spam_train[0:10]

In [None]:
spam_test[0:10]

Creating `x_train` and `x_test` (paths to the email files) together with `y_train` and `y_test` (the corresponding labels: 0 for ham, 1 for spam).

In [None]:
ham_train_label = [0]*ham_train.shape[0]
spam_train_label = [1]*spam_train.shape[0]
x_train = np.concatenate((ham_train,spam_train))
y_train = np.concatenate((ham_train_label,spam_train_label))

In [None]:
x_train

In [None]:
y_train

In [None]:
ham_test_label = [0]*ham_test.shape[0]
spam_test_label = [1]*spam_test.shape[0]
x_test = np.concatenate((ham_test,spam_test))
y_test = np.concatenate((ham_test_label,spam_test_label))

In [None]:
x_test

In [None]:
y_test

Shuffle the train and test sets, applying the same random permutation to the paths and their labels.

In [None]:
train_shuffle_index = np.random.permutation(np.arange(0,x_train.shape[0]))
test_shuffle_index = np.random.permutation(np.arange(0,x_test.shape[0]))

In [None]:
train_shuffle_index

In [None]:
test_shuffle_index

In [None]:
x_train = x_train[train_shuffle_index]
y_train= y_train[train_shuffle_index]

In [None]:
x_train

In [None]:
y_train

In [None]:
x_test = x_test[test_shuffle_index]
y_test = y_test[test_shuffle_index]

In [None]:
x_test

In [None]:
y_test

Getting train and test texts

In [None]:
x_train = get_email_content_bulk(x_train)
x_test = get_email_content_bulk(x_test)

In [None]:
x_train[0:10]

In [None]:
x_test[0:10]

Removing emails for which no content could be extracted (`None`)

In [None]:
def remove_null(datas: list[str], labels: list[int]) -> tuple[np.ndarray, np.ndarray]:
    """
    Description: Drops every position where the entry in `datas` is None,
    removing the label at the same position so both stay aligned.

    # Args:
        datas (list[str]): A sequence of strings, possibly containing None.
        labels (list[int]): A sequence of labels with the same length as `datas`.

    Returns:
        tuple[np.ndarray, np.ndarray]: The filtered data and the filtered
        labels, both as NumPy arrays.

    Raises:
        ValueError: If the lengths of `datas` and `labels` are not equal.

    # Example:

    >>> datas = ["apple", "banana", None, "orange"]
    >>> labels = [1, 1, 0, 1]
    >>> remove_null(datas, labels)
    (array(['apple', 'banana', 'orange'], dtype=object), array([1, 1, 1]))
    """
    # Fail loudly instead of silently pairing texts with the wrong labels.
    if len(datas) != len(labels):
        raise ValueError(
            f"datas and labels must have the same length, got {len(datas)} and {len(labels)}"
        )
    # A boolean mask also behaves well when nothing (or everything) is kept.
    keep = np.array([o is not None for o in datas], dtype=bool)
    return np.array(datas)[keep], np.array(labels)[keep]

In [None]:
x_train,y_train = remove_null(x_train,y_train)
x_test,y_test = remove_null(x_test,y_test)

### 1.3 Cleaning up with NLTK

In [None]:
def remove_hyperlink(word: str) -> str:
    """
    Description: Strips hyperlinks from a string. A hyperlink is every run of
    non-whitespace characters that starts with "http", so punctuation glued
    to the end of a URL is removed together with it.

    # Args:
        word (str): The string potentially containing hyperlinks.

    Returns:
        str: The string with hyperlinks removed.

    # Example:

    >>> remove_hyperlink("Check out https://www.example.com!")
    'Check out '
    """
    url_pattern = r"http\S+"
    return re.sub(url_pattern, "", word)

def to_lower(word: str) -> str:
    """
    Description: Returns the lowercase form of a string.

    # Args:
        word (str): The string to convert.

    Returns:
        str: The lowercase version of the string.

    # Example:

    >>> to_lower("Hello WORLD")
    'hello world'
    """
    return word.lower()

def remove_number(word: str) -> str:
    """
    Description: Deletes every run of digits from a string.

    # Args:
        word (str): The string potentially containing numbers.

    Returns:
        str: The string with all digits removed.

    # Example:

    >>> remove_number("This is year 2023")
    'This is year '
    """
    digit_run = re.compile(r'\d+')
    return digit_run.sub('', word)

def remove_punctuation(word: str) -> str:
    """
    Description: Deletes every character listed in `string.punctuation`
    from a string.

    # Args:
        word (str): The string potentially containing punctuation.

    Returns:
        str: The string with punctuation removed.

    # Example:

    >>> remove_punctuation("Hello, world!")
    'Hello world'
    """
    # Translation table whose third argument lists the characters to delete.
    deletion_table = str.maketrans("", "", string.punctuation)
    return word.translate(deletion_table)

def remove_whitespace(word: str) -> str:
    """
    Description: Trims whitespace from both ends of a string. Whitespace
    inside the string is left untouched.

    # Args:
        word (str): The string potentially surrounded by whitespace.

    Returns:
        str: The string without leading and trailing whitespace.

    # Example:

    >>> remove_whitespace("  hello   ")
    'hello'
    """
    return word.strip()

def replace_newline(word: str) -> str:
    r"""
    Description: Replaces newline characters with spaces in a string.

    Each newline becomes a single space so that the last word of one line and
    the first word of the next stay separate tokens. Deleting the newline
    outright would glue them together ("Hello\nWorld" -> "HelloWorld").

    # Args:
        word (str): The string potentially containing newline characters.

    Returns:
        str: The string with newline characters replaced by spaces.

    # Example:

    >>> replace_newline("Hello\nWorld")
    'Hello World'
    """
    return word.replace('\n', ' ')

def clean_up_pipeline(sentence: str) -> str:
    """
    Description: Runs the raw-text cleaning steps on a sentence, in this order:
    remove_hyperlink, replace_newline, to_lower, remove_number,
    remove_punctuation, remove_whitespace.

    # Args:
        sentence (str): The sentence to clean.

    Returns:
        str: The cleaned sentence.

    # Example:

    >>> clean_up_pipeline("  HeLlO, wOrLd! 2023  ")
    'hello world'
    """
    text = remove_hyperlink(sentence)
    text = replace_newline(text)
    text = to_lower(text)
    text = remove_number(text)
    text = remove_punctuation(text)
    return remove_whitespace(text)

In [None]:
x_train = [clean_up_pipeline(o) for o in x_train]
x_test = [clean_up_pipeline(o) for o in x_test]

In [None]:
x_train[0:10]

In [None]:
x_test[0:10]

In [None]:
stemmer = PorterStemmer()
lemmatizer = WordNetLemmatizer()

In [None]:
x_train = [word_tokenize(o) for o in x_train]
x_test = [word_tokenize(o) for o in x_test]

In [None]:
def remove_stop_words(words: list[str]) -> list[str]:
    """
    Description: Filters out every token that appears in scikit-learn's
    English stop-word list, keeping the remaining tokens in order.

    # Args:
        words (list[str]): A list of words.

    Returns:
        list[str]: The list of words with stop words removed.

    # Example:

    >>> remove_stop_words(["the", "quick", "brown", "fox"])
    ['quick', 'brown', 'fox']
    """
    kept = []
    for token in words:
        if token in sklearn_stop_words:
            continue
        kept.append(token)
    return kept

def word_stemmer(words: list[str]) -> list[str]:
    """
    Description: Stems each word with the module-level `stemmer`.

    # Args:
        words (list[str]): A list of words.

    Returns:
        list[str]: The stemmed words, in the same order as the input.

    # Example:

    >>> word_stemmer(["running", "jumps", "jumped"])
    ['run', 'jump', 'jump']
    """
    return list(map(stemmer.stem, words))

def word_lemmatizer(words: list[str]) -> list[str]:
    """
    Description: Lemmatizes each word with the module-level `lemmatizer`.

    Each word is passed to `lemmatizer.lemmatize` with no part-of-speech
    argument, so the lemmatizer's default applies.
    NOTE(review): with that default, verb or adjective forms such as
    "running" or "better" are presumably returned unchanged - confirm
    against the NLTK documentation before relying on it.

    # Args:
        words (list[str]): A list of words.

    Returns:
        list[str]: The lemmatized words, in the same order as the input.

    # Example:

    >>> word_lemmatizer(["cats", "emails"])
    """
    return list(map(lemmatizer.lemmatize, words))

def clean_token_pipeline(words: list[str]) -> list[str]:
    """
    Description: Runs the token-level cleaning steps on a list of tokens:
    first remove_stop_words, then word_lemmatizer. Punctuation, digits and
    casing are not touched here; they are handled by clean_up_pipeline.

    # Args:
        words (list[str]): A list of tokens.

    Returns:
        list[str]: The tokens left after stop-word removal, lemmatized.

    # Example:

    >>> clean_token_pipeline(["the", "quick", "foxes"])
    """
    without_stop_words = remove_stop_words(words)
    return word_lemmatizer(without_stop_words)

In [None]:
x_train = [clean_token_pipeline(o) for o in x_train]
x_test = [clean_token_pipeline(o) for o in x_test]

In [None]:
x_train = [" ".join(o) for o in x_train]
x_test = [" ".join(o) for o in x_test]

In [None]:
x_train[0:10]

In [None]:
x_test[0:10]

## 2. Visualization

In [None]:
def plot_wordcloud(text: str, mask=None, max_words: int=200, max_font_size: int=100, figure_size: tuple[float,float]=(24.0,16.0),
                   title: str = None, title_size: int=40, image_color: bool=False) -> None:
    """
    Description: Generates and displays a word cloud image.

    # Args:
        text (str): The text to generate the word cloud from. A non-string
            iterable (list, NumPy array of documents) is also accepted; its
            items are joined with spaces.
        mask (optional): Forwarded unchanged to `WordCloud(mask=...)` and, when
            `image_color` is True, to `ImageColorGenerator`. Defaults to None.
            NOTE(review): presumably an image array rather than a file path -
            confirm against the wordcloud documentation.
        max_words (int, optional): Maximum number of words in the cloud. Defaults to 200.
        max_font_size (int, optional): Maximum font size for words. Defaults to 100.
        figure_size (tuple[float,float], optional): Size of the figure. Defaults to (24.0,16.0).
        title (str, optional): Title for the word cloud. Defaults to None.
        title_size (int, optional): Font size of the title. Defaults to 40.
        image_color (bool, optional): If True, the cloud is recolored with the
            colors taken from `mask`. Defaults to False.

    Returns:
        None

    Raises:
        ValueError: If `image_color` is True but no `mask` was given.

    # Example:

    >>> plot_wordcloud("This is an example text for word cloud generation")
    """
    if image_color and mask is None:
        raise ValueError("image_color=True requires a mask to take the colors from")

    stopwords = set(STOPWORDS)
    more_stopwords = {'one', 'br', 'Po', 'th', 'sayi', 'fo', 'Unknown'}
    stopwords = stopwords.union(more_stopwords)

    # str() on a large NumPy array yields an abbreviated repr ("a b ... z"),
    # which would silently drop most documents, so join the items instead.
    if isinstance(text, str):
        corpus = text
    else:
        try:
            corpus = " ".join(str(item) for item in text)
        except TypeError:
            # Not iterable: fall back to the plain string form.
            corpus = str(text)

    wordcloud = WordCloud(background_color='black',
                    stopwords = stopwords,
                    max_words = max_words,
                    max_font_size = max_font_size,
                    random_state = 42,
                    width=800,
                    height=400,
                    mask = mask)
    wordcloud.generate(corpus)

    plt.figure(figsize=figure_size)
    if image_color:
        # Imported here because the file-level import only brings in
        # WordCloud and STOPWORDS.
        from wordcloud import ImageColorGenerator

        image_colors = ImageColorGenerator(mask)
        plt.imshow(wordcloud.recolor(color_func=image_colors), interpolation="bilinear")
        plt.title(title, fontdict={'size': title_size,
                                  'verticalalignment': 'bottom'})
    else:
        plt.imshow(wordcloud)
        plt.title(title, fontdict={'size': title_size, 'color': 'black',
                                  'verticalalignment': 'bottom'})
    plt.axis('off')
    plt.tight_layout()

In [None]:
spam_train_index = [i for i,o in enumerate(y_train) if o == 1]
non_spam_train_index = [i for i,o in enumerate(y_train) if o == 0]

In [None]:
spam_train_index[0:10]

In [None]:
non_spam_train_index[0:10]

In [None]:
spam_email = np.array(x_train)[spam_train_index]
non_spam_email = np.array(x_train)[non_spam_train_index]

In [None]:
plot_wordcloud(spam_email,title = 'Spam Email')

In [None]:
plot_wordcloud(non_spam_email, title="Non spam email")

In [None]:
## custom function for ngram generation ##
def generate_ngrams(text: str, n_gram: int=1) -> list[str]:
    """
    Description: Generates n-grams from a given text.

    The text is lowercased and split on single spaces; empty pieces and
    tokens found in wordcloud's STOPWORDS are dropped before the n-grams are
    built, so n-grams are formed over the remaining tokens only.

    # Args:
        text (str): The text to generate n-grams from.
        n_gram (int, optional): The value of n for the n-grams. Defaults to 1.

    Returns:
        list[str]: The n-grams, each one a string of n space-separated tokens.

    # Example:

    >>> generate_ngrams("cheap pills online", 2)
    """
    tokens = []
    for piece in text.lower().split(" "):
        if piece != "" and piece not in STOPWORDS:
            tokens.append(piece)
    # One shifted copy of the token list per n-gram position; zipping them
    # lines up consecutive tokens.
    shifted = [tokens[offset:] for offset in range(n_gram)]
    return [" ".join(gram) for gram in zip(*shifted)]

## custom function for horizontal bar chart ##
def horizontal_bar_chart(df: pd.DataFrame, color: str)-> go.Bar:
    """
    Description: Builds a horizontal plotly bar trace from a word-count frame.

    The rows are plotted in reverse order of the dataframe.

    # Args:
        df (pd.DataFrame): Dataframe with a "word" and a "wordcount" column.
        color (str): The color for the bars.

    Returns:
        go.Bar: The plotly bar trace (legend hidden).

    # Example:

    >>> horizontal_bar_chart(data, "blue")
    """
    reversed_words = df["word"].values[::-1]
    reversed_counts = df["wordcount"].values[::-1]
    return go.Bar(
        y=reversed_words,
        x=reversed_counts,
        showlegend=False,
        orientation = 'h',
        marker={"color": color},
    )

In [None]:
def _top_ngram_frame(sentences, n_gram: int, top_n: int = 20) -> pd.DataFrame:
    """Count the n-grams over `sentences` and return the `top_n` most frequent as a word/wordcount frame."""
    freq_dict = defaultdict(int)
    for sent in sentences:
        for ngram in generate_ngrams(sent, n_gram):
            freq_dict[ngram] += 1
    ranked = sorted(freq_dict.items(), key=lambda x: x[1])[::-1]
    # Naming the columns in the constructor keeps this working when no
    # n-gram was found; assigning `.columns` afterwards fails on an empty frame.
    return pd.DataFrame(ranked, columns=["word", "wordcount"]).head(top_n)


def visualize_in_bar_chart(word_count: int=1) -> None:
    """
    Description: Plots the 20 most frequent n-grams of the non-spam and the
    spam training emails side by side as horizontal bar charts.

    Reads the module-level `non_spam_email` and `spam_email` collections.

    # Args:
        word_count (int, optional): The n-gram size, i.e. how many consecutive
            words form one counted unit (1 = single words, 2 = bigrams, ...).
            Defaults to 1.

    Returns:
        None

    # Example:

    >>> visualize_in_bar_chart(2)
    """
    ## Bar chart for the non spam emails ##
    trace0 = horizontal_bar_chart(_top_ngram_frame(non_spam_email, word_count), 'blue')

    ## Bar chart for the spam emails ##
    trace1 = horizontal_bar_chart(_top_ngram_frame(spam_email, word_count), 'blue')

    # Creating two subplots
    fig = tools.make_subplots(rows=1, cols=2, vertical_spacing=0.04,
                              subplot_titles=["Frequent words of non spam email",
                                              "Frequent words of spam email"])
    fig.append_trace(trace0, 1, 1)
    fig.append_trace(trace1, 1, 2)
    fig['layout'].update(height=600, width=800, paper_bgcolor='rgb(233,233,233)', title="Word Count Plots")
    py.iplot(fig, filename='word-plots')

In [None]:
visualize_in_bar_chart(1)

In [None]:
# Class balance of the training set: count the spam / non-spam samples and
# show them first as a bar chart of absolute counts, then as a pie chart.
spam_size = len(spam_train_index)
non_spam_size = len(non_spam_train_index)
total_train_size = spam_size + non_spam_size

# Bar chart; the bars are colored by their own height on the 'Picnic' scale.
trace = go.Bar(
    x=["Spam","Non Spam"],
    y=[spam_size, non_spam_size],
    marker=dict(
        color=[spam_size, non_spam_size],
        colorscale = 'Picnic',
        reversescale = True
    ),
)

layout = go.Layout(
    title='Target Count for Train Data',
    font=dict(size=18)
)

data = [trace]
fig = go.Figure(data=data, layout=layout)
py.iplot(fig, filename="TargetCount")

## Target distribution: share of each class in percent of the training set ##
labels = (np.array(["Spam", "Non Spam"]))
sizes = (np.array(([spam_size,non_spam_size]))/total_train_size*100)

trace = go.Pie(labels=labels, values=sizes)
layout = go.Layout(
    title='Train Data distribution',
    font=dict(size=18),
    width=600,
    height=600,
)
data = [trace]
fig = go.Figure(data=data, layout=layout)
py.iplot(fig, filename="usertype")

In [None]:
# Class balance of the test set: count the spam / non-spam samples and show
# them first as a bar chart of absolute counts, then as a pie chart.
spam_test_index = [i for i,o in enumerate(y_test) if o == 1]
non_spam_test_index = [i for i,o in enumerate(y_test) if o == 0]

spam_size = len(spam_test_index)
non_spam_size = len(non_spam_test_index)
total_test_size = spam_size + non_spam_size

# Bar chart; the bars are colored by their own height on the 'Picnic' scale.
trace = go.Bar(
    x=["Spam","Non Spam"],
    y=[spam_size, non_spam_size],
    marker=dict(
        color=[spam_size, non_spam_size],
        colorscale = 'Picnic',
        reversescale = True
    ),
)

layout = go.Layout(
    title='Target Count for Test Data',
    font=dict(size=18)
)

data = [trace]
fig = go.Figure(data=data, layout=layout)
py.iplot(fig, filename="TargetCount")

## Target distribution: share of each class in percent of the test set ##
labels = (np.array(["Spam", "Non Spam"]))
# Normalize by the test-set size; dividing by the training-set size made the
# two values sum to far less than 100.
sizes = (np.array(([spam_size,non_spam_size]))/total_test_size*100)

trace = go.Pie(labels=labels, values=sizes)
layout = go.Layout(
    title='Test Data Distribution',
    font=dict(size=18),
    width=600,
    height=600,
)
data = [trace]
fig = go.Figure(data=data, layout=layout)
py.iplot(fig, filename="usertype")

In [None]:
x_train = [o.split(" ") for o in x_train]
x_test = [o.split(" ") for o in x_test]

In [None]:
dict1 = {"label": y_train, "text": x_train}
dict2 = {"label": y_test, "text": x_test}

In [None]:
df1 = pd.DataFrame.from_dict(dict1)
df2 = pd.DataFrame.from_dict(dict2)

In [None]:
df = pd.concat([df1, df2])

In [None]:
df.to_csv("results/asd/data.csv")

### 2.2. Feature extraction defs (will be used in training)

Tokenize dataset

In [1]:
def tokenize(dataset, tokenizer):
    """
    Description: (For LLM models) Tokenizes a dataset with the given tokenizer.

    Three cases are handled:
      * `tokenizer is None`: the dataset is returned unchanged.
      * the tokenizer's class name contains "T5": the 0/1 labels are turned
        into the strings "ham"/"spam", every text gets the prefix
        "classify as ham or spam: ", texts and labels are both tokenized and
        the "label" column is replaced by a "labels" column of token ids.
      * any other tokenizer: the "text" column is tokenized with
        max-length padding and truncation.

    # Args:
        dataset: The dataset to be tokenized. It must support `.map(...)`; in
            the T5 case it must additionally expose a "train" split.
            (presumably a `datasets.DatasetDict` - confirm against callers)
        tokenizer: The tokenizer object to use, or None to skip tokenization.

    Returns:
        The tokenized dataset (or the input dataset when `tokenizer` is None).
    """

    def tokenization(examples):
        return tokenizer(examples["text"], padding="max_length", truncation=True)

    def tokenization_t5(examples, padding="max_length"):
        
        # Add T5 prefix to the text
        text = ["classify as ham or spam: " +
                item for item in examples["text"]]

        # Tokenize text and labels
        # NOTE: `max_label_length` is not a parameter; it is a variable of the
        # enclosing `tokenize` that is assigned in the T5 branch below, before
        # this helper is ever called.
        inputs = tokenizer(text, max_length=tokenizer.model_max_length,
                           padding=padding, truncation=True)
        labels = tokenizer(
            text_target=examples["label"], max_length=max_label_length, padding=True, truncation=True)

        # Replace tokenizer.pad_token_id in the labels by -100 to ignore padding in the loss
        inputs["labels"] = [
            [(x if x != tokenizer.pad_token_id else -100) for x in label] for label in labels["input_ids"]
        ]
        return inputs

    if tokenizer is None:
        # No tokenizer supplied: hand the dataset back untouched.
        return dataset

    elif "T5" in type(tokenizer).__name__:
        # Extra step to convert our 0/1 labels into "ham"/"spam" strings
        dataset = dataset.map(
            lambda x: {"label": "ham" if x["label"] == 0 else "spam"})

        # Calculate the max label length after tokenization
        # (measured on the "train" split only)
        tokenized_label = dataset["train"].map(
            lambda x: tokenizer(x["label"], truncation=True), batched=True)
        max_label_length = max([len(x) for x in tokenized_label["input_ids"]])

        return dataset.map(tokenization_t5, batched=True, remove_columns=["label"])

    else:
        return dataset.map(tokenization, batched=True)

### 3.1. Training a lot of classifiers

#### Training defs

In [None]:
from sklearn.metrics import (
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
    accuracy_score,
)

In [None]:
"""Running baselines. You can add yours below"""
# Baseline registry: model name -> (estimator instance, integer).
# train_baselines unpacks the integer (under the name `max_iter`) and passes it
# to TfidfVectorizer(max_features=...), so it is the vocabulary size cap used
# for that model's features, not an iteration limit.
MODELS = {
    "NB": (MultinomialNB(), 1000),
    "LR": (LogisticRegression(), 500),
    "KNN": (KNeighborsClassifier(n_neighbors=1), 150),
    "SVM": (SVC(kernel="sigmoid", gamma=1.0), 3000),
    "XGBoost": (XGBClassifier(learning_rate=0.01, n_estimators=150), 2000),
    "LightGBM": (LGBMClassifier(learning_rate=0.01, num_leaves=20), 3000),
    "Catboost": (CatBoostClassifier(learning_rate=0.01),2000),
    "RandomForestClassifier": (RandomForestClassifier(n_estimators=50, criterion="gini"),2000),
    "AdaBoostClassifier": (AdaBoostClassifier(n_estimators=100,learning_rate=0.1),2000),
    "BaggingClassifier": (BaggingClassifier(n_estimators=20),2000),
    "ExtraTreesClassifier": (ExtraTreesClassifier(n_estimators=100),2000),
    "GradientBoostingClassifier": (GradientBoostingClassifier(learning_rate=0.01),2000),
    "GaussianNB": (GaussianNB(var_smoothing=1e-10), 2000),
    "BernoulliNB": (BernoulliNB(alpha=0.1), 2000),
    "DecisionTreeClassifier": (DecisionTreeClassifier(criterion="gini", max_depth=20),2000),
}

"""SCORING parameters. You can add yours below (cohen kappa enjoyers=>)"""
# Metric name -> scikit-learn metric function. The keys double as the score
# column names in the result tables built by save_scores and train_baselines.
# roc_auc_score is imported above but not registered here.
SCORING = {
    "f1": f1_score,
    "precision": precision_score,
    "recall": recall_score,
    "accuracy": accuracy_score,
}

Transformer-related stuff

In [None]:
# Transformer registry: model name -> (model, tokenizer).
# Every `from_pretrained` call below runs as soon as this cell is executed, so
# all three models are loaded up front.
LLMS = {
    "RoBERTa": (
        AutoModelForSequenceClassification.from_pretrained(
            "roberta-base", num_labels=2
        ),
        AutoTokenizer.from_pretrained("roberta-base"),
    ),
    # SetFit has no separate tokenizer entry; tokenize() returns the dataset
    # unchanged when the tokenizer is None.
    "SetFit-mpnet": (
        SetFitModel.from_pretrained("sentence-transformers/all-mpnet-base-v2"),
        None,
    ),
    "FLAN-T5-base": (
        AutoModelForSeq2SeqLM.from_pretrained("google/flan-t5-base"),
        AutoTokenizer.from_pretrained("google/flan-t5-base"),
    ),
}

Additional functions (Utils):

In [None]:
import random
from pathlib import Path
import datasets

In [None]:
def train_val_test_split(df: pd.DataFrame, train_size=0.8, has_val: bool=True) -> tuple[tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame], datasets.DatasetDict]:
    """
    Description: Shuffles a DataFrame and splits it, stratified on the "label"
    column, into train / validation / test parts (or train / test only).
    The same parts are also wrapped in a DatasetDict for training pytorch LLMs.

    # Args:
        df (pd.DataFrame): The DataFrame to split. Must have a "label" column.
        train_size (float | int, optional): Share of rows used for training.
            An int is read as a row count and converted to a share. Defaults to 0.8.
        has_val (bool, optional): Whether to create a validation set. When True,
            the held-out rows are split in half into test and validation. Defaults to True.

    Returns:
        tuple: A tuple containing two elements:
            - the DataFrames: (train, val, test) if has_val is True, else (train, test).
            - datasets.DatasetDict: The same parts under the keys "train",
              "val" (only if has_val) and "test".
    """
    # An integer train_size is a number of rows; turn it into a fraction.
    if isinstance(train_size, int):
        train_size = train_size / len(df)

    shuffled = df.sample(frac=1, random_state=0)
    df_train, held_out = train_test_split(
        shuffled, test_size=1 - train_size, stratify=shuffled["label"]
    )

    if not has_val:
        parts = {"train": df_train, "test": held_out}
        dataset_dict = datasets.DatasetDict(
            {split: datasets.Dataset.from_pandas(frame) for split, frame in parts.items()}
        )
        return (df_train, held_out), dataset_dict

    # Halve the held-out rows: the first half is the test set, the second
    # half the validation set.
    df_test, df_val = train_test_split(
        held_out, test_size=0.5, stratify=held_out["label"]
    )
    parts = {"train": df_train, "val": df_val, "test": df_test}
    dataset_dict = datasets.DatasetDict(
        {split: datasets.Dataset.from_pandas(frame) for split, frame in parts.items()}
    )
    return (df_train, df_val, df_test), dataset_dict

In [None]:
def set_seed(seed) -> None:
    """
    Description: Seeds Python's `random`, NumPy and PyTorch (CPU, plus all
    CUDA devices when CUDA is available) for reproducibility.

    # Args:
        seed (int): The seed value to set.

    Returns:
        None
    """
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)

In [None]:
def plot_scores(task_name: str, experiment: str, dataset_name: str) -> None:
    """
    Description: Draws a grouped bar chart (F1, precision, recall per model)
    from the scores stored in outputs/csv/{experiment}.csv, saves it as PDF
    and PNG under outputs/ and shows it.

    # Args:
        task_name (str): The name of the task (currently not used by the plot).
        experiment (str): The name of the experiment; selects the CSV to read
            and names the saved figures.
        dataset_name (str): The name of the dataset, shown upper-cased as the title.

    Returns:
        None
    """
    scores = pd.read_csv(f"outputs/csv/{experiment}.csv", index_col=0)

    positions = np.arange(len(scores))
    bar_width = 0.2

    # One group of three bars per row of the score table.
    fig, ax = plt.subplots(figsize=(12, 6))
    ax.bar(x=positions - bar_width, height=scores["f1"], width=bar_width, label="F1 score")
    ax.bar(x=positions, height=scores["precision"], width=bar_width, label="Precision")
    ax.bar(x=positions + bar_width, height=scores["recall"], width=bar_width, label="Recall")

    ax.set_title(f"{dataset_name.upper()}")
    ax.set_ylabel("Score")
    ax.set_xticks(positions, labels=scores.index, fontsize=6)
    plt.legend(bbox_to_anchor=(0.5, -0.25), loc="lower center", ncol=4)

    fig.tight_layout()

    # Make sure both output folders exist before anything is written.
    for folder in ("outputs/pdf/", "outputs/png/"):
        Path(folder).mkdir(parents=True, exist_ok=True)

    plt.savefig(f"outputs/pdf/{experiment}.pdf", format="pdf")
    plt.savefig(f"outputs/png/{experiment}.png", format="png", dpi=300)
    plt.show()

In [None]:
def save_scores(task:str, experiment: str, index: str, values: dict) -> None:
    """
    Description: Writes the scores of one model into outputs/csv/{experiment}.csv.
    If the file exists, the row `index` is set in it; otherwise a new table is
    created whose rows are the known LLM names (when `index` is one of them)
    or the known baseline names.
    (P.S: Check the llms and models lists and add yours to save its scores=))

    # Args:
        task (str): The name of the task (currently not used).
        experiment (str): The name of the experiment; names the CSV file.
        index (str): The model name, used as the row label.
        values (dict): The scores to store in that row.

    Returns:
        None
    """
    llms = [
        "BERT",
        "RoBERTa",
        "SetFit-MiniLM",
        "SetFit-mpnet",
        "FLAN-T5-small",
        "FLAN-T5-base",
    ]
    models = ["NB", "LR", "KNN", "SVM", "XGBoost", "LightGBM", "Catboost",
        "RandomForestClassifier", "AdaBoostClassifier", "BaggingClassifier", "ExtraTreesClassifier",
        "GradientBoostingClassifier","GaussianNB","BernoulliNB","DecisionTreeClassifier"]

    Path("outputs/csv/").mkdir(parents=True, exist_ok=True)
    csv_path = f"outputs/csv/{experiment}.csv"

    if Path(csv_path).is_file():
        scores = pd.read_csv(csv_path, index_col=0)
    else:
        # Fresh table: pick the row labels from the family the model belongs to.
        row_names = llms if index in llms else models
        scores = pd.DataFrame(
            index=row_names,
            columns=list(SCORING.keys()) + ["training_time", "inference_time"],
        )

    scores.loc[index] = values
    scores.to_csv(csv_path)

In [None]:
def plot_loss(experiment: str, dataset_name: str, model_name: str) -> None:
    """
    Description: Plots the training and evaluation loss curves logged in
    outputs/csv/loss_{model_name}_{experiment}.csv, saves the figure as PDF
    and PNG under outputs/ and shows it. The last row of the log is ignored.

    # Args:
        experiment (str): The name of the experiment.
        dataset_name (str): The name of the dataset (used in the title).
        model_name (str): The name of the model.

    Returns:
        None
    """
    stem = f"loss_{model_name}_{experiment}"
    history = pd.read_csv(f"outputs/csv/{stem}.csv").iloc[:-1]

    train_losses = history["train_loss"].dropna().values
    eval_losses = history["eval_loss"].dropna().values
    # One tick per recorded training loss, numbered from 1.
    epochs = np.arange(1, len(train_losses) + 1, step=1)

    with plt.style.context(["science", "high-vis"]):
        fig, ax = plt.subplots()
        plt.plot(epochs, train_losses, label="Training loss")
        plt.plot(epochs, eval_losses, label="Evaluation loss")

        ax.set_title(f"{model_name} ({dataset_name})")
        ax.set_xticks(epochs, labels=range(1, len(epochs) + 1))
        ax.set_xlabel("Epochs")
        ax.set_ylabel("Loss")
        ax.legend(loc="upper right")

        for folder in ("outputs/pdf/", "outputs/png/"):
            Path(folder).mkdir(parents=True, exist_ok=True)

        plt.savefig(f"outputs/pdf/{stem}.pdf", format="pdf")
        plt.savefig(f"outputs/png/{stem}.png", format="png", dpi=300)
        plt.show()

In [None]:
def get_dataset(name: str) -> pd.DataFrame:
    """
    Description: Loads results/{name}/data.csv and drops every row that
    contains a missing value.

    # Args:
        name (str): The name of the dataset (the folder under results/).

    Returns:
        pd.DataFrame: The dataset without incomplete rows.
    """
    csv_path = f"results/{name}/data.csv"
    frame = pd.read_csv(csv_path)
    return frame.dropna()

Training baseline models

In [None]:
def encode_df(df: pd.DataFrame, encoder=None) -> tuple[np.ndarray, np.ndarray, object]:
    """
    Description: Turns the ``text`` column of a dataframe into a dense feature matrix.

    An encoder that already exposes ``vocabulary_`` is treated as fitted and is
    only used to transform; otherwise it is fitted on ``df["text"]`` first.
    This lets the same encoder be fitted on the training split and then reused,
    unchanged, on the test split.

    # Args:
        df (pd.DataFrame): Dataframe with a ``text`` and a ``label`` column.
        encoder (optional): A vectorizer offering ``fit_transform``/``transform``
            whose results have ``toarray()``. If None, a default
            ``TfidfVectorizer`` is created and fitted.

    Returns:
        tuple: ``(X, y, encoder)`` -- the dense feature matrix, the values of
        the ``label`` column, and the (now fitted) encoder.
    """
    if encoder is None:
        # The signature advertises encoder=None, so build a default encoder
        # instead of failing on None.fit_transform.
        encoder = TfidfVectorizer()

    if hasattr(encoder, "vocabulary_"):
        features = encoder.transform(df["text"])
    else:
        features = encoder.fit_transform(df["text"])

    # NOTE: toarray() densifies the matrix; memory grows with rows x features.
    return features.toarray(), df["label"].values, encoder

In [1]:
def train_baselines(seeds: list[int], datasets: list[str], task_name: str, test_set: str="test") -> None:
    """
    Description: Trains baseline models for a given task and set of datasets.

    For every seed and dataset, each entry of ``MODELS`` is either
    cross-validated on the training split (``test_set="val"``) or fitted on the
    training split and scored on the test split (``test_set="test"``). Each
    model's row of scores is handed to ``save_scores`` and the experiment is
    plotted with ``plot_scores``.

    # Args:
        seeds (list): A list of seed values for reproducibility.
        datasets (list): A list of dataset names.
        task_name (str): The name of the task.
        test_set (str, optional): "val" for 5-fold cross-validation on the
            training split, "test" for evaluating on the test split. Any other
            value leaves every score empty.

    Returns:
        None

    ## Example:
    >>> train_baselines(
        list(range(1)),
        datasets=["asd"],
        task_name="TrainBaselines",
        test_set = "test")
    """
    metric_names = list(SCORING.keys())

    for seed in seeds:
        set_seed(seed)

        for dataset_name in datasets:
            # One row per model, one column per metric plus the two timings.
            scores = pd.DataFrame(
                index=list(MODELS.keys()),
                columns=metric_names + ["training_time", "inference_time"],
            )

            df = get_dataset(dataset_name)
            (df_train, _df_val, df_test), _ = train_val_test_split(
                df, train_size=0.7, has_val=True
            )

            # NOTE(review): the experiment name does not contain dataset_name, so
            # results for several datasets presumably share one name -- confirm.
            experiment = f"ml_{test_set}_{task_name}_train_seed_{seed}"

            # Cross-validate or test every model
            for model_name, (model, max_features) in MODELS.items():
                # The second element of each MODELS entry caps the TF-IDF vocabulary.
                encoder = TfidfVectorizer(max_features=max_features)
                x_train, y_train, encoder = encode_df(df_train, encoder)

                # Evaluate model with cross-validation
                if test_set == "val":
                    cv = cross_validate(
                        model,
                        x_train,
                        y_train,
                        scoring=metric_names,
                        cv=5,
                        n_jobs=-1,
                    )
                    for score_name in metric_names:
                        # Rows are models and columns are metrics.
                        scores.loc[model_name, score_name] = cv[
                            f"test_{score_name}"
                        ].mean()

                # Evaluate model on test set
                if test_set == "test":
                    # Reuse the encoder fitted on the training split.
                    x_test, y_test, encoder = encode_df(df_test, encoder)

                    start = time.time()
                    model.fit(x_train, y_train)
                    end = time.time()
                    scores.loc[model_name, "training_time"] = end - start

                    start = time.time()
                    y_pred = model.predict(x_test)
                    end = time.time()
                    scores.loc[model_name, "inference_time"] = end - start

                    for score_name, score_fn in SCORING.items():
                        # (y_true, y_pred): the same argument order train_llms
                        # uses; precision and recall are not symmetric.
                        scores.loc[model_name, score_name] = score_fn(
                            y_test, y_pred
                        )

                save_scores(
                    task_name, experiment, model_name, scores.loc[model_name].to_dict()
                )

            # Display scores
            plot_scores(task_name, experiment, "results")
            print(scores)

In [None]:
# Run the baseline models once (seed 0) on the "asd" dataset, scoring them on
# the test split.
train_baselines(
    seeds=[0],
    datasets=["asd"],
    task_name="TrainBaselines",
    test_set="test",
)

Training LLMs:

In [None]:
class EvalOnTrainCallback(TrainerCallback):
    """Callback that additionally evaluates the model on its training set.

    At the end of every epoch for which an evaluation is scheduled, the wrapped
    trainer is evaluated on its own ``train_dataset`` and the resulting metrics
    are reported under the ``train`` prefix.
    """

    def __init__(self, trainer) -> None:
        super().__init__()
        # Trainer whose train_dataset is evaluated at epoch end.
        self._trainer = trainer

    def on_epoch_end(self, args, state, control, **kwargs):
        # Nothing to do on epochs where no evaluation is scheduled.
        if not control.should_evaluate:
            return None

        # Snapshot the control flags *before* the extra evaluation runs, and
        # hand that snapshot back to the trainer afterwards.
        snapshot = copy.deepcopy(control)
        self._trainer.evaluate(
            eval_dataset=self._trainer.train_dataset,
            metric_key_prefix="train",
        )
        return snapshot

In [None]:
def get_trainer(model, traindict, testdict, tokenizer=None, learning_rate=5e-5):
    """
    Description: Creates a trainer object for a given model and datasets.

    The trainer type is chosen from the model's class name:
    ``SetFitModel`` -> ``SetFitTrainer``; names containing ``T5`` or ``FLAN``
    -> ``Seq2SeqTrainer`` (text-to-text, metrics computed on decoded output);
    anything else -> ``Trainer`` (sequence classification on logits).
    The last two also get an ``EvalOnTrainCallback``.

    # Args:
        model: The model to train.
        traindict: The (tokenized) training split.
        testdict: The (tokenized) split used for evaluation during training.
        tokenizer (optional): The tokenizer associated with the model.
            Required for T5/FLAN models, unused otherwise.
        learning_rate (float, optional): Learning rate of the two Hugging Face
            trainers. Defaults to 5e-5; the previously hard-coded value of 1
            makes fine-tuning diverge. Not used for SetFit models.

    Returns:
        A SetFitTrainer, Seq2SeqTrainer or Trainer configured for training.

    Raises:
        ValueError: If a T5/FLAN model is given without a tokenizer.

    # Example:

    >>> trainer = get_trainer(model, train_dataset, test_dataset, tokenizer)
    """
    model_class = type(model).__name__

    if model_class == "SetFitModel":
        return SetFitTrainer(
            model=model,
            train_dataset=traindict,
            eval_dataset=testdict,
            loss_class=CosineSimilarityLoss,
            metric="f1",
            batch_size=4,
            num_iterations=1,
            num_epochs=1,
        )

    # Load the metric once per trainer instead of on every evaluation call.
    f1_metric = evaluate.load("f1")

    if "T5" in model_class or "FLAN" in model_class:
        if tokenizer is None:
            raise ValueError("A tokenizer is required to train T5/FLAN models.")

        def compute_metrics_t5(y_pred):
            """Compute macro-F1 during training for T5-like models."""
            predictions, labels = y_pred
            if isinstance(predictions, tuple):
                predictions = predictions[0]

            # -100 marks ignored/padded positions and cannot be decoded, so it
            # is replaced with pad_token_id in predictions and labels alike.
            predictions = np.where(
                predictions != -100, predictions, tokenizer.pad_token_id
            )
            labels = np.where(labels != -100, labels, tokenizer.pad_token_id)

            decoded_predictions = tokenizer.batch_decode(
                predictions, skip_special_tokens=True
            )
            decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)

            # Map generated text to class ids: 1 if it mentions "spam", else 0.
            predicted_ids = [1 if "spam" in text else 0 for text in decoded_predictions]
            label_ids = [1 if "spam" in text else 0 for text in decoded_labels]

            return f1_metric.compute(
                predictions=predicted_ids, references=label_ids, average="macro"
            )

        data_collator = DataCollatorForSeq2Seq(
            tokenizer, model=model, label_pad_token_id=-100, pad_to_multiple_of=8
        )

        training_args = Seq2SeqTrainingArguments(
            output_dir="experiments",
            per_device_train_batch_size=4,
            per_device_eval_batch_size=4,
            learning_rate=learning_rate,
            num_train_epochs=1,
            predict_with_generate=True,
            fp16=False,
            evaluation_strategy="epoch",
            save_strategy="epoch",
            load_best_model_at_end=True,
            save_total_limit=2,
        )

        trainer = Seq2SeqTrainer(
            model=model,
            args=training_args,
            train_dataset=traindict,
            eval_dataset=testdict,
            data_collator=data_collator,
            compute_metrics=compute_metrics_t5,
        )
        trainer.add_callback(EvalOnTrainCallback(trainer))
        return trainer

    def compute_metrics(y_pred):
        """Compute macro-F1 during training from classification logits."""
        logits, labels = y_pred
        predictions = np.argmax(logits, axis=-1)
        return f1_metric.compute(
            predictions=predictions, references=labels, average="macro"
        )

    training_args = TrainingArguments(
        output_dir="experiments",
        per_device_train_batch_size=8,
        per_device_eval_batch_size=4,
        learning_rate=learning_rate,
        num_train_epochs=1,
        evaluation_strategy="epoch",
        save_strategy="epoch",
        load_best_model_at_end=True,
        save_total_limit=1,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=traindict,
        eval_dataset=testdict,
        compute_metrics=compute_metrics,
    )
    trainer.add_callback(EvalOnTrainCallback(trainer))
    return trainer

In [None]:
def predict(trainer: "transformers.Trainer", model, dataset, tokenizer=None) -> list:
    """
    Description: Generates predictions on a given dataset using a trained model and trainer.

    The branch is chosen from the model's class name, mirroring ``get_trainer``:
    ``SetFitModel`` is called directly on ``dataset["text"]``; T5/FLAN models
    generate text that is decoded and mapped to class ids; every other model
    is treated as a classifier and the arg-max over its outputs is returned.

    # Args:
        trainer: The trainer object used for training (unused for SetFit).
        model: The trained model.
        dataset: A Dataset containing the data for prediction.
        tokenizer (optional): The tokenizer associated with the model.
            Required for T5/FLAN models.

    Returns:
        The predicted class id of each example in the dataset. For T5/FLAN
        models this is a list with 1 where the generated text contains "spam"
        and 0 otherwise; for classifiers it is the arg-max array.

    Raises:
        ValueError: If a T5/FLAN model is given without a tokenizer.

    # Example:

    >>> predictions = predict(trainer, model, test_dataset, tokenizer)
    """
    model_class = type(model).__name__

    if model_class == "SetFitModel":
        return model(dataset["text"])

    if "T5" in model_class or "FLAN" in model_class:
        if tokenizer is None:
            raise ValueError("A tokenizer is required to decode T5/FLAN predictions.")

        generated = trainer.predict(dataset).predictions
        if isinstance(generated, tuple):
            generated = generated[0]
        generated = np.asarray(generated)
        # -100 marks padded positions and cannot be decoded.
        generated = np.where(generated != -100, generated, tokenizer.pad_token_id)
        decoded = tokenizer.batch_decode(generated, skip_special_tokens=True)

        # Return integer ids like the other branches. The old test
        # `0 in <str>` raised TypeError and produced "ham"/"spam" strings.
        return [1 if "spam" in text else 0 for text in decoded]

    return trainer.predict(dataset).predictions.argmax(axis=-1)

In [None]:
def train_llms(seeds, datasets, task_name, test_set="test"):
    """
    Description: Trains and evaluates every large language model in ``LLMS``.

    For every seed and dataset, each model is trained on the tokenized "train"
    split (with "val" as the evaluation split), its loss history is saved and
    plotted (except for SetFit models), and it is then scored on ``test_set``.
    Each model's row of scores is handed to ``save_scores`` and the experiment
    is plotted with ``plot_scores``.

    # Args:
        seeds (list): A list of seed values for reproducibility.
        datasets (list): A list of dataset names.
        task_name (str): The name of the task.
        test_set (str, optional): Name of the split the models are scored on.

    Returns:
        None
    """
    # Local import so the block does not depend on a file-level import.
    from pathlib import Path

    for seed in seeds:
        set_seed(seed)

        for dataset_name in datasets:
            print(dataset_name)
            # One row per model, one column per metric plus the two timings.
            scores = pd.DataFrame(
                index=list(LLMS.keys()),
                columns=list(SCORING.keys()) + ["training_time", "inference_time"],
            )

            df = get_dataset(dataset_name)
            _, dataset = train_val_test_split(df, train_size=0.7, has_val=True)
            print("split succeded=)")

            # Name experiment
            experiment = f"llm_{test_set}_train_seed_{seed}"

            # Train, evaluate, test
            # NOTE(review): the model objects in LLMS are reused across seeds and
            # datasets; presumably they keep their fine-tuned weights -- confirm.
            for model_name, (model, tokenizer) in LLMS.items():
                tokenized_dataset = tokenize(dataset, tokenizer)
                trainer = get_trainer(
                    model,
                    tokenized_dataset["train"],
                    tokenized_dataset["val"],
                    tokenizer,
                )
                print(trainer.args)

                # Train model
                start = time.time()
                print("beginning of train")
                trainer.train()
                end = time.time()
                # Report the elapsed time, not the absolute timestamp.
                print(f"train ended after: {end - start:.2f}s")
                scores.loc[model_name, "training_time"] = end - start

                if "SetFit" not in model_name:
                    # plot_loss reads this CSV back, so the folder must exist.
                    Path("outputs/csv").mkdir(parents=True, exist_ok=True)
                    log = pd.DataFrame(trainer.state.log_history)
                    log.to_csv(f"outputs/csv/loss_{model_name}_{experiment}.csv")
                    plot_loss(experiment, "EMAIL spam check", model_name)

                # Test model
                start = time.time()
                predictions = predict(
                    trainer, model, tokenized_dataset[test_set], tokenizer
                )
                end = time.time()

                for score_name, score_fn in SCORING.items():
                    # Single .loc[row, col] write: chained indexing may assign
                    # to a temporary copy and lose the value.
                    scores.loc[model_name, score_name] = score_fn(
                        dataset[test_set]["label"], predictions
                    )

                scores.loc[model_name, "inference_time"] = end - start
                save_scores(
                    task_name, experiment, model_name, scores.loc[model_name].to_dict()
                )

            # Display scores
            plot_scores(task_name, experiment, task_name)
            print(scores)
    

In [None]:
# Train and score every LLM once (seed 0) on the "asd" dataset; test_set keeps
# its default value.
train_llms(
    seeds=[0],
    datasets=["asd"],
    task_name="LLMTrainCheck",
)