# Pre-Processing

In [34]:
# Colab-only setup: the dataset and model paths used in later cells point under /content/drive.
from google.colab import drive
from google.colab import files  # NOTE(review): not referenced in the visible cells; confirm it is still needed

# Mount Google Drive at /content/drive.
drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [35]:
import pandas as pd
import re
import csv
import sys
import uuid
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer # Bag of Words: Term Frequencey and TFIDF
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.metrics import f1_score

# Regex patterns, looked up by key from the text-normalisation helpers in this notebook.
REGEX_PATTERNS = {
    # Amounts written with a leading currency symbol or a trailing upper-case currency code.
    'currency': r'[$€£¥]\s*\d+(?:[.,]\d+)?|\d+(?:[.,]\d+)?\s*(?:USD|EUR|GBP|JPY|CAD|AUD|CHF)',

    # HH:MM with optional :SS and optional am/pm suffix.
    'time': r'\b(?:[01]?\d|2[0-3]):[0-5]\d(?::[0-5]\d)?(?:\s*[aApP][mM])?\b',

    # Day names/abbreviations, case-insensitive. The match includes the literal space on each
    # side, so a substitution also removes those spaces.
    # NOTE(review): there is no 'sat' alternative, so Saturday is never matched.
    'day' : r'(?i) (sun|mon|tue(s)?|wed(nesday)?|thu(r(s)?)?|fri)(day|\.)? ', # note the spaces at the beginning and end

    # Numeric dates (d-m-y or y-m-d with -, / or . separators), month-name dates with a year,
    # and "Month day" optionally followed by a day range.
    'date': r'(?i)\b(?:\d{1,2}[-\/\.]\d{1,2}[-\/\.]\d{2,4}|\d{4}[-\/\.]\d{1,2}[-\/\.]\d{1,2}|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s+\d{1,2}(?:[a-z]{2})?,?\s+\d{2,4}|\d{1,2}\s+(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*,?\s+\d{2,4}|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s+\d{1,2}(?:\s?[—-]\s?\d{1,2})?)\b',

    # 10-digit phone numbers with an optional +country prefix.
    # NOTE(review): anchored with ^ and $; replace_data_categories calls re.sub without
    # re.MULTILINE, so this only matches when the whole text is a phone number.
    'phone': r'^(\+\d{1,2}\s)?\(?\d{3}\)?[\s.-]\d{3}[\s.-]\d{4}$',

    # Percentages with an integer part of 0-100 and optional decimals (none after 100);
    # the negative lookahead rejects all-zero values such as 0% or 0.0%.
    'percentage': r'\b(?<!\.)(?!0+(?:\.0+)?%)(?:\d|[1-9]\d|100)(?:(?<!100)\.\d+)?%',

    # Any remaining integer or decimal (either . or , as the separator).
    'number': r'\b\d+(?:[.,]\d+)?\b',

    # Email addresses; appears to be the widely circulated RFC 5322-style pattern.
    # The character classes are lower-case only and no (?i) flag is set.
    # NOTE(review): this key is not referenced by any function in the visible cells.
    'email': r'(?:[a-z0-9!#$%&\'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&\'*+/=?^_`{|}~-]+)*|"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])',

    # http/https URLs.
    # NOTE(review): inside [$-_@.&+] the sequence $-_ is a character range, so it also accepts
    # characters such as '<', '>', ':' and '/'; a URL directly followed by markup will match past its end.
    'url' : r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
}

## Load Dataset(s)

In [36]:

# Do not truncate long cell values when a frame is displayed.
pd.set_option('display.max_colwidth', None)
# Raise the csv module's per-field size limit to the largest value it accepts.
csv.field_size_limit(sys.maxsize)

# Ordered list rather than a set: set iteration order for strings changes between
# interpreter sessions, which would change the row order of the concatenated frame
# and therefore the contents of the seeded train/test split.
file_paths = ['/content/drive/MyDrive/COS720 Project/Datasets/CEAS_08.csv',
              '/content/drive/MyDrive/COS720 Project/Datasets/Nigerian_Fraud.csv',
              '/content/drive/MyDrive/COS720 Project/Datasets/Ling.csv']

def load_email_data(file_path):
    """Read an email CSV into a DataFrame of strings, skipping malformed rows.

    The first row is used as the header. Any later row whose field count differs
    from the header is dropped; the number of dropped rows is reported once.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        A DataFrame with one column per header field; every value is a string.

    Raises:
        ValueError: If the file contains no header row.
    """
    # newline='' is what the csv module expects, so that line breaks inside
    # quoted fields reach the parser untranslated.
    with open(file_path, 'r', encoding='utf-8', errors='replace', newline='') as f:
        reader = csv.reader(f, quotechar='"', escapechar='\\')
        headers = next(reader, None)
        if headers is None:
            raise ValueError(f"No header row found in {file_path}")

        expected_fields = len(headers)
        data = []
        skipped = 0
        for row in reader:
            if len(row) == expected_fields:
                data.append(row)
            else:
                skipped += 1

    # Report a count rather than echoing each row: rows hold whole email bodies.
    if skipped:
        print(f"Skipped {skipped} malformed rows (expected {expected_fields} fields)")

    df = pd.DataFrame(data, columns=headers)
    print(f"Loaded data with fallback method: {len(df)} rows")
    return df

## Clean Data

### Extract emails from body

### Converting to lowercase

In [37]:
def to_lowercase(df, text_column='body'):
  """Return a copy of ``df`` with the strings in ``text_column`` lower-cased.

  Cells that are not strings are passed through unchanged, and the input
  frame itself is left untouched.
  """
  def _lower_if_text(value):
    if isinstance(value, str):
      return value.lower()
    return value

  lowered = df.copy()
  lowered[text_column] = lowered[text_column].apply(_lower_if_text)
  return lowered

### Replacing text with classes

In [38]:
def replace_data_categories(text):
    """Replace concrete values in ``text`` with lower-case category tags.

    Substitutions run from the most specific pattern to the most general:
    currency -> <cur>, time -> <time>, day name -> <day>, date -> <date>,
    phone number -> <phone>, percentage -> <perc>, and any number left over
    -> <num>. Patterns are read from REGEX_PATTERNS.

    Args:
        text: The text to normalise.

    Returns:
        The text with matches replaced. Non-string input is returned
        unchanged, matching extract_urls_from_text / replace_urls_with_tag.
    """
    if not isinstance(text, str):
        return text

    # (pattern key, replacement tag); the order matters because 'number'
    # would otherwise consume the digits the earlier patterns rely on.
    replacements = (
        ('currency', '<cur>'),
        ('time', '<time>'),
        ('day', '<day>'),
        ('date', '<date>'),
        ('phone', '<phone>'),
        ('percentage', '<perc>'),
        ('number', '<num>'),
    )

    modified_text = text
    for pattern_key, tag in replacements:
        modified_text = re.sub(REGEX_PATTERNS[pattern_key], tag, modified_text)

    return modified_text

### Extract URL data from emails

In [39]:
import math
from urllib.parse import urlparse, parse_qs

class UrlFeaturizer(object):
    """Computes simple lexical features for one URL at a time.

    Set the URL with setUrl() (or pass it to the constructor), then call run()
    to get a dict of features. The same instance can be reused for many URLs.
    """

    def __init__(self, url=None):
        """Create a featurizer, optionally bound to ``url`` straight away."""
        self.url = None
        self.domain = None
        self.errCount = 0
        if url is not None:
            self.setUrl(url)

    def setUrl(self, url):
        """Bind the instance to ``url`` and reset the parse-error counter."""
        self.url = url
        # Text between the scheme separator and the first following '/'.
        self.domain = url.split('//')[-1].split('/')[0]
        self.errCount = 0

    def entropy(self):
        """Return the Shannon entropy (in bits) of the URL's characters."""
        string = self.url.strip()
        # Single pass over the text instead of one str.count() per distinct character.
        counts = {}
        for ch in string:
            counts[ch] = counts.get(ch, 0) + 1
        prob = [float(count) / len(string) for count in counts.values()]
        return -sum([(p * math.log(p) / math.log(2.0)) for p in prob])

    def numDigits(self):
        """Return how many characters of the URL are digits."""
        return sum(1 for ch in self.url if ch.isdigit())

    def urlLength(self):
        """Return the length of the URL in characters."""
        return len(self.url)

    def numParameters(self):
        """Return the number of distinct query-string parameter names.

        Returns 0 (and increments errCount) when urlparse rejects the URL.
        """
        try:
            query = urlparse(self.url).query
            if not query:
                return 0
            return len(parse_qs(query))
        except ValueError:
            self.errCount += 1
            return 0

    def numFragments(self):
        """Return the number of '#' characters in the URL."""
        return len(self.url.split('#')) - 1

    def numSubDomains(self):
        """Return the number of hostname labels beyond the last two.

        Returns 0 when there is no hostname, and 0 (incrementing errCount)
        when urlparse rejects the URL.
        """
        try:
            hostname = urlparse(self.url).hostname
            if not hostname:
                return 0
            parts = hostname.split('.')
            if len(parts) <= 2:
                return 0
            return len(parts) - 2
        except ValueError:
            self.errCount += 1
            return 0

    def hasHttp(self):
        """Return True when 'http:' occurs anywhere in the URL."""
        # NOTE(review): substring test, so a URL embedded in the path or query also counts.
        return 'http:' in self.url

    def hasHttps(self):
        """Return True when 'https:' occurs anywhere in the URL."""
        return 'https:' in self.url

    def run(self):
        """Compute every feature for the current URL and return them as a dict.

        Raises:
            ValueError: If no URL has been set yet.
        """
        if self.url is None:
            raise ValueError("No URL set: call setUrl() before run()")

        data = {}
        data['entropy'] = self.entropy()
        data['numDigits'] = self.numDigits()
        data['urlLength'] = self.urlLength()
        data['numParams'] = self.numParameters()
        data['hasHttp'] = int(self.hasHttp())
        data['hasHttps'] = int(self.hasHttps())
        data['numFragments'] = self.numFragments()
        data['numSubDomains'] = self.numSubDomains()
        data['num_%20'] = self.url.count("%20")
        data['num_@'] = self.url.count("@")

        return data

def process_url_data(url_df):
    """Build one row of URL features (plus the label) for every row of ``url_df``.

    Args:
        url_df: Frame with at least 'url' and 'label' columns, one row per URL.

    Returns:
        A DataFrame holding the UrlFeaturizer.run() features and a 'label'
        column, in the same row order as ``url_df``. An empty input yields an
        empty DataFrame.
    """
    if url_df.empty:
        return pd.DataFrame([])

    featurizer = UrlFeaturizer()
    features_list = []

    # Iterate the two needed columns directly; iterrows() would build a Series per row.
    for url, label in zip(url_df['url'], url_df['label']):
        featurizer.setUrl(url)
        features = featurizer.run()
        features['label'] = label
        features_list.append(features)

    return pd.DataFrame(features_list)

In [None]:
def extract_urls_from_text(text):
    """Return every URL found in ``text``; non-string input yields an empty list."""
    if isinstance(text, str):
        return re.findall(REGEX_PATTERNS['url'], text)
    return []

def replace_urls_with_tag(text):
    """Swap every URL in ``text`` for the literal ``<url>`` tag.

    Non-string input is handed back unchanged.
    """
    if isinstance(text, str):
        return re.sub(REGEX_PATTERNS['url'], '<url>', text)
    return text


def process_email_data(df):
    """Normalise email bodies and collect the URLs they contain.

    Works on a copy, so the frame passed in is not modified and the cell can
    be re-run safely. For every email: a random id is assigned, URLs are
    extracted from the original body, then the body has URLs replaced by
    ``<url>``, data categories replaced by their tags, and is lower-cased.

    Args:
        df: Frame with at least 'body' and 'label' columns.

    Returns:
        (processed_df, url_df): the processed copy with an added 'email_id'
        column, and a frame with one row per extracted URL
        ('email_id', 'url', 'label').
    """
    def _normalise_body(body):
        # Leave non-string bodies (e.g. missing values) untouched.
        if not isinstance(body, str):
            return body
        return replace_data_categories(replace_urls_with_tag(body))

    print(f"Original dataframe shape: {df.shape}")
    print(f"Columns: {df.columns.tolist()}")

    processed = df.copy()

    # id to link URLs back to emails
    processed['email_id'] = [str(uuid.uuid4()) for _ in range(len(processed))]

    # URLs are taken from the untouched body, before tagging and lower-casing.
    url_data = [
        {'email_id': email_id, 'url': url, 'label': label}
        for email_id, body, label in zip(processed['email_id'],
                                         processed['body'],
                                         processed['label'])
        for url in extract_urls_from_text(body)
    ]

    processed['body'] = processed['body'].apply(_normalise_body)
    processed = to_lowercase(processed)

    url_df = pd.DataFrame(url_data)  # Create URL dataframe

    print(f"Modified dataframe shape: {processed.shape}")
    print(f"Columns: {processed.columns.tolist()}")
    print(f"URL dataframe shape: {url_df.shape}")

    return processed, url_df

# Read every configured dataset, then stack them under a fresh 0..n-1 index.
data_frames = [load_email_data(path) for path in file_paths]
combined_df = pd.concat(data_frames, join='outer', ignore_index=True)

# The CSV reader yields strings, so cast the labels to integers.
combined_df['label'] = combined_df['label'].astype(int)

# Normalise the bodies and derive the per-URL feature table.
combined_df, url_df = process_email_data(combined_df)
url_features_df = process_url_data(url_df)

print(combined_df.shape)

### Splitting data

In [41]:
# Split the email bodies and labels into train (70%) and test (30%) subsets with a fixed seed.
X_train, X_test, target_train, target_test = train_test_split(combined_df['body'], combined_df['label'], train_size=0.7, random_state=1)

# Split the URL feature rows into train (70%) and test (30%) subsets.
# NOTE(review): url_features_df has one row per URL and is split independently of the
# email split above, so URLs from the same email can end up on both sides.
X = url_features_df.drop(columns=['label'])   # exclude label from features
y = url_features_df['label']
X_train_url, X_test_url, target_train_url, target_test_url = train_test_split(
    X, y, train_size=0.7, random_state=1
)
# Sanity check: show the label dtype, then preview the URL training features.
print(y.dtype)
X_train_url.head()

int64


Unnamed: 0,entropy,numDigits,urlLength,numParams,hasHttp,hasHttps,numFragments,numSubDomains,num_%20,num_@
93698,4.296229,8,62,0,1,0,0,1,0,0
1981,3.534219,0,23,0,1,0,0,1,0,0
114875,4.465167,73,152,6,1,0,0,1,0,0
2338,4.246982,8,65,0,1,0,0,1,0,0
36916,3.943943,0,40,0,1,0,0,1,0,0


### Removing non-word and non-special characters

### Tokenization and Vectorization

In [42]:
# Initialize vectorizer
def initialise_tfidf_vectorizer(data, max_features = 2000):
    """Fit a TF-IDF vectorizer on ``data`` and return the transformed matrix with it.

    Args:
        data: Iterable of text documents.
        max_features: Maximum vocabulary size passed to TfidfVectorizer.

    Returns:
        (X, vectorizer): the TF-IDF matrix for ``data`` and the fitted vectorizer.
    """
    vectorizer_tfidf = TfidfVectorizer(stop_words='english', max_features=max_features) # remove stop words
    # fit_transform analyses the corpus once; a separate fit() then transform()
    # does it twice and fails on one-shot iterables.
    X = vectorizer_tfidf.fit_transform(data)
    return X, vectorizer_tfidf

X_train_tfidf, vectorizer_tfidf = initialise_tfidf_vectorizer(X_train)

### Lemmatization

### Train/Import and Test Model(s)

In [43]:
from sklearn.neural_network import MLPClassifier
from sklearn import tree
import joblib
from sklearn.tree import DecisionTreeClassifier

MODEL_PATH_CLF = "/content/drive/MyDrive/COS720 Project/Models/body_classifier.joblib"

try:
    # NOTE(review): joblib.load unpickles the file, which can execute arbitrary code;
    # only load model files from a trusted location.
    clf = joblib.load(MODEL_PATH_CLF)
    print("Loaded existing model from", MODEL_PATH_CLF)
except FileNotFoundError:
    print("Model file not found. Training a new model...")
    from sklearn.pipeline import make_pipeline

    body_mlp = MLPClassifier(hidden_layer_sizes=(100,),
                             activation='logistic',
                             solver='adam',
                             max_iter=200,
                             random_state=0)
    body_mlp.fit(X_train_tfidf, target_train)

    # Later cells call clf.predict / clf.predict_proba with raw text, so the
    # classifier must carry its vectorizer. Both steps are already fitted, so
    # the pipeline is used for prediction only.
    clf = make_pipeline(vectorizer_tfidf, body_mlp)
    # NOTE(review): the freshly trained model is not written to MODEL_PATH_CLF,
    # so every session without the file retrains it.

Loaded existing model from /content/drive/MyDrive/COS720 Project/Models/body_classifier.joblib


#### Train URL model

In [44]:
from sklearn.metrics import classification_report, accuracy_score

MODEL_PATH_URL = "/content/drive/MyDrive/COS720 Project/Models/mlp_url_classifier.joblib"

# Reuse the persisted URL model when it exists; otherwise train one from the URL features.
try:
    mlp = joblib.load(MODEL_PATH_URL)
except FileNotFoundError:
    print("Model file not found. Training a new URL model...")

    # Five progressively narrower hidden layers, ReLU activations, Adam optimiser.
    url_mlp_params = dict(
        hidden_layer_sizes=(256, 128, 64, 32, 16),
        activation='relu',
        solver='adam',
        max_iter=200,
        verbose=True,
        random_state=42,
    )
    mlp = MLPClassifier(**url_mlp_params)
    mlp.fit(X_train_url, target_train_url)
else:
    print("Loaded existing model from", MODEL_PATH_URL)

Loaded existing model from /content/drive/MyDrive/COS720 Project/Models/mlp_url_classifier.joblib


In [45]:
# Persist the URL model to the current working directory.
# NOTE(review): this relative path is not MODEL_PATH_URL, so the copy that the
# loading cell reads is not updated here; confirm that is intended.
joblib.dump(mlp, 'mlp_url_classifier.joblib')

['mlp_url_classifier.joblib']

# Static URL Classification

### Testing

In [47]:
test_file_path = "/content/drive/MyDrive/COS720 Project/Datasets/SpamAssasin.csv"

# Load the external corpus with the same loader used for the training files
# (instead of a second copy of the CSV-reading loop).
test_df = load_email_data(test_file_path)
test_df['label'] = test_df['label'].astype(int)

# NOTE: the metrics below are computed on the hold-out split of the training
# corpora (X_test / target_test), not on test_df. The disabled lines sketch the
# evaluation on the external corpus.
# test_df, test_url_df = process_email_data(test_df)
# test_url_features_df = process_url_data(test_url_df)

y_pred = clf.predict(X_test)
y_test = target_test

# y_test = test_df['label']
# y_pred = clf.predict(test_df['body'])

f1 = f1_score(y_test, y_pred)  # by default, pos_label=1
print("Accuracy:", accuracy_score(y_test, y_pred))
print("F1 score:", f1)
print("\nClassification Report:\n", classification_report(y_test, y_pred))

Loaded data with fallback method: 5805 rows
Accuracy: 0.9960305792413996

Classification Report:
               precision    recall  f1-score   support

           0       1.00      0.99      1.00      5889
           1       1.00      1.00      1.00      7715

    accuracy                           1.00     13604
   macro avg       1.00      1.00      1.00     13604
weighted avg       1.00      1.00      1.00     13604



In [49]:
# Preview the held-out URL features before scoring them.
print(X_test_url.head())
# x = test_url_features_df.drop(columns=['label'])   # exclude label from features
# y = test_url_features_df['label']

# Evaluate on the hold-out split of the URL feature table.
x, y = X_test_url, target_test_url

predictions = mlp.predict(x)

print(y.dtype)
print(y.head())

url_accuracy = accuracy_score(y, predictions)
url_report = classification_report(y, predictions)
print("Accuracy:", url_accuracy)
print("\nClassification Report:\n", url_report)

         entropy  numDigits  urlLength  numParams  hasHttp  hasHttps  \
91301   4.296229          8         62          0        1         0   
118589  4.658370         10         79          2        1         0   
101631  4.691232          8         62          0        1         0   
55263   3.661226          0         26          0        1         0   
131103  3.783465          0         25          0        1         0   

        numFragments  numSubDomains  num_%20  num_@  
91301              0              1        0      0  
118589             0              0        0      0  
101631             0              1        0      0  
55263              0              1        0      0  
131103             0              1        0      0  
int64
91301     1
118589    0
101631    1
55263     0
131103    0
Name: label, dtype: int64
Accuracy: 0.9568209791263721

Classification Report:
               precision    recall  f1-score   support

           0       0.95      0.93      0.9

# Explainability

In [None]:
import subprocess

def custom_tokenizer(text):
    """Split ``text`` into [bracketed] chunks, <tag> chunks and word tokens."""
    pattern = r"\[[^\]]+\]|<[^>]+>|\w+"
    return re.findall(pattern, text)

try:
    from lime.lime_text import LimeTextExplainer
except ImportError:
    # lime is not importable in this environment: install it into the running
    # interpreter, then retry the import.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "lime"])
    from lime.lime_text import LimeTextExplainer

# Row of test_df to explain.
SAMPLE_INDEX = 5799

# test_df bodies are raw text, whereas the training bodies went through URL
# tagging, category tagging and lower-casing; apply the same steps here so the
# explanation is computed on the kind of text the classifier was trained on.
raw_sample = test_df['body'].iloc[SAMPLE_INDEX]
sample_data = replace_data_categories(replace_urls_with_tag(raw_sample)).lower()

explainer = LimeTextExplainer(class_names=["benign","phishing"], split_expression=custom_tokenizer)
exp = explainer.explain_instance(sample_data, clf.predict_proba, num_features=10, labels=[1])
exp.show_in_notebook()
# Keep the HTML in a variable rather than echoing the whole document as cell output.
explanation_html = exp.as_html()