This is where we load both datasets (phishing emails and SMS spam) and read them into pandas DataFrames.

In [1]:
import pandas as pd
import csv
import sys
import re
# Increase field size limit to account for longer emails.
# sys.maxsize does not fit in the C long used by the csv module on every
# platform (it raises OverflowError there), so shrink the value until it is
# accepted. On platforms where sys.maxsize fits, the first attempt succeeds.
field_limit = sys.maxsize
while True:
    try:
        csv.field_size_limit(field_limit)
        break
    except OverflowError:
        field_limit //= 10

# engine='python' makes pandas parse through the csv module, so the limit above applies.
email_df = pd.read_csv("/kaggle/input/phishingemails/Phishing_Email.csv", engine='python')
# The SMS file is not valid UTF-8, hence the explicit single-byte encoding.
sms_df = pd.read_csv("/kaggle/input/sms-spam-collection-dataset/spam.csv", encoding='ISO-8859-1')

Text Pre-processing

In [2]:
def clean_text(text):
    """Remove tag-like markup from ``text`` and normalise its whitespace.

    Every ``<...>`` span is deleted, each run of whitespace (spaces, tabs,
    newlines) is collapsed to a single space, and leading/trailing whitespace
    is trimmed.

    Args:
        text: The string to clean.

    Returns:
        The cleaned string.
    """
    without_tags = re.sub(r'<[^>]+>', '', text)
    return re.sub(r'\s+', ' ', without_tags).strip()

# ----------------------------------------
# Cleaning up email and SMS data
# ----------------------------------------
# Give the SMS columns the same names the email frame will end up with.
sms_df = sms_df.rename(columns={'v1': 'label_text', 'v2': 'text'})

# Drop the 'Unnamed: 0' columns
if 'Unnamed: 0' in email_df.columns:
    email_df = email_df.drop(columns=['Unnamed: 0'])

# Drop rows with missing data. The explicit .copy() guarantees the column
# assignments below write to an independent frame rather than to something
# pandas may treat as a slice of the original (SettingWithCopyWarning).
email_df = email_df.dropna(subset=['Email Text', 'Email Type']).copy()
sms_df = sms_df.dropna(subset=['text', 'label_text']).copy()

# Clean text
email_df['text'] = email_df['Email Text'].apply(lambda x: clean_text(str(x)))
sms_df['text'] = sms_df['text'].apply(lambda x: clean_text(str(x)))

# Lowercase and remove extra whitespaces in labels (not text)
email_df['label_text'] = email_df['Email Type'].str.lower().str.strip()
sms_df['label_text'] = sms_df['label_text'].str.lower().str.strip()

# Filter for phishing/safe messages only (again copying, for the same reason as above)
email_df = email_df[email_df['label_text'].isin(['phishing email', 'safe email'])].copy()
sms_df = sms_df[sms_df['label_text'].isin(['spam', 'ham'])].copy()

# Rename labels to 'phishing' and 'safe'
email_df['label_text'] = email_df['label_text'].map({
    'phishing email': 'phishing',
    'safe email': 'safe'
})

sms_df['label_text'] = sms_df['label_text'].map({
    'spam': 'phishing',
    'ham': 'safe'
})

# Convert text labels to binary labels (1 = phishing, 0 = safe) with a
# vectorised comparison instead of a per-row Python lambda.
email_df['label'] = email_df['label_text'].eq('phishing').astype('int64')
sms_df['label'] = sms_df['label_text'].eq('phishing').astype('int64')

# Add source column
email_df['source'] = 'email'
sms_df['source'] = 'sms'

# Select consistent columns and reset index
email_df = email_df[['text', 'label_text', 'label', 'source']].reset_index(drop=True)
sms_df = sms_df[['text', 'label_text', 'label', 'source']].reset_index(drop=True)

# Combine Both Datasets
combined_df = pd.concat([email_df, sms_df], ignore_index=True)

# Print dataset size and class distribution
print(f"Combined dataset: {len(combined_df)}")
print(combined_df['label_text'].value_counts())

print(combined_df.head())

Combined dataset: 24206
label_text
safe        16147
phishing     8059
Name: count, dtype: int64
                                                text label_text  label source
0  re : 6 . 1100 , disc : uniformitarianism , re ...       safe      0  email
1  the other side of * galicismos * * galicismo *...       safe      0  email
2  re : equistar deal tickets are you still avail...       safe      0  email
3  Hello I am your hot lil horny toy. I am the on...   phishing      1  email
4  software at incredibly low prices ( 86 % lower...   phishing      1  email


Tokenizer: Turns the data into a readable form for the computer

In [3]:
from transformers import BertTokenizer, BertModel
from transformers import AutoTokenizer, AutoModel
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
from tqdm.auto import tqdm


# Seed PyTorch and NumPy from a single constant so runs are reproducible
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

In [4]:
class ScamDataset(Dataset):
    """Map-style dataset that tokenizes one message per item.

    Args:
        texts: Sequence of raw message strings.
        labels: Sequence of numeric labels, aligned index-for-index with ``texts``.
        tokenizer: Callable invoked as ``tokenizer(text, padding='max_length',
            truncation=True, max_length=..., return_tensors='pt')`` that returns
            a mapping containing ``'input_ids'``.
        max_length: Number of tokens every item is padded or truncated to.

    Raises:
        ValueError: If ``texts`` and ``labels`` have different lengths.
    """

    def __init__(self, texts, labels, tokenizer, max_length=100):
        if len(texts) != len(labels):
            raise ValueError(
                f"texts and labels must have the same length, got {len(texts)} and {len(labels)}"
            )
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of messages in the dataset."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``{'input_ids': LongTensor, 'label': float tensor}`` for item ``idx``."""
        text = self.texts[idx]
        label = self.labels[idx]

        # Tokenize text using the tokenizer from HuggingFace
        encoding = self.tokenizer(
            text,
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors='pt'
        )
        # Drop only the leading batch axis added by return_tensors='pt'.
        # A bare .squeeze() would also remove the token axis when max_length == 1.
        token_ids = encoding['input_ids'].squeeze(0)

        return {
            'input_ids': token_ids,
            # Float dtype because the label is fed to a BCE-style loss downstream.
            'label': torch.tensor(label, dtype=torch.float)
        }

[RNN Classifier](https://www.analyticsvidhya.com/blog/2019/01/sequence-models-deeplearning/):

*   Embedding dimension: Size of the vector used to represent each word (or token) in continuous space. These vectors capture the meaning of words in a way the model can understand
 *   This is what we are updating every pass
*   Hidden size: Controls model's capacity (more neurons = more powerful)
*   Number of layers: Adds depth and abstraction (typically 1-2)
*   Bidirectional: Allows models to view context from past and future
*   Batch first: Confirms that input/output tensors are in (batch, seq, feature) format
*   Dropout: Regularization to prevent overfitting. It randomly drops a fraction of the neurons during training in order to force the network to learn redundant representations. This way the model does not simply memorize the training data and overfit.

In [5]:
class RNNClassifier(nn.Module):
    """Bidirectional LSTM classifier over token ids.

    Args:
        vocab_size: Number of rows in the embedding table (ignored when
            ``pretrained`` is given).
        embedding_dim: Width of each embedding vector (ignored when
            ``pretrained`` is given; the pretrained width is used instead).
        hidden_size: Hidden units per LSTM direction.
        num_classes: Number of output logits.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability, applied between LSTM layers (only when
            ``num_layers > 1``) and before the final linear layer.
        pretrained: Optional 2-D float tensor of embedding weights; it is
            fine-tuned during training (``freeze=False``).
    """

    def __init__(self, vocab_size, embedding_dim, hidden_size, num_classes, num_layers, dropout=0.5, pretrained=None):
        super().__init__()
        if pretrained is not None:
            print("Using pretrained models")
            self.embedding = nn.Embedding.from_pretrained(pretrained, freeze=False) # Turn freeze=True if you dont want to update embeddings
        else:
            self.embedding = nn.Embedding(vocab_size, embedding_dim)

        # We will be using the Long Short-Term Memory (LSTM) RNN.
        # The input size is read from the embedding layer itself so that a
        # pretrained table whose width differs from `embedding_dim` still works.
        self.rnn = nn.LSTM(
            self.embedding.embedding_dim,
            hidden_size,
            num_layers=num_layers,
            bidirectional=True,
            batch_first=True,
            # nn.LSTM only applies dropout between layers, so it is meaningless for one layer
            dropout=dropout if num_layers > 1 else 0
        )
        self.fc = nn.Linear(hidden_size * 2, num_classes) # Times 2 because of bidirectionality
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Return logits of shape ``(batch, num_classes)`` for token ids ``x`` of shape ``(batch, seq_len)``."""
        # Converts token ids into word vectors
        embedded = self.embedding(x)

        # Pass through the bidirectional LSTM; only the final hidden states are used.
        # hidden has shape (num_layers * 2, batch, hidden_size)
        _, (hidden, _) = self.rnn(embedded)

        # Concatenate the final hidden states in both directions
        # -2 = last layer, forward direction; -1 = last layer, backward direction
        hidden = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)

        # Apply dropout and final linear layer
        hidden = self.dropout(hidden)
        return self.fc(hidden)  # Final prediction

In [6]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

cuda


Training and validation

In [7]:
from tqdm.auto import tqdm

def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10):
    """Train ``model`` and report the average train/validation loss per epoch.

    Batches are dictionaries with ``'input_ids'`` and ``'label'`` tensors; both
    are moved to the module-level ``device`` before the forward pass.

    Args:
        model: Module mapping a batch of token ids to logits.
        train_loader: Iterable of training batches.
        val_loader: Iterable of validation batches.
        criterion: Loss taking ``(logits, labels)``.
        optimizer: Optimizer over ``model``'s parameters.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        dict with keys ``'train_loss'`` and ``'val_loss'``, each a list holding
        the per-epoch average loss.
    """
    history = {'train_loss': [], 'val_loss': []}

    for epoch in range(num_epochs):
        # Training Phase
        model.train()
        train_loss = 0.0

        train_progress = tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs} [Train]')
        for batch in train_progress:
            texts = batch['input_ids'].to(device)
            labels = batch['label'].to(device)

            outputs = model(texts)
            # Squeeze only the trailing logit axis: a bare .squeeze() would also
            # drop the batch axis when a batch holds a single sample, and the
            # loss would then reject the shape mismatch with the labels.
            loss = criterion(outputs.squeeze(-1), labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            train_progress.set_postfix(loss=f'{loss.item():.4f}')

        # Validation Phase
        model.eval()
        val_loss = 0.0

        with torch.no_grad():
            val_progress = tqdm(val_loader, desc=f'Epoch {epoch+1}/{num_epochs} [Valid]')
            for batch in val_progress:
                texts = batch['input_ids'].to(device)
                labels = batch['label'].to(device)

                outputs = model(texts)
                loss = criterion(outputs.squeeze(-1), labels)
                val_loss += loss.item()
                val_progress.set_postfix(loss=f'{loss.item():.4f}')

        # -----------------------
        # Log Average Losses
        # -----------------------
        # Averages are per batch (sum of batch losses / number of batches).
        avg_train_loss = train_loss / len(train_loader)
        avg_val_loss = val_loss / len(val_loader)
        history['train_loss'].append(avg_train_loss)
        history['val_loss'].append(avg_val_loss)

        print(f"Epoch [{epoch+1}/{num_epochs}] - Train Loss: {avg_train_loss:.4f} - Val Loss: {avg_val_loss:.4f}")

    return history


Run the model

In [None]:
from sklearn.model_selection import train_test_split
def main():
    """Train the LSTM classifier on ``combined_df`` and save its weights.

    Steps: load the tokenizer, make a stratified 80/20 train/validation split,
    wrap both splits in ``ScamDataset``/``DataLoader``, train with
    ``BCEWithLogitsLoss`` and Adam, then write the state dict to
    'tos_classifier_model.pt'.
    """
    # Load the tokenizer
    tokenizer = AutoTokenizer.from_pretrained('Snowflake/snowflake-arctic-embed-s')

    # Hyperparameters
    batch_size, epochs, learning_rate = 32, 100, 0.001
    model_kwargs = dict(
        vocab_size=tokenizer.vocab_size,
        embedding_dim=128,
        hidden_size=64,
        num_classes=1,  # a single logit for binary classification
        num_layers=2,
        dropout=0.5,
    )

    # Train-validation split, stratified so both parts keep the class ratio
    train_df, val_df = train_test_split(
        combined_df,
        test_size=0.2,
        stratify=combined_df['label'],
        random_state=42,
    )

    def build_loader(frame, shuffle):
        """Wrap one split in a ScamDataset and a DataLoader."""
        dataset = ScamDataset(frame['text'].tolist(), frame['label'].tolist(), tokenizer)
        return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

    # Data loaders: only the training split is shuffled
    train_loader = build_loader(train_df, shuffle=True)
    val_loader = build_loader(val_df, shuffle=False)

    # Initialize Model
    model = RNNClassifier(**model_kwargs).to(device)

    # Loss function and optimizer
    criterion = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Train the model
    print("Training the model")
    train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=epochs)

    # Save the model
    torch.save(model.state_dict(), 'tos_classifier_model.pt')
    print("\nModel saved to 'tos_classifier_model.pt'")


if __name__ == "__main__":
    main()

tokenizer_config.json:   0%|          | 0.00/1.43k [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/712k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/695 [00:00<?, ?B/s]

Training the model


Epoch 1/100 [Train]:   0%|          | 0/606 [00:00<?, ?it/s]

Epoch 1/100 [Valid]:   0%|          | 0/152 [00:00<?, ?it/s]

Epoch [1/100] - Train Loss: 0.3081 - Val Loss: 0.1804


Epoch 2/100 [Train]:   0%|          | 0/606 [00:00<?, ?it/s]

Epoch 2/100 [Valid]:   0%|          | 0/152 [00:00<?, ?it/s]

Epoch [2/100] - Train Loss: 0.1654 - Val Loss: 0.1541


Epoch 3/100 [Train]:   0%|          | 0/606 [00:00<?, ?it/s]

Epoch 3/100 [Valid]:   0%|          | 0/152 [00:00<?, ?it/s]

Epoch [3/100] - Train Loss: 0.0911 - Val Loss: 0.1176


Epoch 4/100 [Train]:   0%|          | 0/606 [00:00<?, ?it/s]

Try it yourself

In [None]:
def predict_message(model, tokenizer, text, device, max_length=512):
    """Score a single message and return the sigmoid of the model's output.

    The model is switched to eval mode and stays in it after the call.

    Args:
        model: Module mapping token ids of shape ``(1, max_length)`` to one logit.
        tokenizer: Callable invoked with ``padding='max_length'``,
            ``truncation=True``, ``max_length`` and ``return_tensors='pt'``,
            returning a mapping with ``'input_ids'``.
        text: The message to score.
        device: Device the token ids are moved to before the forward pass.
        max_length: Token length the message is padded/truncated to.

    Returns:
        float in [0, 1]: the sigmoid of the model's logit.
    """
    model.eval()
    encoded = tokenizer(
        text,
        padding='max_length',
        truncation=True,
        max_length=max_length,
        return_tensors='pt',
    )
    token_ids = encoded['input_ids'].to(device)

    # No gradients are needed for inference
    with torch.no_grad():
        logit = model(token_ids).squeeze()
        return torch.sigmoid(logit).item()

def test_model():
    """Load the saved weights and interactively score messages typed by the user.

    Rebuilds ``RNNClassifier`` with the training hyperparameters, loads
    'tos_classifier_model.pt', then loops on ``input()`` until the user types
    'quit' (or stdin is closed). Scores above 0.5 are reported as likely scams.
    """
    # Load tokenizer
    tokenizer = AutoTokenizer.from_pretrained('Snowflake/snowflake-arctic-embed-s')

    # Model parameters (must match training)
    vocab_size = tokenizer.vocab_size
    embedding_dim = 128
    hidden_size = 64
    num_classes = 1
    num_layers = 2
    dropout = 0.5

    # Training used ScamDataset's default max_length of 100. The LSTM reads the
    # padding tokens too (there is no masking), so inference has to pad and
    # truncate to the same length or the model sees inputs unlike its training data.
    train_max_length = 100

    # Initialize model
    model = RNNClassifier(vocab_size, embedding_dim, hidden_size, num_classes, num_layers, dropout)
    model.load_state_dict(torch.load('tos_classifier_model.pt', map_location=device))
    model = model.to(device)

    while True:
        try:
            user_input = input("\nPaste an scam message (or type 'quit' to exit):\n> ")
        except EOFError:
            # stdin closed (e.g. piped input exhausted): leave the loop cleanly
            break

        message = user_input.strip()
        if message.lower() == 'quit':
            break
        if not message:
            # Nothing to score; prompt again instead of classifying an empty string
            continue

        score = predict_message(model, tokenizer, message, device, max_length=train_max_length)
        if score > 0.5:
            print(f"⚠️ Scam Likely! Confidence: {score:.2%}")
        else:
            print(f"✅ Looks Safe. Confidence: {100 - score * 100:.2f}%")

test_model()