# Spam Email Detector with Neural Network and BERT model
## with ~99.3% accuracy on a dataset of 164K emails

In [None]:
import pandas as pd  
import torch  
import torch.nn as nn  
import torch.optim as optim 
from tqdm import tqdm 
from torch.utils.data import Dataset, DataLoader, random_split  
from sklearn.metrics import classification_report, accuracy_score  
from transformers import BertTokenizer, BertModel
# from transformers import DistilBertTokenizer, DistilBertModel
from torch.cuda.amp import GradScaler, autocast   

# Read the labelled e-mail corpus from disk and preview its first rows.
data = pd.read_csv("Data/FullDataset.csv")
print(data.head(n=5))

# Pull the message texts and their 0/1 labels out of the frame in one go.
X, y = data["text"].values, data["label"].values

# WordPiece tokenizer matching the pretrained 'bert-base-uncased' checkpoint.
# (Swapping in DistilBERT would require the matching DistilBertTokenizer.)
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

# Custom dataset class  
class TextDataset(Dataset):
    """Map-style dataset that tokenizes one text per item.

    Args:
        texts: Indexable collection of raw text strings.
        labels: Indexable collection of numeric labels, aligned with ``texts``.
        tokenizer: Callable tokenizer (e.g. a Hugging Face ``BertTokenizer``)
            that returns ``input_ids`` and ``attention_mask`` tensors.
        max_length: Length, in tokens, every encoded sequence is padded or
            truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_length=128):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of texts in the dataset."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Tokenize the ``idx``-th text and return model-ready tensors.

        Returns:
            dict with 1-D ``input_ids`` and ``attention_mask`` tensors and a
            scalar float32 ``labels`` tensor.
        """
        # Call the tokenizer directly: `encode_plus` is deprecated in favour
        # of `__call__`, which takes the same keyword arguments.
        encoding = self.tokenizer(
            self.texts[idx],
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt',
        )

        # The tokenizer adds a leading batch axis of size 1; flatten it away so
        # the DataLoader can stack items into (batch, max_length).
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            # float32 because the training code feeds labels to BCEWithLogitsLoss.
            'labels': torch.tensor(self.labels[idx], dtype=torch.float32),
        }

# Wrap the raw arrays in the tokenizing dataset.
dataset = TextDataset(texts=X, labels=y, tokenizer=tokenizer)

# Hold out 10% of the samples for testing; the remaining 90% is for training.
train_size = int(len(dataset) * 0.9)
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, lengths=[train_size, test_size])

# Mini-batch iterators: training batches are reshuffled, test batches are not.
train_loader = DataLoader(dataset=train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=16, shuffle=False)

# Define the neural network using nn.Module  
class SpamDetectorNNWithBert(nn.Module):
    """Pretrained BERT encoder followed by one linear unit producing a logit."""

    def __init__(self):
        super().__init__()
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        # 768 is the width of the BERT output vector fed to the classifier.
        self.linear = nn.Linear(in_features=768, out_features=1)

    def forward(self, input_ids, attention_mask):
        """Return raw (pre-sigmoid) logits of shape (batch_size, 1)."""
        encoded = self.bert(input_ids, attention_mask=attention_mask)
        # Position 0 holds the [CLS] token; use its embedding as the summary.
        cls_vector = encoded.last_hidden_state[:, 0]
        return self.linear(cls_vector)

# Build the classifier.
model = SpamDetectorNNWithBert()

# Prefer the GPU when one is visible, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model.to(device)

# The model emits raw logits, so use the loss that applies the sigmoid itself.
criterion = nn.BCEWithLogitsLoss()
# Small learning rate for fine-tuning BERT.
optimizer = optim.Adam(params=model.parameters(), lr=2e-5)

# Function to train the model  
def train_model(model, train_loader, criterion, optimizer, num_epochs=10, accumulation_steps=4):
    """Fine-tune ``model`` with mixed precision and gradient accumulation.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that
            returns one logit per sample.
        train_loader: Iterable of batches; each batch is a dict with
            ``'input_ids'``, ``'attention_mask'`` and ``'labels'`` tensors.
        criterion: Loss taking ``(logits, labels)`` as flat tensors.
        optimizer: Optimizer over ``model``'s parameters.
        num_epochs: Number of passes over ``train_loader``.
        accumulation_steps: Number of batches whose gradients are summed
            before each optimizer step (1 disables accumulation).

    Raises:
        ValueError: If ``accumulation_steps`` is smaller than 1.
    """
    if accumulation_steps < 1:
        raise ValueError("accumulation_steps must be >= 1")

    model.train()
    scaler = GradScaler()  # Loss scaler for mixed precision
    num_batches = len(train_loader)

    for epoch in range(num_epochs):
        total_loss = 0
        # Start every epoch from clean gradients; without zeroing, gradients
        # from all previous batches would keep adding up.
        optimizer.zero_grad()
        for step, batch in enumerate(tqdm(train_loader), start=1):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            with autocast():  # Enable mixed precision
                logits = model(input_ids, attention_mask)
                loss = criterion(logits.view(-1), labels.view(-1))

            # Divide so the accumulated gradient is the mean over the window.
            scaler.scale(loss / accumulation_steps).backward()

            # Step once per window, and once more for a trailing partial window.
            if step % accumulation_steps == 0 or step == num_batches:
                scaler.step(optimizer)  # Unscale gradients and update the weights
                scaler.update()  # Adjust the scale for the next iteration
                optimizer.zero_grad()

            # Track the undivided loss so the reported average is per batch.
            total_loss += loss.item()

        print(f'Epoch [{epoch+1}/{num_epochs}],  Avg. Loss: {total_loss/len(train_loader):.4f}')

# Fine-tune for two epochs.
# NOTE: a fallback that retried on the CPU after a CUDA out-of-memory error was
# sketched here previously; it is not active.
train_model(
    model=model,
    train_loader=train_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=2,
)

# Evaluate the model  
def evaluate_model(model, data_loader):
    """Run ``model`` over ``data_loader`` and collect binary predictions.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that
            returns one raw logit per sample.
        data_loader: Iterable of batches; each batch is a dict with
            ``'input_ids'``, ``'attention_mask'`` and ``'labels'`` tensors.

    Returns:
        Tuple ``(all_labels, all_preds)`` of flat lists with one entry per
        sample; predictions are 0.0 or 1.0.
    """
    model.eval()

    # Send inputs to wherever the model's weights live, so evaluation also
    # works for a model that was never moved to the global device.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in data_loader:
            input_ids = batch['input_ids'].to(target_device)
            attention_mask = batch['attention_mask'].to(target_device)
            labels = batch['labels'].to(target_device)

            # reshape(-1) keeps a 1-D tensor even for a batch of one sample;
            # squeeze() would yield a 0-d tensor that cannot be iterated.
            logits = model(input_ids, attention_mask).reshape(-1)
            # The model outputs logits, so map them to probabilities before
            # applying the 0.5 decision threshold.
            preds = (torch.sigmoid(logits) > 0.5).float()
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.reshape(-1).cpu().numpy())

    return all_labels, all_preds

# Collect ground-truth labels and predictions for the held-out split.
y_test, y_pred = evaluate_model(model=model, data_loader=test_loader)

# Report overall accuracy followed by the per-class precision/recall table.
print("Accuracy:", accuracy_score(y_true=y_test, y_pred=y_pred))
print(classification_report(y_true=y_test, y_pred=y_pred))

In [1]:
# Import necessary Libraries
import pandas as pd  
import torch  
import torch.nn as nn  
import torch.optim as optim 
from tqdm import tqdm 
from torch.utils.data import Dataset, DataLoader, random_split  
from sklearn.metrics import classification_report, accuracy_score  
from transformers import BertTokenizer, BertModel

In [2]:
# Read the labelled e-mail corpus ("text" and "label" columns) from disk.
data = pd.read_csv(filepath_or_buffer="Data/FullDataset.csv")
# Preview the first five rows.
print(data.head(n=5))

                                                text  label
0  Subject: naturally irresistible your corporate...      1
1  Subject: the stock trading gunslinger  fanny i...      1
2  Subject: unbelievable new homes made easy  im ...      1
3  Subject: 4 color printing special  request add...      1
4  Subject: do not have money , get software cds ...      1


In [3]:
# Split the frame into message texts (X) and their labels (y).
X, y = data["text"].values, data["label"].values

# Tokenizer matching the pretrained 'bert-base-uncased' checkpoint.
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

In [4]:
# Custom dataset class  
class TextDataset(Dataset):
    """Dataset that turns (text, label) pairs into fixed-length tensors.

    Args:
        texts: Indexable collection of raw text strings.
        labels: Indexable collection of numeric labels, aligned with ``texts``.
        tokenizer: Callable tokenizer (e.g. a Hugging Face ``BertTokenizer``)
            that returns ``input_ids`` and ``attention_mask`` tensors.
        max_length: Number of tokens (not words) in every encoded sequence.
    """

    def __init__(self, texts, labels, tokenizer, max_length=128):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length  # Sequence length in tokens

    def __len__(self):
        """Return how many samples the dataset holds."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Encode sample ``idx``.

        Returns:
            dict with 1-D ``input_ids`` and ``attention_mask`` tensors and a
            scalar float32 ``labels`` tensor.
        """
        # `encode_plus` is deprecated; calling the tokenizer object is the
        # supported equivalent and takes the same keyword arguments.
        encoding = self.tokenizer(
            self.texts[idx],
            add_special_tokens=True,  # Add special tokens such as [SEP]
            max_length=self.max_length,
            padding='max_length',  # Pad shorter texts with [PAD] up to max_length
            truncation=True,  # Cut longer texts down to max_length tokens
            return_tensors='pt',
        )

        # Drop the leading batch axis of size 1 added by the tokenizer.
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            # float32 because the training code feeds labels to BCEWithLogitsLoss.
            'labels': torch.tensor(self.labels[idx], dtype=torch.float32),
        }

In [5]:
# Wrap the texts and labels in the tokenizing dataset (default max_length).
dataset = TextDataset(texts=X, labels=y, tokenizer=tokenizer)

In [6]:
# Randomly partition the samples: 90% for training, the remainder for testing.
train_size = int(len(dataset) * 0.9)
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, lengths=[train_size, test_size])

In [7]:
# Batches of 16 samples; only the training batches are reshuffled each epoch.
train_loader = DataLoader(dataset=train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=16, shuffle=False)

In [8]:
# Define the neural network using nn.Module  
class SpamDetectorNNWithBert(nn.Module):
    """Binary classifier: BERT encoder plus a single-output linear head."""

    def __init__(self):
        super().__init__()
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        # Maps the 768-dimensional BERT output to one logit.
        self.linear = nn.Linear(in_features=768, out_features=1)

    def forward(self, input_ids, attention_mask):
        """Return the raw logits (no sigmoid applied) for a batch."""
        hidden_states = self.bert(input_ids, attention_mask=attention_mask).last_hidden_state
        # Classify from the embedding at position 0, the [CLS] token.
        return self.linear(hidden_states[:, 0, :])

In [9]:
# Instantiate the classifier; its constructor loads the pretrained 'bert-base-uncased' encoder.
model = SpamDetectorNNWithBert()

In [10]:
# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Moves the weights in place; as the last expression it also echoes the module.
model.to(device)

SpamDetectorNNWithBert(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSdpaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, el

In [11]:
# The model outputs logits, so use the loss that applies the sigmoid itself.
criterion = nn.BCEWithLogitsLoss()
# Adam over all model parameters with a small fine-tuning learning rate.
optimizer = optim.Adam(params=model.parameters(), lr=2e-5)

In [14]:
# Function to train the model  
def train_model(model, train_loader, criterion, optimizer, num_epochs=10):
    """Train ``model`` for ``num_epochs`` passes over ``train_loader``.

    Each batch is a dict with ``'input_ids'``, ``'attention_mask'`` and
    ``'labels'`` tensors, which are moved to the module-level ``device``.
    The mean batch loss is printed at the end of every epoch.
    """
    model.train()

    for epoch_number in range(1, num_epochs + 1):
        running_loss = 0
        for batch in tqdm(train_loader):
            token_ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            targets = batch['labels'].to(device)

            # Clear gradients left over from the previous batch.
            optimizer.zero_grad()
            flat_logits = model(token_ids, mask).view(-1)
            batch_loss = criterion(flat_logits, targets.view(-1))
            batch_loss.backward()
            optimizer.step()

            running_loss += batch_loss.item()

        mean_loss = running_loss / len(train_loader)
        print(f'Epoch [{epoch_number}/{num_epochs}],  Avg. Loss: {mean_loss:.4f}')

In [15]:
# Fine-tune for two epochs.
train_model(
    model=model,
    train_loader=train_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=2,
)

  attn_output = torch.nn.functional.scaled_dot_product_attention(
100%|██████████| 9251/9251 [2:27:48<00:00,  1.04it/s]  


Epoch [1/2],  Avg. Loss: 0.0382


100%|██████████| 9251/9251 [4:00:10<00:00,  1.56s/it]       

Epoch [2/2],  Avg. Loss: 0.0121





In [16]:
# Evaluate the model  
def evaluate_model(model, data_loader):
    """Predict a 0/1 class for every sample in ``data_loader``.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that
            returns one raw logit per sample.
        data_loader: Iterable of batches; each batch is a dict with
            ``'input_ids'``, ``'attention_mask'`` and ``'labels'`` tensors.

    Returns:
        Tuple ``(all_labels, all_preds)`` of flat lists with one entry per
        sample; predictions are 0.0 or 1.0.
    """
    model.eval()

    # Follow the model's own device rather than a global, so a freshly loaded
    # model that was never moved still receives inputs on the right device.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    all_preds = []
    all_labels = []

    with torch.inference_mode():
        for batch in data_loader:
            input_ids = batch['input_ids'].to(target_device)
            attention_mask = batch['attention_mask'].to(target_device)
            labels = batch['labels'].to(target_device)

            # reshape(-1) stays 1-D for a final batch of one sample, where
            # squeeze() would give a 0-d tensor that extend() cannot iterate.
            logits = model(input_ids, attention_mask).reshape(-1)
            # Convert logits to probabilities before thresholding at 0.5.
            preds = (torch.sigmoid(logits) > 0.5).float()
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.reshape(-1).cpu().numpy())

    return all_labels, all_preds

In [17]:
# Gather true labels and predictions over the held-out test split.
y_test, y_pred = evaluate_model(model=model, data_loader=test_loader)

In [18]:
# Overall accuracy, then per-class precision / recall / F1.
print("Accuracy:", accuracy_score(y_true=y_test, y_pred=y_pred))
print(classification_report(y_true=y_test, y_pred=y_pred))

Accuracy: 0.9937982610810482
              precision    recall  f1-score   support

         0.0       0.99      0.99      0.99      8210
         1.0       0.99      0.99      0.99      8237

    accuracy                           0.99     16447
   macro avg       0.99      0.99      0.99     16447
weighted avg       0.99      0.99      0.99     16447



In [19]:
# Persist the trained model in two formats.
from pathlib import Path

# Make sure the output folder exists (no error if it is already there).
Path("Saved_model").mkdir(parents=False, exist_ok=True)

# 1) Parameters only: needs the class definition to be rebuilt before loading.
torch.save(obj=model.state_dict(), f="Saved_model/Spam_Email_Detector_DeepNLP_BERT_PyTorch_Weight.pt")
# 2) The whole module object, structure included.
torch.save(obj=model, f="Saved_model/Spam_Email_Detector_DeepNLP_BERT_PyTorch_WithStructure.pt")

In the cells below, you can load the saved model and use it to make predictions.

In [None]:
# Create model Structure and load saved weights
class SpamDetectorNNWithBert(nn.Module):
    """Same architecture as used for training; needed to load saved weights.

    The attribute names ``bert`` and ``linear`` must stay as they are so the
    keys of the saved state dict match.
    """

    def __init__(self):
        super().__init__()
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        self.linear = nn.Linear(in_features=768, out_features=1)

    def forward(self, input_ids, attention_mask):
        """Return one raw logit per input sequence."""
        bert_out = self.bert(input_ids, attention_mask=attention_mask)
        # Use the embedding at position 0 ([CLS]) as the sequence summary.
        summary = bert_out.last_hidden_state[:, 0, :]
        return self.linear(summary)
    
# Pick the device here as well so this cell works without re-running the
# training cells.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = SpamDetectorNNWithBert()
# map_location lets weights saved on a GPU load on a CPU-only machine.
state_dict = torch.load(
    "Saved_model/Spam_Email_Detector_DeepNLP_BERT_PyTorch_Weight.pt",
    map_location=device,
    weights_only=True,
)
model.load_state_dict(state_dict)
# The prediction cell sends its inputs to `device`, so the model must be there
# too; eval() switches off dropout for inference.
model = model.to(device).eval()

In [None]:
# Load whole model with structure
# SECURITY: weights_only=False unpickles arbitrary Python objects. Only load
# files you created yourself or fully trust. The SpamDetectorNNWithBert class
# must also be defined in this session for unpickling to succeed.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# map_location lets a model saved on a GPU load on a CPU-only machine.
model = torch.load(
    "Saved_model/Spam_Email_Detector_DeepNLP_BERT_PyTorch_WithStructure.pt",
    map_location=device,
    weights_only=False,
)
# Keep the model on the device the prediction cell sends its inputs to.
model = model.to(device).eval()

In [None]:
# Predict with the model.
# `text` must be defined beforehand as the e-mail string to classify.
# Calling the tokenizer directly replaces the deprecated `encode_plus`.
encoding = tokenizer(
    text,  # Replace with your text data
    add_special_tokens=True,
    max_length=128,
    padding='max_length',
    truncation=True,
    return_tensors='pt',
)

# Make sure the model sits on the same device the inputs are sent to.
model.to(device)
model.eval()
with torch.inference_mode():
    outputs = model(encoding['input_ids'].to(device), encoding['attention_mask'].to(device))
    # `outputs` holds raw logits; convert to a probability before applying the
    # 0.5 threshold. preds is 1.0 for spam and 0.0 otherwise.
    preds = (torch.sigmoid(outputs.squeeze()) > 0.5).float()
