In [None]:
!pip install transformers-interpret
!pip install lime
!pip install shap
!pip install stop-words

In [None]:
import string
import nltk
nltk.download('words')
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('omw-1.4')
nltk.download('wordnet')
from nltk.corpus import words
from nltk import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from stop_words import get_stop_words
from nltk.stem import PorterStemmer
from nltk.stem import SnowballStemmer
from collections import Counter
from bs4 import BeautifulSoup
from nltk.corpus import wordnet

In [None]:
import torch
import pandas as pd
import numpy as np
import tensorflow as tf
import re
import os
import zipfile
import seaborn as sns
import matplotlib.pyplot as plt
import random
%matplotlib inline
import warnings
warnings.filterwarnings('ignore')

from transformers import (
    DistilBertTokenizer,
    DistilBertForSequenceClassification,
    RobertaTokenizer,
    RobertaForSequenceClassification
)

from torch.optim import AdamW

from torch.utils.data import Dataset, TensorDataset, DataLoader, RandomSampler, SequentialSampler
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    classification_report,
    precision_score,
    recall_score,
    f1_score,
    roc_auc_score
)
from sklearn.model_selection import train_test_split
from sklearn.utils import resample
from transformers_interpret import SequenceClassificationExplainer
from lime.lime_text import LimeTextExplainer
from IPython.core.display import display, HTML
import gc
from collections import defaultdict

# Seed every random number generator the notebook touches, not just Python's:
# `random` drives the synonym augmentation, PyTorch drives DataLoader shuffling,
# and NumPy's global generator is used by libraries that default to it.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

In [None]:
# Load the raw dataset; the relative path means the CSV must sit in the working directory.
df = pd.read_csv('Phishing_Email.csv')
# Preview the first rows.
df.head()

In [None]:
# Binary target: 1 for 'Phishing Email', 0 for every other value.
df['Label'] = (df['Email Type'] == 'Phishing Email').astype('int64')
# Drop the leftover index column. errors='ignore' keeps the cell re-runnable
# once the column is already gone.
df = df.drop(columns='Unnamed: 0', errors='ignore')
# Only the last expression of a cell is rendered, so print the class counts explicitly.
print(df['Email Type'].value_counts())
df.head()

**Data Distribution**

In [None]:
# Class counts, used both as wedge sizes and to turn percentages back into counts.
email_type_counts = df['Email Type'].value_counts()
total_emails = int(email_type_counts.sum())

plt.figure(figsize=(5, 5))

# Green for safe mail, orange for anything else.
colors = ['#66c2a5' if label == 'Safe Email' else '#fc8d62' for label in email_type_counts.index]

plt.pie(email_type_counts, labels=email_type_counts.index,
        # round(), not int(): the percentage arrives as a float, so truncating
        # e.g. 6999.9999 would display a count that is one too low.
        autopct=lambda p: f'{p:.1f}% ({round(p * total_emails / 100)})',
        textprops={'fontsize': 12, 'fontweight': 'bold'}, colors=colors)

plt.title('Email Type Distribution', fontsize=16, fontweight='bold')

# Save before show() so the written file contains the rendered figure.
plt.savefig('email_type_pie_chart.jpg', dpi=600)
plt.show()


**Remove rows with null values**

In [None]:
# Report missing values per column before any cleaning
print(df.isnull().any())
print('-------------------------------')
print(df.isnull().sum())

if not df.isnull().values.any():
    print('No null values found.')
else:
    # Drop every row with at least one missing field, then report again
    df = df.dropna()
    print('After removing null values:')
    print(df.isnull().any())
    print('-------------------------------')
    print(df.isnull().sum())

print("New Data length: ", len(df))

# Class balance after the clean-up (rendered as the cell output)
df['Email Type'].value_counts()

**Preprocessing**

In [None]:
def clean_text(text):
    """Lower-case `text`, strip HTML markup and normalise whitespace.

    The string is lower-cased, parsed with BeautifulSoup's 'html.parser' to
    keep only the visible text, every whitespace run is collapsed to a single
    space, and leading/trailing whitespace is removed.
    """
    visible = BeautifulSoup(text.lower(), 'html.parser').get_text()
    collapsed = re.sub(r'\s+', ' ', visible)
    # '\r' is already matched by \s above; the replace is kept for parity.
    return collapsed.replace('\r', ' ').strip()

def preprocess_text(text):
    """Preprocessing entry point applied to each email; currently delegates to `clean_text`."""
    return clean_text(text)

# Store the cleaned body in a new column; the raw 'Email Text' column is left untouched.
df['Email_Text'] = df['Email Text'].apply(preprocess_text)
df.head(5)

In [None]:
# Work with the cleaned text plus both label representations (string class and 0/1).
data = df[['Email_Text', 'Email Type', 'Label']]
data.head(5)

**Synonym Replacement Function**

In [None]:
# English stop words from NLTK; synonym_replacement never selects these for replacement.
stop_words = set(stopwords.words('english'))

def synonym_replacement(text, replace_ratio=0.2):
    """Return `text` with a random share of its non-stop-words swapped for WordNet synonyms.

    Args:
        text: whitespace-separated input string.
        replace_ratio: fraction of non-stop-word tokens to try to replace;
            at least one token is always attempted.

    Returns:
        The tokens re-joined with single spaces, or `text` unchanged when it
        contains no non-stop-word token. A selected token is left as-is when
        WordNet offers no lemma that differs from it.
    """
    tokens = text.split()

    # Positions eligible for replacement (anything that is not a stop word)
    candidates = [pos for pos, tok in enumerate(tokens) if tok.lower() not in stop_words]
    if not candidates:
        return text

    # Always attempt at least one replacement, never more than there are candidates
    quota = max(1, int(len(candidates) * replace_ratio))
    chosen = random.sample(candidates, min(quota, len(candidates)))

    result = list(tokens)
    for pos in chosen:
        original = tokens[pos]
        target = original.lower()

        # Every lemma of every synset, minus the word itself; '_' marks multi-word lemmas
        alternatives = []
        for synset in wordnet.synsets(original):
            for lemma in synset.lemmas():
                if lemma.name().lower() != target:
                    alternatives.append(lemma.name().replace('_', ' '))

        if alternatives:
            result[pos] = random.choice(alternatives)

    return ' '.join(result)

def augment_phishing_data(email_text, replace_ratio=0.2):
    """Return an augmented variant of `email_text` produced by `synonym_replacement`.

    Args:
        email_text: the email body to augment.
        replace_ratio: forwarded unchanged to `synonym_replacement`.
    """
    return synonym_replacement(email_text, replace_ratio)

**Split dataset**

In [None]:
# Stratified variant, currently disabled:
# train_val_texts, test_texts, train_val_labels, test_labels = train_test_split(
#     data['Email_Text'],
#     data['Email Type'],
#     test_size=0.2,
#     random_state=42,
#     stratify=data['Email Type']
# )

# Hold out 20% of the data as the test set (fixed random_state for repeatability).
# NOTE(review): without `stratify`, class proportions are not guaranteed to match
# between the two parts -- confirm this is intended.
train_val_texts, test_texts, train_val_labels, test_labels = train_test_split(
    data['Email_Text'],
    data['Email Type'],
    test_size=0.2,
    random_state=42
)

In [None]:
# Sizes of the two parts and the class counts of the held-out test set.
print("After initial split:")
print(f"Train+Val size: {len(train_val_texts)}")
print(f"Test size: {len(test_texts)}")
print("\nTest set distribution:")
print(test_labels.value_counts())

In [None]:
# Stratified variant, currently disabled:
# train_texts, val_texts, train_labels, val_labels = train_test_split(
#     train_val_texts,
#     train_val_labels,
#     test_size=0.2,
#     random_state=42,
#     stratify=train_val_labels
# )
# Carve 20% of the remaining data out as the validation set (again not stratified).
train_texts, val_texts, train_labels, val_labels = train_test_split(
    train_val_texts,
    train_val_labels,
    test_size=0.2,
    random_state=42
)

In [None]:
# Sizes of all three splits.
print("After train-val split:")
print(f"Train size: {len(train_texts)}")
print(f"Validation size: {len(val_texts)}")
print(f"Test size: {len(test_texts)}")

# Per-class counts of each split before the training set is augmented.
print("\nClass distribution before augmentation:")
print("Training set:")
print(train_labels.value_counts())
print("\nValidation set:")
print(val_labels.value_counts())
print("\nTest set:")
print(test_labels.value_counts())

**Create balanced training set**

In [None]:
# Training split as a frame with a fresh 0..n-1 index.
train_df = pd.DataFrame({
    'Email_Text': train_texts.reset_index(drop=True),
    'Email Type': train_labels.reset_index(drop=True)
})

phishing_train = train_df[train_df['Email Type'] == 'Phishing Email'].copy()
non_phishing_train = train_df[train_df['Email Type'] == 'Safe Email'].copy()

majority_count = len(non_phishing_train)
minority_count = len(phishing_train)
# Number of synthetic phishing emails needed to match the safe-email count.
samples_to_generate = majority_count - minority_count

print(f"Phishing emails in training: {minority_count}")
print(f"Safe emails in training: {majority_count}")
print(f"Samples to generate: {samples_to_generate}")

augmented_samples = []

if samples_to_generate > 0:
    if minority_count == 0:
        # With nothing to augment from, generation could never reach the target.
        raise ValueError("Cannot augment: the training split contains no phishing emails.")

    # Cycle through the phishing emails in their original order until enough
    # augmented copies exist; only the training split is ever augmented.
    source_texts = phishing_train['Email_Text'].tolist()
    for i in range(samples_to_generate):
        augmented_samples.append({
            'Email_Text': augment_phishing_data(source_texts[i % minority_count]),
            'Email Type': 'Phishing Email',
        })

    augmented_df = pd.DataFrame(augmented_samples)
    balanced_train_df = pd.concat([train_df, augmented_df], ignore_index=True)
    # Shuffle so the synthetic rows are not all grouped at the end.
    balanced_train_df = balanced_train_df.sample(frac=1, random_state=42).reset_index(drop=True)
else:
    # Phishing is not the minority class, so nothing is generated.
    balanced_train_df = train_df

print("\nAfter augmentation:")
print("Balanced training set distribution:")
print(balanced_train_df['Email Type'].value_counts())
print(f"Final training set size: {len(balanced_train_df)}")

In [None]:

# Texts and string labels of the (possibly augmented) training set.
final_train_texts = balanced_train_df['Email_Text']
final_train_labels = balanced_train_df['Email Type']


# Give validation and test a 0..n-1 index so positional and label-based lookups agree.
val_texts = val_texts.reset_index(drop=True)
val_labels = val_labels.reset_index(drop=True)
test_texts = test_texts.reset_index(drop=True)
test_labels = test_labels.reset_index(drop=True)

print("Final dataset sizes:")
print(f"Training: {len(final_train_texts)}")
print(f"Validation: {len(val_texts)}")
print(f"Test: {len(test_texts)}")

**Load roberta-base Tokenizer and Model**

In [None]:
# Pretrained 'roberta-base' tokenizer and a sequence-classification model with two output labels.
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
model = RobertaForSequenceClassification.from_pretrained('roberta-base', num_labels=2)

**Tokenizing data**

In [None]:
# Upper bound on tokens per email; longer inputs are truncated to this length.
MAX_LEN = 512
# Each split is tokenized in a single call with padding and truncation enabled.
train_encodings = tokenizer(final_train_texts.tolist(), truncation=True, padding=True, max_length=MAX_LEN)
val_encodings = tokenizer(val_texts.tolist(), truncation=True, padding=True, max_length=MAX_LEN)
test_encodings = tokenizer(test_texts.tolist(), truncation=True, padding=True, max_length=MAX_LEN)

***Creating PyTorch datasets***

In [None]:
# String class name -> integer id fed to the classifier
_LABEL_IDS = {'Phishing Email': 1, 'Safe Email': 0}


def _build_dataset(encodings, label_series):
    """Pack token ids, attention masks and mapped labels into one TensorDataset."""
    return TensorDataset(
        torch.tensor(encodings['input_ids']),
        torch.tensor(encodings['attention_mask']),
        torch.tensor(label_series.map(_LABEL_IDS).tolist()),
    )


train_dataset = _build_dataset(train_encodings, final_train_labels)
val_dataset = _build_dataset(val_encodings, val_labels)
test_dataset = _build_dataset(test_encodings, test_labels)

***Create DataLoaders***

In [None]:
# Only the training loader shuffles; validation and test keep their order so
# predictions line up with val_texts / test_texts.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
# AdamW over all model parameters with a fixed learning rate.
optimizer = AdamW(model.parameters(), lr=2e-5)

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

**Model training**

In [None]:
epochs = 5
# Per-epoch histories, consumed by the plotting cell.
train_losses = []
val_losses = []
train_accuracies = []
val_accuracies = []
test_losses = []
test_accuracies = []


def _evaluate(loader):
    """Run `model` over `loader` without gradient tracking.

    Returns:
        (mean loss per batch, accuracy in percent).
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        for eval_batch in loader:
            ids = eval_batch[0].to(device)
            mask = eval_batch[1].to(device)
            targets = eval_batch[2].to(device)

            eval_outputs = model(ids, attention_mask=mask, labels=targets)
            loss_sum += eval_outputs.loss.item()

            n_correct += (torch.argmax(eval_outputs.logits, dim=1) == targets).sum().item()
            n_seen += targets.size(0)

    return loss_sum / len(loader), n_correct / n_seen * 100


for epoch in range(epochs):
    # Training phase
    model.train()
    total_loss = 0.0
    correct = 0
    total_samples = 0

    for batch in train_loader:
        input_ids = batch[0].to(device)
        attention_mask = batch[1].to(device)
        labels = batch[2].to(device)

        optimizer.zero_grad()

        # Passing labels makes the model return the classification loss with the logits.
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        total_loss += loss.item()

        # Training accuracy comes from the same forward pass as the gradient
        # step, i.e. with the model in train mode.
        logits = outputs.logits
        predictions = torch.argmax(logits, dim=1)
        correct += (predictions == labels).sum().item()
        total_samples += labels.size(0)

        loss.backward()
        optimizer.step()

    epoch_accuracy = correct / total_samples * 100
    epoch_loss = total_loss / len(train_loader)
    train_losses.append(epoch_loss)
    train_accuracies.append(epoch_accuracy)

    print(f"Epoch [{epoch + 1}/{epochs}] - Training Loss: {epoch_loss:.4f}, Training Accuracy: {epoch_accuracy:.2f}%")

    # Validation phase
    val_epoch_loss, val_epoch_accuracy = _evaluate(val_loader)
    val_losses.append(val_epoch_loss)
    val_accuracies.append(val_epoch_accuracy)

    print(f"Epoch [{epoch + 1}/{epochs}] - Validation Loss: {val_epoch_loss:.4f}, Validation Accuracy: {val_epoch_accuracy:.2f}%")

    # Testing phase
    # NOTE(review): test metrics are tracked every epoch for the curves only;
    # they should not drive epoch or hyper-parameter selection.
    test_loss, test_accuracy = _evaluate(test_loader)
    test_losses.append(test_loss)
    test_accuracies.append(test_accuracy)

    print(f"Epoch [{epoch + 1}/{epochs}] - Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%")
    print("\n" + "-"*70)

In [None]:
import matplotlib.pyplot as plt

# Shared x axis: epochs numbered from 1
epoch_axis = range(1, epochs + 1)
fig, (ax_acc, ax_loss) = plt.subplots(1, 2, figsize=(10, 4))

# Accuracy plots
ax_acc.plot(epoch_axis, train_accuracies, label='Training Accuracy')
ax_acc.plot(epoch_axis, val_accuracies, label='Validation Accuracy')
ax_acc.plot(epoch_axis, test_accuracies, label='Test Accuracy', linestyle='--')
ax_acc.set_xlabel('Epochs')
ax_acc.set_ylabel('Accuracy')
ax_acc.set_title('Accuracy over Epochs')
ax_acc.legend()

# Loss plots
ax_loss.plot(epoch_axis, train_losses, label='Training Loss')
ax_loss.plot(epoch_axis, val_losses, label='Validation Loss')
ax_loss.plot(epoch_axis, test_losses, label='Test Loss', linestyle='--')
ax_loss.set_xlabel('Epochs')
ax_loss.set_ylabel('Loss')
ax_loss.set_title('Loss over Epochs')
ax_loss.legend()

fig.tight_layout()
fig.savefig('loss_and_accuracy.jpg', dpi=600)
plt.show()


# **Evaluation Report**

In [None]:
# Evaluation
model.eval()
predictions = []
true_labels = []

for batch in test_loader:
    input_ids = batch[0].to(device)
    attention_mask = batch[1].to(device)
    labels = batch[2].to(device)

    with torch.no_grad():
        outputs = model(input_ids, attention_mask=attention_mask)
        logits = outputs.logits

    predictions.extend(torch.argmax(logits, dim=1).cpu().detach().numpy())
    true_labels.extend(labels.cpu().detach().numpy())

# Calculate accuracy
accuracy = accuracy_score(true_labels, predictions)
print(f"Accuracy: {accuracy * 100:.2f}%")


In [None]:
from sklearn.metrics import classification_report

# Text report built from the test-set predictions collected by the evaluation cell.
class_report = classification_report(true_labels, predictions)
print("Classification Report:")
print(class_report)

In [None]:
# Confusion Matrix
class_names=['Safe Email', 'Phishing Email']
conf_matrix = confusion_matrix(true_labels, predictions)
plt.figure(figsize=(5, 5))
sns.heatmap(conf_matrix, annot=True, fmt="d", xticklabels=class_names, yticklabels=class_names, cmap="Blues")
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()
plt.savefig('Confusion',dpi=600);

In [None]:
from sklearn.metrics import roc_curve, auc, roc_auc_score
import matplotlib.pyplot as plt

# Score of label 1 (phishing) and true label for every test email.
all_probs = []
all_labels = []

# Relies on the model already being in eval mode from the evaluation cell.
with torch.no_grad():
    for batch in test_loader:
        input_ids = batch[0].to(device)
        attention_mask = batch[1].to(device)
        labels = batch[2].to(device)

        outputs = model(input_ids, attention_mask=attention_mask)
        probs = torch.softmax(outputs.logits, dim=1)  # Convert logits to probabilities

        # Column 1 is the probability of label 1.
        all_probs.extend(probs[:, 1].cpu().numpy())
        all_labels.extend(labels.cpu().numpy())


**ROC curve and AUC**

In [None]:
# ROC points from the label-1 probabilities, and the area under that curve.
fpr, tpr, thresholds = roc_curve(all_labels, all_probs)
roc_auc = auc(fpr, tpr)

plt.figure(figsize=(6, 4))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'AUC = {roc_auc:.3f}')
# Diagonal reference line from (0, 0) to (1, 1).
plt.plot([0, 1], [0, 1], color='gray', lw=1, linestyle='--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
# plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc='lower right')

# The figure is written to disk before it is shown.
roc_save_path = 'roc_auc_curve.jpg'
plt.savefig(roc_save_path, dpi=600)
plt.show()

# AUC recomputed directly from labels and scores.
print(f"AUC Score: {roc_auc_score(all_labels, all_probs):.3f}")

# **Explainability Analysis**

In [None]:
# Attach human-readable class names to the model config, using the same
# id assignment as the training labels (0 = safe, 1 = phishing).
model.config.id2label = {0: 'Safe Email', 1: 'Phishing Email'}
model.config.label2id = {'Safe Email': 0, 'Phishing Email': 1}

SELECT TEXT TO INTERPRET

In [None]:
# Positional index of the test email to explain.
# NOTE(review): hard-coded; .iloc raises IndexError if the test split has
# fewer than 3651 rows.
text_index = 3650
text_to_interpret = test_texts.iloc[text_index]
selected_text_label = test_labels.iloc[text_index]
print(f"Text to Interpret: {text_to_interpret}")
print(f"Selected Text Label: {selected_text_label}")

TRANSFORMERS INTERPRET

In [None]:
def interpret_text(text, label, model, tokenizer):
    """Compute transformers-interpret word attributions for a single text.

    Prints the text, its actual class and the attributions, and passes
    'roberta_viz_1.html' to the explainer's `visualize` call.

    Args:
        text: the text to explain.
        label: the true class of `text`; only printed.
        model: the sequence-classification model to explain.
        tokenizer: the tokenizer that belongs to `model`.

    Returns:
        The attributions exactly as returned by the explainer.
    """
    print(f"Text: {text}")
    print(f"Actual Class: {label}")

    attribution_explainer = SequenceClassificationExplainer(model, tokenizer)
    scores = attribution_explainer(text)
    print("Word Attributions:", scores)

    attribution_explainer.visualize('roberta_viz_1.html')
    return scores

# Attributions for the selected test email; consumed later as (token, score) pairs.
attributions = interpret_text(text_to_interpret, selected_text_label, model, tokenizer)

LIME EXPLANATION

In [None]:
# Output directory for the LIME artefacts; an empty string makes os.path.join
# return the bare file name, i.e. files land in the working directory.
save_dir = ''
# Rebinds class_names for LIME; index 0 is named 'Legitimate Email' here,
# not 'Safe Email' as in the confusion-matrix cell.
class_names = ['Legitimate Email', 'Phishing Email']
explainer = LimeTextExplainer(class_names=class_names)

def explain_prediction(text, batch_size=32):
    """Explain the model's prediction for `text` with LIME and save the result.

    Prints the class probabilities and the feature weights of the top label,
    renders the HTML explanation inline, and writes 'lime_explanation_1.html'
    and 'lime_explanation_1.jpg' into `save_dir`.

    Args:
        text: the email text to explain.
        batch_size: number of texts sent through the model per forward pass.

    Returns:
        List of (word, weight) pairs for the explained label.
    """
    def model_predict(texts):
        """Return an (n_texts, n_classes) array of class probabilities."""
        texts = list(texts)
        model.eval()
        chunks = []
        with torch.no_grad():
            # LIME submits all of its perturbed samples in one call; running
            # them in small batches keeps memory use bounded.
            for start in range(0, len(texts), batch_size):
                input_encodings = tokenizer(
                    texts[start:start + batch_size],
                    truncation=True, padding=True, max_length=MAX_LEN, return_tensors='pt'
                )
                input_ids = input_encodings['input_ids'].to(device)
                attention_mask = input_encodings['attention_mask'].to(device)
                logits = model(input_ids, attention_mask=attention_mask).logits
                # LIME expects probabilities, not raw logits.
                chunks.append(torch.softmax(logits, dim=1).cpu())
        if not chunks:
            return torch.empty((0, len(class_names))).numpy()
        return torch.cat(chunks).numpy()

    # Free cached GPU memory before the many forward passes LIME triggers.
    torch.cuda.empty_cache()
    gc.collect()

    explanation = explainer.explain_instance(text, model_predict, num_features=15, top_labels=1)
    predicted_probabilities = model_predict([text])
    prediction_probabilities = {class_names[i]: round(float(predicted_probabilities[0][i]), 2) for i in range(len(class_names))}

    print("Prediction probabilities:")
    print(prediction_probabilities)

    # top_labels=1, so the only available label is the predicted class.
    exp_class = explanation.available_labels()[0]
    exp = explanation.as_list(label=exp_class)
    print(f"Explanation for {exp_class}: {exp}")

    display(HTML(explanation.as_html()))

    # Save HTML explanation
    html_path = os.path.join(save_dir, 'lime_explanation_1.html')
    with open(html_path, 'w', encoding='utf-8') as f:
        f.write(explanation.as_html())

    # Save JPG plot
    exp_plot = explanation.as_pyplot_figure(label=exp_class)
    exp_plot.set_figwidth(10)
    exp_plot.set_figheight(6)
    jpg_path = os.path.join(save_dir, 'lime_explanation_1.jpg')
    exp_plot.savefig(jpg_path, bbox_inches='tight', dpi=600, format='jpg')

    return exp

# Get LIME explanation: a list of (word, weight) pairs for the explained label
lime = explain_prediction(text_to_interpret)
# Extract LIME words, keeping the order in which LIME returned them
lime_words = [word for word, score in lime]

PROCESS TRANSFORMER ATTRIBUTIONS

In [None]:
# Group attribution scores by token
token_scores = defaultdict(list)
for token, score in attributions:
    token_scores[token].append(score)

# Calculate average attribution per token
avg_token_attributions = {
    token: sum(scores) / len(scores)
    for token, scores in token_scores.items()
}

# Create trans_values dictionary
trans_values = {word: avg_token_attributions.get(word, 0.0) for word in lime_words}

print("\nTransformer Attribution Values:")
for word in lime_words:
    value = trans_values[word]
    print(f"{word:<10} -> {value:.5f}")

**NORMALIZATION AND HYBRID SCORING (LITA)**

In [None]:
def normalize_to_minus1_plus1(values):
    """Min-max scale `values` linearly onto the interval [-1, 1].

    The smallest input maps to -1 and the largest to +1. Because the scaling
    is relative to the observed range, the sign of a result does not
    necessarily match the sign of the input.

    Args:
        values: iterable of numbers (any iterable, including generators).

    Returns:
        A list the same length as the input. An empty input gives an empty
        list, and an input whose values are all equal gives all zeros.
    """
    values = list(values)  # allow generators; min() and max() each need a full pass
    if not values:
        return []
    min_val = min(values)
    range_val = max(values) - min_val
    if range_val == 0:
        # No spread to scale by; place everything at the midpoint.
        return [0 for _ in values]
    return [((x - min_val) / range_val) * 2 - 1 for x in values]

# Extract lists of scores for lime and trans in the same order
lime_scores = [score for _, score in lime]
trans_scores = [trans_values[word] for word, _ in lime]

# Normalize scores
lime_norm = normalize_to_minus1_plus1(lime_scores)
trans_norm = normalize_to_minus1_plus1(trans_scores)

# Alpha weight
alpha = 0.5

# Calculate hybrid scores
hybrid_scores = [
    (lime[i][0], lime_scores[i], lime_norm[i], trans_scores[i], trans_norm[i], alpha * lime_norm[i] + (1 - alpha) * trans_norm[i])
    for i in range(len(lime))
]

print(f"\n{'Word':<12} {'LIME Score':>10} {'LIME Norm':>10} {'Trans Score':>12} {'Trans Norm':>12} {'Hybrid Score':>14}")
print('-'*80)

for word, lime_s, lime_n, trans_s, trans_n, hybrid in hybrid_scores:
    print(f"{word:<12} {lime_s:10.5f} {lime_n:10.5f} {trans_s:12.5f} {trans_n:12.5f} {hybrid:14.5f}")

In [None]:
# Word -> hybrid score lookup; expects hybrid_scores to be the list of
# 6-tuples built by the scoring cell.
hybrid_dict = {word: hybrid for word, _, _, _, _, hybrid in hybrid_scores}

print("\nHybrid Scores Dictionary:")
for word, score in hybrid_dict.items():
    print(f"'{word}': {score:.5f},")

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# NOTE(review): `words` rebinds the name imported from nltk.corpus in the
# import cell, and `hybrid_scores` is rebound from the list of 6-tuples to a
# plain list of floats. Re-run the scoring cell before re-running the
# dictionary cell.
words = lime_words
hybrid_scores = [hybrid_dict[word] for word in words]

# Green bars for non-negative hybrid scores, red for negative ones.
colors = ['green' if score >= 0 else 'red' for score in hybrid_scores]

plt.figure(figsize=(14, 7))
bars = plt.bar(words, hybrid_scores, color=colors)

# Zero baseline separating positive from negative scores.
plt.axhline(0, color='black', linestyle='--', linewidth=0.8)
plt.title("Hybrid Scores (α=0.5)", pad=20, fontweight='bold', fontsize=18)
plt.ylabel("Normalized Score [-1, 1]", labelpad=10, fontsize=14, fontweight='bold')
plt.xticks(rotation=45, ha='right', fontsize=13, fontweight='bold')
plt.yticks(np.arange(-1, 1.25, 0.25), fontsize=12)
plt.ylim(-1, 1)
plt.grid(axis='y', linestyle=':', alpha=0.7)
plt.tight_layout()
# The figure is written to disk before it is shown.
plt.savefig("hybrid_phishing_scores2.jpg", dpi=300, format='jpg')
plt.show()