In [None]:
# module imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import itertools
import random
import re

# model imports
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC

# processing imports
from sklearn.preprocessing import LabelEncoder
from string import punctuation
import nltk
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer
from termcolor import colored
nltk.download('stopwords')
stopwords.words("english")

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.model_selection import cross_val_score
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report

import tensorflow as tf
from tensorflow.keras.utils import to_categorical

from keras.layers import Dense, LSTM, MaxPool1D, Flatten, Dropout, Conv1D, Activation, Embedding
from keras.models import Sequential
from keras.layers import Input
from keras.models import Model
from tensorflow.keras.preprocessing.sequence import pad_sequences

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM, Conv1D, Embedding, Dropout, Flatten, Bidirectional, SimpleRNN
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam

# import pipeline and SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from warnings import filterwarnings
filterwarnings('ignore')

In [None]:
# Read the three raw email corpora from CSV files in the working directory.
trd = pd.read_csv('a.csv')
ted = pd.read_csv('CEAS_08.csv')
# This file is read as UTF-8; rows the parser cannot handle are skipped
# instead of aborting the whole load.
tcd = pd.read_csv(
    'b.csv',
    encoding='utf-8',
    on_bad_lines='skip',
)

In [None]:
# Bring the first corpus to the shared (Text, label) schema.
label_codes = {'Safe Email': 0, 'Phishing Email': 1}
trd = (
    trd
    # errors='ignore' lets this cell be re-run after the column is already gone
    .drop(columns=['Unnamed: 0'], errors='ignore')
    .rename(columns={"Email Text": "Text", "Email Type": "label"})
)
# Assign the encoded column back explicitly: an in-place replace on the
# `trd['label']` selection is not guaranteed to write through to `trd`
# (it does not under pandas copy-on-write).
# The int64 cast raises if any label is missing or is not one of the two
# known classes, so unexpected labels surface here instead of in training.
trd['label'] = trd['label'].replace(label_codes).astype('int64')
trd.info()
trd.head()

In [None]:
# Merge subject and body into one free-text field.
# Missing values are replaced by empty strings first: NaN + str is NaN, which
# would otherwise discard the body of every email that has no subject (and
# vice versa).
ted["Text"] = ted["subject"].fillna("").astype(str) + " " + ted["body"].fillna("").astype(str)
# Keep only the two columns used downstream; every other column is dropped.
ted = ted[['Text', 'label']].copy()
ted.info()
ted.head()

In [None]:
# Merge subject and body into one free-text field.
# Missing values are replaced by empty strings first: NaN + str is NaN, which
# would otherwise discard the body of every email that has no subject (and
# vice versa).
tcd["Text"] = tcd["subject"].fillna("").astype(str) + " " + tcd["body"].fillna("").astype(str)
# Keep only the two columns used downstream; every other column is dropped.
tcd = tcd[['Text', 'label']].copy()
tcd.info()
tcd.head()

Combined datasets (testing machines)

In [None]:
# Stack all three corpora into a single frame.
frames = [trd, ted, tcd]
# ignore_index=True renumbers the rows 0..n-1; without it each source keeps
# its own 0-based labels, so the combined frame has duplicate index values.
data_1 = pd.concat(frames, ignore_index=True)
data_1

Two training datasets and one testing dataset (used for results)

In [None]:
# Stack the two training corpora (`ted` is kept apart as the test set).
frames = [trd, tcd]
# ignore_index=True renumbers the rows 0..n-1; without it each source keeps
# its own 0-based labels, so the combined frame has duplicate index values.
data_2 = pd.concat(frames, ignore_index=True)
data_2

In [None]:
# Define the preprocessing function
def preprocess(Text):
    """Lower-case a piece of text and replace punctuation with spaces.

    Parameters
    ----------
    Text : object
        Raw text. Non-string values are converted with ``str``. Missing
        values (``None`` or a float NaN) are treated as empty text so they
        do not turn into the literal tokens "none" / "nan".

    Returns
    -------
    str
        The lower-cased text with every character that is neither a word
        character nor whitespace replaced by a space, and with leading and
        trailing whitespace removed.
    """
    # NaN is the only float that is not equal to itself.
    if Text is None or (isinstance(Text, float) and Text != Text):
        return ""
    Text = re.sub(r'[^\w\s]', ' ', str(Text).lower()).strip()
    return Text

# Clean the text column of the training frame and of the held-out test frame
data_2['Text'] = data_2['Text'].map(preprocess)
ted['Text'] = ted['Text'].map(preprocess)

# TF-IDF configuration: unigrams only, English stop words removed,
# vocabulary capped at 6000 terms
tfidf_settings = dict(
    stop_words='english',
    ngram_range=(1, 1),
    max_df=1.0,
    min_df=1,
    max_features=6000,
)
vectorizer = TfidfVectorizer(**tfidf_settings)

# Learn the vocabulary from the training text only, then project both sets
# onto it as dense arrays
train_matrix = vectorizer.fit_transform(data_2['Text']).toarray()
test_matrix = vectorizer.transform(ted['Text']).toarray()

# Append a trailing axis of length 1: (samples, features) -> (samples, features, 1),
# the layout the recurrent / convolutional layers are declared with
X_train_tfidf = np.expand_dims(train_matrix, axis=-1)
X_test_tfidf = np.expand_dims(test_matrix, axis=-1)

# Labels as plain numpy arrays
y_train = np.array(data_2['label'])
y_test = np.array(ted['label'])

Model

In [None]:
# 1. ANN Model
def ann():
    """Build and compile a fully connected binary classifier.

    The input shape is taken from the module-level ``X_train_tfidf`` (every
    axis after the sample axis). That array is reshaped to
    ``(samples, features, 1)`` for the sequence models, so the network starts
    with a ``Flatten`` layer; declaring a flat ``(features,)`` input instead
    makes ``fit`` reject the 3-D training array. ``Flatten`` leaves an
    already flat input unchanged, so a 2-D ``X_train_tfidf`` also works.

    Returns
    -------
    Sequential
        Compiled model (binary cross-entropy loss, Adam optimizer, accuracy
        metric) ending in a single sigmoid unit.
    """
    model = Sequential(name="ANN")
    # Collapse the trailing channel axis so Dense sees one vector per sample
    model.add(Flatten(input_shape=X_train_tfidf.shape[1:]))
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(32, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model

# 2. RNN Model
def rnn():
    """Build and compile a single-layer SimpleRNN binary classifier.

    The input is declared as ``(X_train_tfidf.shape[1], 1)``, read from the
    module-level training array.

    Returns
    -------
    Sequential
        Compiled model (binary cross-entropy loss, Adam optimizer, accuracy
        metric) ending in a single sigmoid unit.
    """
    sequence_shape = (X_train_tfidf.shape[1], 1)
    network = Sequential(
        [
            # Only the final recurrent state is passed on
            SimpleRNN(64, input_shape=sequence_shape, return_sequences=False),
            Dropout(0.5),
            Dense(1, activation='sigmoid'),
        ],
        name="RNN",
    )
    network.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return network

# 3. CNN Model
def cnn():
    """Build and compile a one-layer 1-D convolutional binary classifier.

    The input is declared as ``(X_train_tfidf.shape[1], 1)``, read from the
    module-level training array.

    Returns
    -------
    Sequential
        Compiled model (binary cross-entropy loss, Adam optimizer, accuracy
        metric) ending in a single sigmoid unit.
    """
    sequence_shape = (X_train_tfidf.shape[1], 1)
    network = Sequential(
        [
            Conv1D(64, kernel_size=3, activation='relu', input_shape=sequence_shape),
            Dropout(0.5),
            # Flatten the convolution output into one vector for the output unit
            Flatten(),
            Dense(1, activation='sigmoid'),
        ],
        name="CNN",
    )
    network.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return network

# 4. CNN-LSTM Model
def cnn_lstm():
    """Build and compile a Conv1D -> LSTM binary classifier.

    The input is declared as ``(X_train_tfidf.shape[1], 1)``, read from the
    module-level training array.

    Returns
    -------
    Sequential
        Compiled model (binary cross-entropy loss, Adam optimizer, accuracy
        metric) ending in a single sigmoid unit.
    """
    sequence_shape = (X_train_tfidf.shape[1], 1)
    network = Sequential(
        [
            Conv1D(64, kernel_size=3, activation='relu', input_shape=sequence_shape),
            Dropout(0.3),
            # The LSTM consumes the convolution output as a sequence
            LSTM(64),
            Dropout(0.3),
            Dense(64, activation='relu'),
            Dense(1, activation='sigmoid'),
        ],
        name="CNN-LSTM",
    )
    network.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return network

# Instantiate one compiled network per architecture, keyed by display name.
# The builders are called in the listed order.
model_builders = (
    ('ANN', ann),
    ('RNN', rnn),
    ('CNN', cnn),
    ('CNN-LSTM', cnn_lstm),
)
models = {label: build() for label, build in model_builders}

Train the model

In [None]:
# Define function to evaluate the model
def evaluate_model(model, X_test, y_test):
    """Print a classification report for ``model`` on the given test data.

    Parameters
    ----------
    model : object
        Anything exposing ``predict(X_test)`` whose result can be compared
        against 0.5 and cast with ``astype``.
    X_test : array-like
        Test inputs passed to ``model.predict`` unchanged.
    y_test : array-like
        True labels.
    """
    scores = model.predict(X_test)
    # Scores strictly above 0.5 are labelled 1, everything else 0
    y_pred = (scores > 0.5).astype("int32")
    report = classification_report(y_test, y_pred)
    print(report)

# Early stopping callback: monitors validation loss with a patience of 5 epochs
early_stopping = EarlyStopping(monitor='val_loss', patience=5)

# Training history of every model, keyed by model name, so later cells can
# plot each model's own curves (`history` alone only keeps the last run)
histories = {}

# Train and evaluate all models
for model_name, model in models.items():
    print(f"\nTraining {model_name} model...")

    # Fit the model and store the history object
    history = model.fit(
        X_train_tfidf, y_train,
        epochs=50,
        batch_size=64,
        validation_split=0.2,
        callbacks=[early_stopping]
    )
    histories[model_name] = history

    # Evaluate model on the held-out test set
    print(f"Evaluating {model_name} model...")
    evaluate_model(model, X_test_tfidf, y_test)

    # Calculate train accuracy (index 1 is the 'accuracy' metric set in compile)
    trainScore = model.evaluate(X_train_tfidf, y_train, verbose=0)
    print(f"{model_name} training accuracy: {trainScore[1] * 100:.2f}%")

Plotting the graph and confusion matrix

In [None]:
# Function to plot training and validation metrics
def plot_graphs(var1, var2, string, metrics):
    """Plot two columns of ``metrics`` against the row index.

    Parameters
    ----------
    var1, var2 : str
        Names of the two ``metrics`` columns to draw; also used as legend
        entries.
    string : str
        Metric name, used in the title and as the y-axis label.
    metrics : pandas.DataFrame
        Frame containing both columns.
    """
    columns = [var1, var2]
    ax = metrics[columns].plot()
    ax.set_title('Model: Training and Validation ' + string)
    ax.set_xlabel('Number of epochs')
    ax.set_ylabel(string)
    ax.legend(columns)
    plt.show()

# Function to plot confusion matrix
def plot_confusion_matrix(cm):
    """Draw the confusion matrix ``cm`` on a 9x7 figure and show it.

    Parameters
    ----------
    cm : array-like
        Confusion matrix passed to ``ConfusionMatrixDisplay``.
    """
    figure, axes = plt.subplots(figsize=(9, 7))
    display_obj = ConfusionMatrixDisplay(confusion_matrix=cm)
    display_obj.plot(ax=axes, cmap="viridis", colorbar=True)

    plt.title("Confusion Matrix")
    plt.show()

# Per-model training histories, if the training cell recorded them under the
# name `histories`; otherwise only the single `history` object is available.
histories_by_name = globals().get('histories', {})

# Loop through each model
for model_name, model in models.items():
    # Print the model name for clarity
    print(f"\nEvaluating model: {model_name}\n{'='*40}")

    # Pick the history that belongs to THIS model. `history` holds only the
    # most recent fit() run, so it is used solely for the model it was
    # recorded on; plotting it under every model's name would mislabel curves.
    model_history = histories_by_name.get(model_name)
    if model_history is None and getattr(history, 'model', None) is model:
        model_history = history

    if model_history is None:
        print(f"No training history available for {model_name}; skipping loss/accuracy graphs")
    else:
        # Convert history to DataFrame and rename columns to the display names
        metrics = pd.DataFrame(model_history.history).rename(
            columns={'loss': 'Training_Loss',
                     'accuracy': 'Training_Accuracy',
                     'val_loss': 'Validation_Loss',
                     'val_accuracy': 'Validation_Accuracy'})

        # Print before plotting loss
        print(f"Plotting loss graphs for {model_name}")
        plot_graphs('Training_Loss', 'Validation_Loss', 'Loss', metrics)

        # Print before plotting accuracy
        print(f"Plotting accuracy graphs for {model_name}")
        plot_graphs('Training_Accuracy', 'Validation_Accuracy', 'Accuracy', metrics)

    # Predict and create confusion matrix; scores above 0.5 count as class 1.
    # Cast to a flat integer vector so predictions match the label array.
    y_pred = (model.predict(X_test_tfidf) > 0.5).astype("int32").ravel()
    cm = confusion_matrix(y_test, y_pred)

    # Print message before confusion matrix plot
    print(f"Plotting confusion matrix for {model_name}")
    plot_confusion_matrix(cm)


Machine learning models: Logistic Regression, K-Neighbors, Random Forest, SVC

In [None]:
# Define the preprocessing function
def preprocess(Text):
    """Lower-case a piece of text and replace punctuation with spaces.

    Parameters
    ----------
    Text : object
        Raw text. Non-string values are converted with ``str``. Missing
        values (``None`` or a float NaN) are treated as empty text so they
        do not turn into the literal tokens "none" / "nan".

    Returns
    -------
    str
        The lower-cased text with every character that is neither a word
        character nor whitespace replaced by a space, and with leading and
        trailing whitespace removed.
    """
    # NaN is the only float that is not equal to itself.
    if Text is None or (isinstance(Text, float) and Text != Text):
        return ""
    Text = re.sub(r'[^\w\s]', ' ', str(Text).lower()).strip()
    return Text

# Clean the text column of the training frame and of the held-out test frame
data_2['Text'] = data_2['Text'].map(preprocess)
ted['Text'] = ted['Text'].map(preprocess)

# TF-IDF configuration: unigrams only, English stop words removed,
# vocabulary capped at 5000 terms
tfidf_settings = dict(
    stop_words='english',
    ngram_range=(1, 1),
    max_df=1.0,
    min_df=1,
    max_features=5000,
)
vectorizer = TfidfVectorizer(**tfidf_settings)

# Learn the vocabulary from the training text only, then project both sets
# onto it as dense 2-D arrays
train_vectors = vectorizer.fit_transform(data_2['Text'])
test_vectors = vectorizer.transform(ted['Text'])
X_train_tfidf = train_vectors.toarray()
X_test_tfidf = test_vectors.toarray()

# Labels as plain numpy arrays
y_train = np.array(data_2['label'])
y_test = np.array(ted['label'])

In [None]:
# Initialize classifiers
models = {
    "SVC": SVC(),
    "Logistic Regression": LogisticRegression(max_iter=1000),
    # Fixed seed so the forest (and its reported scores) are reproducible
    "Random Forest": RandomForestClassifier(random_state=42),
    "K-Neighbors": KNeighborsClassifier()
}

# Train, predict, and evaluate each model
for model_name, model in models.items():
    # Train the model
    model.fit(X_train_tfidf, y_train)

    # Predict on the test data
    y_pred = model.predict(X_test_tfidf)

    # Display classification report
    print(f"\n{model_name} Classification Report:\n", classification_report(y_test, y_pred))

    # Compute accuracy; accuracy_score returns a fraction in [0, 1], so scale
    # it by 100 before printing it next to a percent sign
    accuracy = accuracy_score(y_test, y_pred)
    print(f"{model_name} Accuracy: {accuracy * 100:.2f} %")

    # Compute confusion matrix
    conf_matrix = confusion_matrix(y_test, y_pred)

    # Plot confusion matrix on an explicit axes so each figure is self-contained
    fig, ax = plt.subplots(figsize=(6, 5))
    sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='coolwarm',
                xticklabels=['Normal', 'Phishing'], yticklabels=['Normal', 'Phishing'],
                ax=ax)
    ax.set_title(f"Confusion Matrix for {model_name}")
    ax.set_xlabel("Predicted")
    ax.set_ylabel("Actual")
    plt.show()
