In [None]:
# Import libraries
import pandas as pd
pd.options.display.float_format = '{:20,.2f}'.format

# Import dataset
# Read the CSV that sits next to the notebook and preview the first rows
df = pd.read_csv('./Phishing_Email.csv')
df.head()

In [None]:
# Discard the first column of the frame (selected by position, dropped by its label)
df = df.drop(columns=df.columns[0])

In [None]:
# (rows, columns) of the frame at this point
df.shape

In [None]:
# Number of missing values in each column
df.isnull().sum()

In [None]:
# Keep only rows that have no missing value in any column
df = df.dropna()

In [None]:
# (rows, columns) once rows with missing values are gone
df.shape

In [None]:
# How many emails carry each "Email Type" label
df["Email Type"].value_counts()

In [None]:
# Share of each email type as a pie chart; slices are annotated with one-decimal percentages
df["Email Type"].value_counts().plot(kind="pie", autopct='%1.1f%%', ylabel='')

In [None]:
from imblearn.under_sampling import RandomUnderSampler

# Target column on one side, every remaining column on the other
y = df["Email Type"]
X = df.drop(columns=["Email Type"])

# Random under-sampling with a fixed random_state so the drawn subset is repeatable
undersampler = RandomUnderSampler(random_state=100)
texts_under, labels_under = undersampler.fit_resample(X, y)

# Class shares after resampling
labels_under.value_counts().plot(kind="pie", autopct='%1.1f%%', ylabel='')

In [None]:
# Inspect the resampled feature frame
texts_under

In [None]:
# Length of each email measured as the number of pieces obtained by splitting on a single space
email_word_counts = texts_under["Email Text"].map(lambda email: len(email.split(" ")))
texts_under["Word Count"] = email_word_counts
texts_under["Word Count"].describe()

In [None]:
# Histogram of the word counts; range=[0, 1000] limits the bins to that interval
ax = texts_under["Word Count"].hist(range=[0, 1000])
ax.set_xlabel('Email Word Length')
ax.set_ylabel('Count')

In [None]:
from sklearn.preprocessing import LabelEncoder

# Turn the string labels into integer codes. LabelEncoder numbers classes in sorted order.
# NOTE(review): confirm which email type ends up as 1 — the metrics computed later treat 1 as the positive class.
label_encoder = LabelEncoder()
encoded_labels = label_encoder.fit_transform(labels_under)
encoded_labels

In [None]:
import re

def parse_sentence(text: str):
    """Normalise one email body for tokenisation.

    The text is lower-cased and split on single spaces. Pieces that start with
    '@' (and are longer than the bare '@') become 'user'; pieces that start with
    'http' become 'http'. A fixed set of punctuation characters is then removed
    and each pair of adjacent spaces is replaced by one space in a single pass.

    Args:
        text: Raw email text.

    Returns:
        The cleaned text without the trailing separator space.
    """
    normalised = []
    for token in text.lower().split(" "):
        if token.startswith('@') and len(token) > 1:
            normalised.append('user')
        elif token.startswith('http'):
            normalised.append('http')
        else:
            normalised.append(token)

    # Every piece is followed by one space, including the last one
    joined = " ".join(normalised) + " "
    without_punct = re.sub(r'["#$%&\'()*+,\-./:;<=>@[\]^_`{|}~―]', '', joined)
    collapsed = without_punct.replace('  ', ' ')
    # Drop the final character, which is the trailing separator space
    return collapsed[:-1]

# Clean every email body, then show the results as a NumPy array
texts_clean = texts_under["Email Text"].apply(parse_sentence)
texts_clean.values

In [None]:
import nltk
from gensim.models import Word2Vec
from nltk.tokenize import word_tokenize

# The commented-out lines below are the one-off recipe that produced the skip-gram embeddings;
# when the notebook runs, only the final line executes and loads the saved model from disk.
# NOTE(review): the recipe saves to "skipgram_model.model", but the load reads
#   "Models/word2vec/skipgram_model.model" — the file has to be placed there separately.
# NOTE(review): before re-enabling the recipe, check `workers=-1` (gensim presumably expects a
#   positive thread count — confirm) and the extra list wrapped around `tokenized_corpus` in
#   `.train(...)`, which looks unintended.
#nltk.download('punkt')  # Download the tokenizer models if not already downloaded
#tokenized_corpus = [word_tokenize(text.lower()) for text in texts_clean.values] # Lowercasing for consistency
#skipgram_model = Word2Vec(sentences=tokenized_corpus,
#                          vector_size=128,  # Dimensionality of the word vectors
#                          window=5,         # Maximum distance between the current and predicted word within a sentence
#                          sg=1,             # Skip-Gram model (1 for Skip-Gram, 0 for CBOW)
#                          min_count=1,      # Ignores all words with a total frequency lower than this
#                          workers=-1)        # Number of CPU cores to use for training the model
## Training
#skipgram_model.train([tokenized_corpus], total_examples=1, epochs=10)
#skipgram_model.save("skipgram_model.model")
loaded_model = Word2Vec.load("Models/word2vec/skipgram_model.model")

In [None]:
import numpy as np 
import torch

# Vocabulary (in the model's own order) and vector dimensions of the loaded Word2Vec model
vocab_words = loaded_model.wv.index_to_key
new_vocab_size = len(loaded_model.wv)
new_embedding_dim = loaded_model.vector_size

# Token -> row number lookup
word2index = dict(zip(vocab_words, range(len(vocab_words))))

# Fill a (vocabulary, dimension) array, one row per token
embedding_matrix = np.zeros((new_vocab_size, new_embedding_dim))
for row, token in enumerate(vocab_words):
    embedding_matrix[row] = loaded_model.wv[token]

# Hand the matrix to PyTorch as a float32 tensor
embedding_matrix = torch.tensor(embedding_matrix, dtype=torch.float32)

In [None]:
# Set random seeds for reproducibility
import random

def set_seed(seed):
    """Seed the Python, NumPy and PyTorch random generators and make cuDNN deterministic.

    Args:
        seed: Integer seed handed to every generator.
    """
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)

    if torch.cuda.is_available():
        # Seed the current CUDA device, then every CUDA device
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)

    # Force deterministic cuDNN kernels and turn off the autotuner
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True


# Set the seed
set_seed(100)

In [None]:
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import TransformerEncoder, TransformerEncoderLayer

class TransformerModel(nn.Module):
    """Sequence classifier: Transformer encoder, mean over axis 1, then a single linear output.

    Args:
        embedding_dim: Feature size of each input vector (used as d_model and as the
            input width of the final linear layer).
        num_encoder_layers: Number of stacked encoder layers.
        nhead: Attention heads per encoder layer.
        dropout: Dropout probability inside each encoder layer.
    """

    def __init__(self, embedding_dim, num_encoder_layers=2, nhead=4, dropout=0.3):
        super().__init__()
        layer = TransformerEncoderLayer(
            d_model=embedding_dim,
            nhead=nhead,
            dropout=dropout,
            batch_first=True,
        )
        self.transformer_encoder = TransformerEncoder(layer, num_layers=num_encoder_layers)
        self.fc = nn.Linear(embedding_dim, 1)

    def forward(self, x):
        """Return one raw score per batch element (no sigmoid is applied here)."""
        encoded = self.transformer_encoder(x)
        pooled = encoded.mean(dim=1)  # Global average pooling
        scores = self.fc(pooled)
        return scores.squeeze(1)


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import DataLoader, random_split

class TextDataset(Dataset):
    """Dataset that turns texts into fixed-length sequences of embedding vectors.

    Args:
        texts: Indexable collection of strings; each one is split on whitespace.
        labels: Indexable collection of numeric labels aligned with `texts`.
        word2index: Dict mapping a token to its row in `embedding_matrix`.
        embedding_matrix: Tensor holding one embedding row per vocabulary entry.
        max_len: Number of token positions per sample. Shorter texts are padded,
            longer ones are truncated. Defaults to 1024.
    """

    def __init__(self, texts, labels, word2index, embedding_matrix, max_len=1024):
        self.texts = texts
        self.labels = labels
        self.word2index = word2index
        self.embedding_matrix = embedding_matrix
        self.max_len = max_len

    def __len__(self):
        """Number of samples."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return `(embeddings, label)` for sample `idx`.

        `embeddings` has `max_len` rows taken from `embedding_matrix`; `label` is a
        float32 scalar tensor.
        """
        # Only the first max_len tokens can survive truncation, so look up no more than that
        tokens = self.texts[idx].split()[:self.max_len]
        label = self.labels[idx]

        # NOTE(review): unknown words and padding both use row 0. If row 0 is also a real
        # vocabulary entry, padded positions carry that word's vector — confirm this is intended.
        indices = [self.word2index.get(token, 0) for token in tokens]  # Use 0 for unknown words
        indices.extend([0] * (self.max_len - len(indices)))

        # Indexing with an index tensor already yields a fresh copy, so no clone is needed
        embeddings = self.embedding_matrix[torch.tensor(indices, dtype=torch.long)]

        return embeddings.detach(), torch.tensor(label, dtype=torch.float32)

# Wrap the cleaned texts and encoded labels, then size a 70% / 15% / remainder split
dataset = TextDataset(texts_clean.values, encoded_labels, word2index, embedding_matrix)
n_samples = len(dataset)
train_size, val_size = int(0.7 * n_samples), int(0.15 * n_samples)
test_size = n_samples - (train_size + val_size)

In [None]:
from torch.optim import Adam
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Settings shared by every run
num_epochs = 20
early_stopping_patience = 8
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

for seed in range(5):
    model = TransformerModel(embedding_dim=embedding_matrix.shape[1])

    print(f'\nIteration: {seed}')
    # Loss on raw scores, optimiser, and a learning-rate schedule driven by the validation loss
    criterion = nn.BCEWithLogitsLoss()
    optimizer = Adam(model.parameters(), lr=0.001)
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=4)

    # Load model to device
    model.to(device)

    # Early-stopping bookkeeping
    best_val_loss = float('inf')
    patience_counter = 0

    # The split depends only on `seed`, so it can be rebuilt later from the same seed
    generator = torch.Generator().manual_seed(seed)
    train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size], generator)

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    for epoch in range(num_epochs):
        # Training pass
        model.train()
        running_train_loss = 0.0
        for embeddings, label in train_loader:
            embeddings = embeddings.to(device)
            label = label.to(device)

            optimizer.zero_grad()
            outputs = model(embeddings)
            loss = criterion(outputs, label)
            loss.backward()
            optimizer.step()

            # Weight the batch mean by batch size so the epoch figure is a per-sample mean
            running_train_loss += loss.item() * embeddings.size(0)
        train_loss = running_train_loss / len(train_loader.dataset)

        # Validation pass (no gradient tracking)
        model.eval()
        running_val_loss = 0.0
        with torch.no_grad():
            for embeddings, label in val_loader:
                embeddings = embeddings.to(device)
                label = label.to(device)
                outputs = model(embeddings)
                loss = criterion(outputs, label)
                running_val_loss += loss.item() * embeddings.size(0)
        val_loss = running_val_loss / len(val_loader.dataset)
        scheduler.step(val_loss)

        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

        # Checkpoint on improvement, otherwise count towards early stopping
        improved = val_loss < best_val_loss
        if improved:
            best_val_loss = val_loss
            patience_counter = 0
            torch.save(model.state_dict(), f'Models/best_custom_model_{seed}.pt')
        else:
            patience_counter += 1

        if patience_counter >= early_stopping_patience:
            print("Early stopping triggered")
            break

from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score, roc_curve, auc

precision_list = []
recall_list = []
f1_list = []

for seed in range(5):
    # Rebuild the split this seed's model was trained with. Reusing the loader left over
    # from the training loop would score every checkpoint on the last seed's test split,
    # which overlaps the training data of the other seeds.
    generator = torch.Generator().manual_seed(seed)
    _, _, test_dataset = random_split(dataset, [train_size, val_size, test_size], generator)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    # Load the best checkpoint of this seed onto the active device
    model.load_state_dict(torch.load(f'Models/best_custom_model_{seed}.pt', map_location=device))

    # Evaluation on test set
    model.eval()
    all_labels = []
    all_preds = []
    all_probs = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            probs = torch.sigmoid(outputs)   # raw scores -> probabilities
            preds = (probs > 0.5).float()    # fixed 0.5 decision threshold
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())

    # Calculate metrics (label 1 is treated as the positive class)
    precision = precision_score(all_labels, all_preds)
    recall = recall_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)

    precision_list.append(precision)
    recall_list.append(recall)
    f1_list.append(f1)

    # Print metrics
    print(f'\nIteration: {seed}')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1 Score: {f1:.4f}')

# Mean of each metric over the five seeds
print('Overall:')
print(f'Average Precision: {np.mean(precision_list)}')
print(f'Average Recall: {np.mean(recall_list)}')
print(f'Average F1: {np.mean(f1_list)}')

# Plot confusion matrix
from matplotlib import pyplot as plt
import seaborn as sns

# all_labels / all_preds are reset for every seed, so this shows the last evaluated seed only
cm = confusion_matrix(all_labels, all_preds)
class_names = ['Negative', 'Positive']

fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names, ax=ax)
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')
ax.set_title('Confusion Matrix')
plt.show()

# ROC curve and its area, computed from the probabilities of the last evaluated seed
fpr, tpr, _ = roc_curve(all_labels, all_probs)
roc_auc = auc(fpr, tpr)

fig, ax = plt.subplots(figsize=(8, 6))
ax.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
ax.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')  # diagonal reference line
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Receiver Operating Characteristic (ROC) Curve')
ax.legend(loc="lower right")
plt.show()

In [None]:
from torch.optim import Adam
from torch.optim.lr_scheduler import ReduceLROnPlateau

# One loss instance serves training, validation and the attack
criterion = nn.BCEWithLogitsLoss()


# Fast Gradient Sign Attack Function
def fgsm_attack(model, data, target, epsilon):
    """Return FGSM-perturbed copies of a batch of input embeddings.

    The dataset already yields embedding vectors, so the perturbation is applied
    to the model input directly.

    Args:
        model: Network called as `model(data)` to produce raw scores.
        data: Batch of input embeddings.
        target: Labels for the batch, in the form `criterion` expects.
        epsilon: Step size multiplied with the sign of the input gradient.

    Returns:
        Detached tensor equal to `data + epsilon * sign(d loss / d data)`.
    """
    # Work on a private copy that tracks gradients; the caller's batch stays untouched
    data = data.clone().detach().requires_grad_(True)
    output = model(data)
    loss = criterion(output, target)
    model.zero_grad()
    loss.backward()
    perturbed_data = data + epsilon * data.grad.sign()
    # Detach so the adversarial batch carries no graph from this attack pass
    return perturbed_data.detach()


for seed in range(5):
    # The model expects the embedding width, not the embedding matrix itself
    model = TransformerModel(embedding_dim=embedding_matrix.shape[1])

    # Load model to device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    print(f'\nIteration: {seed}')
    # Define optimizer and learning-rate schedule
    optimizer = Adam(model.parameters(), lr=0.001)
    scheduler = ReduceLROnPlateau(optimizer, mode='min' , factor=0.1, patience=4)

    # Training loop
    num_epochs = 20
    epsilon = 0.1

    # Define early stopping parameters
    early_stopping_patience = 8
    best_val_loss = float('inf')
    patience_counter = 0

    # The split depends only on `seed`, so it can be rebuilt later from the same seed
    generator = torch.Generator().manual_seed(seed)
    train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size], generator)

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0

        for embeddings, label in train_loader:
            embeddings, label = embeddings.to(device), label.to(device)

            # Step on the clean batch
            optimizer.zero_grad()
            outputs = model(embeddings)
            loss = criterion(outputs, label)
            loss.backward()
            optimizer.step()

            # Generate adversarial examples and train on them
            data_adv = fgsm_attack(model, embeddings, label, epsilon)
            optimizer.zero_grad()  # also clears the parameter gradients left by the attack pass
            output_adv = model(data_adv)
            loss_adv = criterion(output_adv, label)
            loss_adv.backward()
            optimizer.step()

            train_loss += (loss.item() + loss_adv.item()) * embeddings.size(0)

        train_loss /= len(train_loader.dataset)

        # Validation loop (clean data only)
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for embeddings, label in val_loader:
                embeddings, label = embeddings.to(device), label.to(device)
                outputs = model(embeddings)
                loss = criterion(outputs, label)
                val_loss += loss.item() * embeddings.size(0)

        val_loss /= len(val_loader.dataset)
        scheduler.step(val_loss)

        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

        # Check for early stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            # Save the best model
            torch.save(model.state_dict(), f'Models/best_adversarial_custom_model_{seed}.pt')
        else:
            patience_counter += 1

        if patience_counter >= early_stopping_patience:
            print("Early stopping triggered")
            break

In [None]:
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score, roc_curve, auc

precision_list = []
recall_list = []
f1_list = []

for seed in range(5):
    # Rebuild the split this seed's model was trained with. Reusing the loader left over
    # from the training loop would score every checkpoint on the last seed's test split,
    # which overlaps the training data of the other seeds.
    generator = torch.Generator().manual_seed(seed)
    _, _, test_dataset = random_split(dataset, [train_size, val_size, test_size], generator)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    # Load the best checkpoint of this seed onto the active device
    model.load_state_dict(torch.load(f'Models/best_adversarial_custom_model_{seed}.pt', map_location=device))

    # Evaluation on test set
    model.eval()
    all_labels = []
    all_preds = []
    all_probs = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            probs = torch.sigmoid(outputs)   # raw scores -> probabilities
            preds = (probs > 0.5).float()    # fixed 0.5 decision threshold
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())

    # Calculate metrics (label 1 is treated as the positive class)
    precision = precision_score(all_labels, all_preds)
    recall = recall_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)

    precision_list.append(precision)
    recall_list.append(recall)
    f1_list.append(f1)

    # Print metrics
    print(f'\nIteration: {seed}')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1 Score: {f1:.4f}')

In [None]:
# Mean of each metric over the per-seed values collected in the lists
print('Overall:')
print(f'Average Precision: {np.mean(precision_list)}')
print(f'Average Recall: {np.mean(recall_list)}')
print(f'Average F1: {np.mean(f1_list)}')

In [None]:
# Plot confusion matrix
from matplotlib import pyplot as plt
import seaborn as sns

# all_labels / all_preds are reset for every seed, so this shows the last evaluated seed only
cm = confusion_matrix(all_labels, all_preds)
class_names = ['Negative', 'Positive']

fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names, ax=ax)
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')
ax.set_title('Confusion Matrix')
plt.show()

In [None]:
# ROC curve and its area, computed from the probabilities of the last evaluated seed
fpr, tpr, _ = roc_curve(all_labels, all_probs)
roc_auc = auc(fpr, tpr)

fig, ax = plt.subplots(figsize=(8, 6))
ax.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
ax.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')  # diagonal reference line
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Receiver Operating Characteristic (ROC) Curve')
ax.legend(loc="lower right")
plt.show()