In [None]:
from google.colab import files
# Call Colab's upload helper and keep whatever it returns in `uploaded`.
# NOTE(review): presumably this is where email_dataset.csv (read by the
# following cell) is supplied — confirm the expected filename.
uploaded = files.upload()

# Importing the CSV

In [None]:
import pandas as pd
# Load the e-mail dataset from the working directory; later cells read the
# 'email' and 'category' columns of this frame.
df = pd.read_csv("email_dataset.csv")
# Preview the first rows as the cell output.
df.head()

Unnamed: 0,email,category
0,Urgent: Your package delivery has failed. Clic...,spam
1,New post from your friend Alex on Instagram. S...,social
2,Exclusive offer: 30% off your next purchase. U...,promotional
3,"Hi sweetie, don't forget to pick up milk on yo...",personal
4,Your credit card payment is due in 3 days. Ple...,finance


# Preprocessing

In [None]:
# Manual word embeddings using Word2Vec
# from gensim.models import Word2Vec
# import re

# # Preprocessing function
# def preprocess_text(text):
#     # Convert to lowercase
#     text = text.lower()
#     # Remove unwanted characters (e.g., punctuation, numbers)
#     text = re.sub(r'[^a-z\s]', '', text)
#     # Tokenize the text into words
#     tokens = text.split()
#     return tokens

# # Apply preprocessing to the 'Text' column
# df['cleaned_text'] = df['Text'].apply(preprocess_text)

# # Train a Word2Vec model
# model = Word2Vec(sentences=df['cleaned_text'], vector_size=100, window=5, min_count=1, workers=4)

# # Function to get average word vector for each email
# def get_average_word_vector(tokens, model):
#     vectors = [model.wv[word] for word in tokens if word in model.wv]
#     return sum(vectors) / len(vectors) if vectors else [0] * model.vector_size

# # Add a column with word embeddings for each email
# df['Embedding'] = df['cleaned_text'].apply(lambda x: get_average_word_vector(x, model))

# # Print embeddings for each class
# for email_class in df['Type'].unique():
#     print(f"Embeddings for {email_class}:")
#     print(df[df['Type'] == email_class]['Embedding'].values)
#     print("\n")

## Removing Extra Whitespace

In [None]:
# Trim leading/trailing whitespace from both text columns.
# The vectorized `.str.strip()` leaves missing values untouched, whereas
# calling `x.strip()` through apply() raises AttributeError on a NaN cell
# before the missing-value handling further down ever gets a chance to run.
df["email"] = df["email"].str.strip()
df["category"] = df["category"].str.strip()

# Show the distinct labels to confirm there are no whitespace-variant duplicates.
df["category"].unique()

array(['spam', 'social', 'promotional', 'personal', 'finance',
       'important'], dtype=object)

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

# Combine 'Subject' and 'Text' columns
# df['content'] = df['Subject'] + ' ' + df['Text']

# Replace missing e-mail bodies with an empty string.
# Assign the result back instead of calling fillna(inplace=True) on the
# selected column: the chained in-place form acts on an intermediate object,
# triggers a pandas FutureWarning, and no longer updates `df` in pandas 3.0.
df['email'] = df['email'].fillna('')

# Convert the string labels to integer class ids. `label_encoder` is kept so
# predicted ids can be mapped back to category names via inverse_transform.
label_encoder = LabelEncoder()
df['category'] = label_encoder.fit_transform(df['category'])

df.head()

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df['email'].fillna('', inplace=True)


Unnamed: 0,email,category
0,Urgent: Your package delivery has failed. Clic...,5
1,New post from your friend Alex on Instagram. S...,4
2,Exclusive offer: 30% off your next purchase. U...,3
3,"Hi sweetie, don't forget to pick up milk on yo...",2
4,Your credit card payment is due in 3 days. Ple...,0


# Train-Test Split

In [None]:
# Hold out 20% of the rows for validation. `random_state` pins the shuffle so
# the same rows land in each split on every Restart & Run All, keeping the
# reported accuracies comparable between runs.
train_texts, val_texts, train_labels, val_labels = train_test_split(
    df['email'].values,
    df['category'].values,
    test_size=0.2,
    random_state=42,
)

# Tokenize text

In [None]:
from transformers import BertTokenizer

# Fetch the pretrained uncased BERT tokenizer.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Both splits are encoded with identical settings (truncation on, padding on,
# at most 128 tokens), so the options are spelled out once and reused.
_encode_options = {'truncation': True, 'padding': True, 'max_length': 128}
train_encodings = tokenizer(train_texts.tolist(), **_encode_options)
val_encodings = tokenizer(val_texts.tolist(), **_encode_options)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]



# Prepare Dataset for BERT

In [None]:
import torch

class EmailDataset(torch.utils.data.Dataset):
    """Pairs tokenizer encodings with labels for indexed access.

    `encodings` is a mapping from field name to a per-example sequence and
    `labels` is an indexable sequence with one entry per example.
    """

    def __init__(self, encodings, labels):
        self.encodings, self.labels = encodings, labels

    def __len__(self):
        # The dataset size is driven by the label sequence.
        return len(self.labels)

    def __getitem__(self, idx):
        # Build one example: every encoding field at position `idx` as a
        # tensor, plus the matching label under the 'labels' key.
        sample = {}
        for field, rows in self.encodings.items():
            sample[field] = torch.tensor(rows[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

# Wrap each split's tokenizer output together with its encoded labels.
train_dataset = EmailDataset(train_encodings, train_labels)
val_dataset = EmailDataset(val_encodings, val_labels)


# Fine-tune the model

In [None]:
# from transformers import BertForSequenceClassification, EarlyStoppingCallback, Trainer, TrainingArguments

# # Load the BERT model for classification
# model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=len(label_encoder.classes_))

# # Define training arguments
# training_args = TrainingArguments(
#     output_dir='./results',
#     evaluation_strategy="epoch",
#     save_strategy="epoch",
#     save_total_limit=2,
#     load_best_model_at_end=True,
#     per_device_train_batch_size=16,
#     per_device_eval_batch_size=16,
#     num_train_epochs=50,
#     weight_decay=0.01
#     # logging_dir='./logs',
#     # logging_steps=10
# )

# # Initialize the Trainer
# trainer = Trainer(
#     model=model,
#     args=training_args,
#     train_dataset=train_dataset,
#     eval_dataset=val_dataset,
#     callbacks=[EarlyStoppingCallback(early_stopping_patience=3)]
# )

# # Fine-tune the model
# trainer.train()


from sklearn.metrics import accuracy_score
from transformers import BertForSequenceClassification, EarlyStoppingCallback, Trainer, TrainingArguments
import numpy as np

# Load the BERT model for classification
# The classification head is sized to the number of distinct categories
# recorded by the label encoder.
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=len(label_encoder.classes_))

# Define training arguments
# Evaluation and checkpointing both happen once per epoch, and the best
# checkpoint is reloaded when training ends (load_best_model_at_end=True).
# NOTE(review): recent transformers releases renamed `evaluation_strategy` to
# `eval_strategy` — confirm which spelling the installed version accepts.
training_args = TrainingArguments(
    output_dir='./results',
    evaluation_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=2,
    load_best_model_at_end=True,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    # Upper bound on epochs; the early-stopping callback passed to the Trainer
    # is expected to end training sooner.
    num_train_epochs=50,
    weight_decay=0.01,
    logging_dir='./logs',
    logging_steps=10
)

# Define a function to compute accuracy
def compute_metrics(eval_pred):
    """Return ``{"accuracy": ...}`` for one evaluation pass.

    `eval_pred` unpacks into ``(logits, labels)``. The predicted class for
    each row is the index of its largest logit along the last axis.
    """
    scores, references = eval_pred
    predicted_ids = np.argmax(scores, axis=-1)
    return {"accuracy": accuracy_score(references, predicted_ids)}

# Initialize the Trainer
# Training runs on train_dataset; each evaluation pass scores val_dataset and
# reports the accuracy returned by compute_metrics.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics,  # Pass the compute_metrics function
    # Early stopping configured with a patience of 3.
    callbacks=[EarlyStoppingCallback(early_stopping_patience=3)]
)

# Fine-tune the model
trainer.train()

# Evaluate the model to get the accuracy on the validation set
# compute_metrics returns the key "accuracy"; it is read back here as
# "eval_accuracy".
eval_results = trainer.evaluate()
print(f"Validation accuracy: {eval_results['eval_accuracy']}")

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Epoch,Training Loss,Validation Loss,Accuracy
1,0.3731,0.575528,0.838835


Epoch,Training Loss,Validation Loss,Accuracy
1,0.3731,0.575528,0.838835


# Evaluate the model

In [None]:
# Evaluate the model on the training set to get training accuracy.
# Trainer.evaluate() receives the dataset through its `eval_dataset`
# parameter; it has no `train_dataset` or `test_dataset` keyword, so the
# previous calls raised TypeError before producing any metric.
train_results = trainer.evaluate(eval_dataset=train_dataset)
print(f"Training accuracy: {train_results['eval_accuracy']}")

# Evaluate the model on the validation set (the Trainer's own eval_dataset).
val_results = trainer.evaluate()
print(f"Validation accuracy: {val_results['eval_accuracy']}")

# NOTE(review): this notebook only creates train and validation splits, so
# this re-scores the validation data and will repeat the number above. Swap in
# a genuinely held-out dataset before reporting it as test accuracy.
test_results = trainer.evaluate(eval_dataset=val_dataset)
print(f"Test accuracy: {test_results['eval_accuracy']}")

# Test the model



In [None]:
import torch
# Prefer the GPU when one is visible to PyTorch, otherwise stay on the CPU,
# then move the model's parameters onto that device.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

def classify_email(subject, text):
    """Predict the category of a single e-mail.

    Args:
        subject: Subject line of the e-mail.
        text: Body of the e-mail.

    Returns:
        The array produced by ``label_encoder.inverse_transform`` for the
        predicted class id (one element, since one e-mail is encoded).
    """
    email_content = subject + ' ' + text
    encoding = tokenizer(email_content, return_tensors='pt', truncation=True, padding=True, max_length=128)

    # Move the input tensors to the same device as the model
    encoding = {key: val.to(device) for key, val in encoding.items()}

    # Inference only: put the model in eval mode so dropout is disabled, and
    # skip autograd tracking so no gradient graph is built for the forward pass.
    model.eval()
    with torch.no_grad():
        output = model(**encoding)

    # Get the predicted class
    prediction = torch.argmax(output.logits, dim=1)

    return label_encoder.inverse_transform(prediction.cpu().numpy())

# Three hand-written (subject, body) pairs used as a quick smoke test.
subjects: list[str] = [
    "Free gift cards",
    "Hello Dear",
    "Your account has been compromised.",
]
texts: list[str] = [
    "You have won a free gift card. Click here to claim!",
    "I am stuck in Africa and I need your help.",
    "Kindly login and reclaim your account.",
]

# classify_email returns a one-element array; .item() unwraps it to a scalar.
predicted_classes = [
    classify_email(subj, body).item()
    for subj, body in zip(subjects, texts)
]
print(predicted_classes)