# Spam Detection System - Library Imports and Setup

This notebook implements a spam detection system using both traditional machine learning and deep learning approaches.

## Library Imports
- **Standard Libraries**: pandas and numpy for data manipulation, plus os, re, and tqdm for file handling, regular expressions, and progress bars
- **Machine Learning**: scikit-learn for traditional ML models and preprocessing
- **Deep Learning**: TensorFlow/Keras for neural network implementation
- **NLP**: NLTK for text processing
- **Visualization**: matplotlib, seaborn for plotting results
- **Model Persistence**: joblib for saving trained models

The code also initializes essential NLTK components and sets the plotting style for consistent visualizations throughout the notebook.

In [2]:
# Standard libraries
import pandas as pd
import numpy as np
import os
import re
from tqdm import tqdm

# Machine Learning libraries
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.naive_bayes import ComplementNB
from sklearn.metrics import (
    classification_report, confusion_matrix, f1_score,
    roc_curve, roc_auc_score, precision_recall_curve
)

# Deep Learning libraries
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization

# NLP libraries
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Model saving
import joblib

# Download required NLTK data
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('averaged_perceptron_tagger')

# Initialize NLTK tools
lemmatizer = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))

# Set plotting style
plt.style.use('ggplot')

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\ASUS\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\ASUS\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\ASUS\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     C:\Users\ASUS\AppData\Roaming\nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


# Data Processing Classes

## EmailAnalyzer Class
This class handles the analysis and generation of synthetic email features:
- Maintains lists of legitimate and spam domains
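- Generates synthetic email addresses from the spam/non-spam label (legitimate domains for label 0, spam domains otherwise), so the derived domain score tracks the label directly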
- Scores email domains for spam likelihood on a 0-1 scale
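
The scoring rule can be illustrated with a standalone function. This is a minimal sketch rather than the class itself: `score_domain` and the two shortened domain sets are hypothetical stand-ins, while the 1.0 / 0.0 / 0.8 / 0.5 values mirror the scores returned by `analyze_domain`.

```python
import re

# Hypothetical, shortened domain lists for illustration only
LEGITIMATE = {"gmail.com", "university.edu"}
SPAM = {"fakemail.com", "temp-mail.org"}
SUSPICIOUS_PATTERNS = [r"temp.*mail", r"disposable", r"fake", r"suspicious"]

def score_domain(email):
    """Return a spam-likelihood score in [0, 1] for an email address."""
    if not isinstance(email, str) or "@" not in email:
        return 0.5  # invalid format: no evidence either way
    domain = email.split("@")[1].lower()
    if domain in SPAM:
        return 1.0  # exact match with a known spam domain
    if domain in LEGITIMATE:
        return 0.0  # exact match with a known legitimate domain
    if any(re.search(p, domain) for p in SUSPICIOUS_PATTERNS):
        return 0.8  # looks like a disposable or fake mail provider
    return 0.5  # unknown domain

for address in ["user1@gmail.com", "temp2@fakemail.com",
                "x@my-tempmail.io", "y@example.org", "not-an-email"]:
    print(address, score_domain(address))
```

Exact matches are checked before the regular-expression patterns, so a listed domain always gets its fixed score and only unlisted domains fall through to the 0.8 or 0.5 cases.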

## TextProcessor Class
Handles text preprocessing and feature extraction:
- Text cleaning and normalization using NLTK
- Stop word removal and lemmatization
- Feature extraction including text length, word count, and word statistics
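- Holds a `TfidfTransformer` and a `StandardScaler` for TF-IDF weighting and scaling (both are instantiated in `__init__` but not applied by the methods shown here)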
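
To make the TF-IDF step concrete, here is a dependency-free sketch of the weighting applied to rows of word counts. It follows the smoothed IDF formula documented for scikit-learn's `TfidfTransformer` defaults, `idf = ln((1 + n) / (1 + df)) + 1`, followed by L2 normalization. The helper name `tfidf_rows` and the toy counts are assumptions made for illustration, not part of the notebook.

```python
import math

def tfidf_rows(counts):
    """Apply smoothed TF-IDF and L2 normalization to rows of term counts."""
    n_docs = len(counts)
    n_terms = len(counts[0])
    # Document frequency: in how many rows does each term appear?
    df = [sum(1 for row in counts if row[j] > 0) for j in range(n_terms)]
    idf = [math.log((1 + n_docs) / (1 + d)) + 1 for d in df]
    weighted = []
    for row in counts:
        vec = [tf * w for tf, w in zip(row, idf)]
        norm = math.sqrt(sum(v * v for v in vec))
        weighted.append([v / norm if norm else 0.0 for v in vec])
    return weighted

# Toy word-count matrix: columns are the terms "free", "meeting", "the"
counts = [
    [3, 0, 2],
    [0, 1, 4],
    [0, 2, 3],
]
for row in tfidf_rows(counts):
    print([round(v, 3) for v in row])
```

In the first row, "free" appears in only one document and so ends up weighted well above "the", which appears in all three, even though their raw counts are close.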

In [3]:
class EmailAnalyzer:
    """Analyzes email domains for spam detection"""
    def __init__(self):
        # Define known legitimate domains
        self.legitimate_domains = [
            'gmail.com', 'yahoo.com', 'hotmail.com', 'outlook.com', 
            'company.com', 'business.org', 'university.edu', 'corporate.net'
        ]
        # Define known spam domains
        self.spam_domains = [
            'temp-mail.org', 'fakemail.com', 'spam-domain.com', 
            'disposable.com', 'suspicious.net', 'temp-mail.net'
        ]
    
    def generate_synthetic_emails(self, df):
        """Generate synthetic email addresses based on spam/non-spam label"""
        synthetic_emails = []
        
        for idx, label in enumerate(df.iloc[:, -1]):
            # Generate different usernames and domains based on spam status
            if label == 0:  # Non-spam
                domain = np.random.choice(self.legitimate_domains)
                username = f"user{idx}"
            else:  # Spam
                domain = np.random.choice(self.spam_domains)
                username = f"temp{idx}"
            
            email = f"{username}@{domain}"
            synthetic_emails.append(email)
        
        return synthetic_emails
    
    def analyze_domain(self, email):
        """Score email domain for spam likelihood (0-1 scale)"""
        try:
            domain = email.split('@')[1].lower()
            
            # Direct matches
            if domain in self.spam_domains:
                return 1.0
            if domain in self.legitimate_domains:
                return 0.0
            
            # Check suspicious patterns
            suspicious_patterns = [
                r'temp.*mail', r'disposable', r'fake', r'suspicious'
            ]
            
            for pattern in suspicious_patterns:
                if re.search(pattern, domain):
                    return 0.8
            
            return 0.5  # Unknown domain
            
        except (AttributeError, IndexError):
            return 0.5  # Invalid email format (not a string, or no '@')

class TextProcessor:
    """Processes text content for feature extraction"""
    def __init__(self):
        self.lemmatizer = WordNetLemmatizer()
        self.stop_words = stop_words
        self.tfidf = TfidfTransformer()
        self.scaler = StandardScaler()
    
    def process_text(self, text):
        """Clean and normalize text using NLTK"""
        # Convert to lowercase string
        text = str(text).lower()
        
        # Tokenize text
        tokens = word_tokenize(text)
        
        # Remove stopwords and lemmatize
        tokens = [
            self.lemmatizer.lemmatize(token)
            for token in tokens
            if token not in self.stop_words and token.isalnum()
        ]
        
        return ' '.join(tokens)
    
    def extract_text_features(self, text):
        """Extract numerical features from text"""
        text = str(text)
        words = text.split()
        
        return {
            'text_length': len(text),
            'word_count': len(words),
            'avg_word_length': np.mean([len(w) for w in words]) if words else 0,
            'unique_word_ratio': len(set(words)) / len(words) if words else 0,
        }

# Data Loading and Preparation Functions

## Key Functions:
1. `load_and_enhance_data(filepath)`:
   - Loads the dataset
   - Enhances it with synthetic email features
   - Processes text features
   - Combines all features into a final feature set

2. `prepare_data(X, y)`:
   - Handles data splitting and scaling
   - Implements different scaling strategies for different models:
     - MinMaxScaler for Naive Bayes
     - StandardScaler for Neural Networks
   - Calculates class weights for imbalanced data
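
The class-weight calculation is the common "balanced" heuristic, `n_samples / (n_classes * count_per_class)`. The sketch below reproduces it without NumPy. The per-class counts (3672 non-spam, 1500 spam) are inferred from the class weights printed in the run output, not read from the dataset, and `balanced_class_weights` is a hypothetical helper name.

```python
from collections import Counter

def balanced_class_weights(labels):
    """Weight each class inversely to its frequency."""
    counts = Counter(labels)
    n_samples = len(labels)
    n_classes = len(counts)
    return {cls: n_samples / (n_classes * cnt)
            for cls, cnt in sorted(counts.items())}

# Assumed label distribution: 3672 non-spam (0) and 1500 spam (1)
labels = [0] * 3672 + [1] * 1500
weights = balanced_class_weights(labels)
print(weights)
```

The minority class receives the larger weight (about 1.72 versus 0.70), so errors on spam count more heavily whenever the weights are passed to a training routine that accepts them.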

In [4]:
def load_and_enhance_data(filepath):
    """Load dataset and enhance it with synthetic features"""
    print("Loading dataset...")
    df = pd.read_csv(filepath)
    
    print("\nDataset Statistics:")
    print(f"Total rows: {len(df)}")
    print(f"Total columns: {df.shape[1]}")
    
    # Generate synthetic emails and analyze domains
    email_analyzer = EmailAnalyzer()
    synthetic_emails = email_analyzer.generate_synthetic_emails(df)
    email_scores = [email_analyzer.analyze_domain(email) for email in synthetic_emails]
    
    # Create email features DataFrame
    features_df = pd.DataFrame({
        'domain_score': email_scores
    })
    
    # Process text features
    text_processor = TextProcessor()
    text_features = []
    
    print("\nProcessing text features...")
    for _, row in tqdm(df.iloc[:, 1:-1].iterrows(), total=len(df)):
        # Combine all word frequencies into text
        text = ' '.join(f"{k}:{v}" for k, v in row.items())
        features = text_processor.extract_text_features(text)
        text_features.append(features)
    
    # Convert text features to DataFrame
    text_features_df = pd.DataFrame(text_features)
    
    # Combine all features
    X = pd.concat([
        df.iloc[:, 1:-1],  # Original features
        features_df,       # Email features
        text_features_df   # Text features
    ], axis=1)
    
    y = df.iloc[:, -1]    # Labels
    
    print("\nFinal feature set:")
    print(f"Number of features: {X.shape[1]}")
    print(f"Added features: {list(features_df.columns) + list(text_features_df.columns)}")
    
    return X, y

def prepare_data(X, y):
    """Prepare and scale data for different models"""
    # Calculate class weights for imbalanced data
    n_samples = len(y)
    n_classes = len(np.unique(y))
    class_weights = dict(enumerate(n_samples / (n_classes * np.bincount(y))))
    
    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    
    # Store input dimension
    input_dim = X_train.shape[1]
    
    # Scale data differently for different models
    # For Naive Bayes: Use MinMaxScaler (non-negative on the training data, as ComplementNB requires)
    minmax_scaler = MinMaxScaler()
    X_train_nb = minmax_scaler.fit_transform(X_train)
    X_test_nb = minmax_scaler.transform(X_test)
    
    # For Neural Networks: Use StandardScaler
    standard_scaler = StandardScaler()
    X_train_nn = standard_scaler.fit_transform(X_train)
    X_test_nn = standard_scaler.transform(X_test)
    
    print("\nData Split Information:")
    print(f"Training set size: {len(X_train)}")
    print(f"Test set size: {len(X_test)}")
    print(f"Class weights: {class_weights}")
    print(f"Input dimension: {input_dim}")
    
    return {
        'nb': (X_train_nb, X_test_nb, y_train, y_test),
        'nn': (X_train_nn, X_test_nn, y_train, y_test),
        'input_dim': input_dim
    }, class_weights

# Model Evaluation Framework

## ModelEvaluator Class
Comprehensive evaluation system that:
- Generates confusion matrices
- Plots ROC curves and calculates AUC scores
- Creates precision-recall curves
- Saves evaluation metrics and visualizations

## NaiveBayesSpam Class
Implementation of the Naive Bayes classifier:
- Uses ComplementNB for better handling of imbalanced data
- Includes cross-validation
- Saves model and evaluation results
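
As a quick reference for how the reported numbers relate, the sketch below derives precision, recall, and F1 from a 2x2 confusion matrix laid out the way scikit-learn's `confusion_matrix` returns it (rows are true labels, columns are predictions). The counts are made up for illustration and are not results from this notebook; `metrics_from_confusion` is a hypothetical helper.

```python
def metrics_from_confusion(matrix):
    """Compute precision, recall, and F1 for the positive (spam) class."""
    (tn, fp), (fn, tp) = matrix
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if (precision + recall) else 0.0)
    return {"precision": precision, "recall": recall, "f1": f1}

# Illustrative counts only: [[TN, FP], [FN, TP]]
example = [[700, 35], [20, 280]]
print(metrics_from_confusion(example))
```

F1 ignores the true negatives entirely, which is why it is a more informative summary than accuracy when non-spam messages dominate the dataset.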

In [5]:
class ModelEvaluator:
    """Handles model evaluation and visualization"""
    def __init__(self, model_name):
        self.model_name = model_name
    
    def evaluate_and_plot(self, y_true, y_pred, y_pred_proba=None):
        """Generate comprehensive evaluation metrics and plots"""
        # Calculate metrics
        conf_matrix = confusion_matrix(y_true, y_pred)
        f1 = f1_score(y_true, y_pred)
        
        # Save plots to results folder
        os.makedirs('results', exist_ok=True)
        
        # Plot confusion matrix
        plt.figure(figsize=(10, 8))
        sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues',
                   xticklabels=['Non-Spam', 'Spam'],
                   yticklabels=['Non-Spam', 'Spam'])
        plt.title(f'Confusion Matrix - {self.model_name}')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')
        plt.savefig(f'results/confusion_matrix_{self.model_name.lower().replace(" ", "_")}.png')
        plt.close()
        
        if y_pred_proba is not None:
            # ROC Curve
            fpr, tpr, _ = roc_curve(y_true, y_pred_proba)
            auc = roc_auc_score(y_true, y_pred_proba)
            
            plt.figure(figsize=(10, 8))
            plt.plot(fpr, tpr, label=f'ROC curve (AUC = {auc:.3f})')
            plt.plot([0, 1], [0, 1], 'k--', label='Random')
            plt.xlabel('False Positive Rate')
            plt.ylabel('True Positive Rate')
            plt.title(f'ROC Curve - {self.model_name}')
            plt.legend()
            plt.savefig(f'results/roc_curve_{self.model_name.lower().replace(" ", "_")}.png')
            plt.close()
            
            # Precision-Recall Curve
            precision, recall, _ = precision_recall_curve(y_true, y_pred_proba)
            plt.figure(figsize=(10, 8))
            plt.plot(recall, precision)
            plt.xlabel('Recall')
            plt.ylabel('Precision')
            plt.title(f'Precision-Recall Curve - {self.model_name}')
            plt.savefig(f'results/pr_curve_{self.model_name.lower().replace(" ", "_")}.png')
            plt.close()
        
        # Save classification report
        with open(f'results/report_{self.model_name.lower().replace(" ", "_")}.txt', 'w') as f:
            f.write(f"Classification Report - {self.model_name}:\n")
            f.write(classification_report(y_true, y_pred))
        
        return {
            'confusion_matrix': conf_matrix,
            'f1_score': f1,
            'auc_score': auc if y_pred_proba is not None else None
        }

class NaiveBayesSpam:
    """Naive Bayes model for spam detection"""
    def __init__(self, class_weights=None):
        self.model = ComplementNB(alpha=0.5)
        self.evaluator = ModelEvaluator('Naive Bayes')
        self.class_weights = class_weights  # Stored for reference; not used when fitting
    
    def train_and_evaluate(self, X_train, X_test, y_train, y_test):
        # Cross-validation
        cv_scores = cross_val_score(self.model, X_train, y_train, cv=5, scoring='f1')
        
        # Save cross-validation results (create the output directory first)
        os.makedirs('results', exist_ok=True)
        with open('results/nb_cv_results.txt', 'w') as f:
            f.write("Cross-validation Results:\n")
            f.write(f"F1 scores: {cv_scores}\n")
            f.write(f"Average F1: {cv_scores.mean():.3f} (+/- {cv_scores.std() * 2:.3f})")
        
        # Train model
        self.model.fit(X_train, y_train)
        
        # Save model
        joblib.dump(self.model, 'results/naive_bayes_model.joblib')
        
        # Predictions
        y_pred = self.model.predict(X_test)
        y_pred_proba = self.model.predict_proba(X_test)[:, 1]
        
        return self.evaluator.evaluate_and_plot(y_test, y_pred, y_pred_proba)

# Two-Layer MLP Implementation

## TwoLayerMLP Class
Neural network architecture with:
- Two hidden layers (128 and 64 units)
- Batch normalization and dropout for regularization
- Binary classification output
- Comprehensive training pipeline including:
  - Early stopping
  - Learning rate reduction
  - Model checkpointing
  - Training history visualization
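
The early-stopping behavior configured for training can be pictured with a plain-Python loop. This sketch only imitates the idea behind `EarlyStopping(patience=5, restore_best_weights=True)`: stop once the validation loss has failed to improve for `patience` consecutive epochs, and report which epoch held the best loss. The loss values are invented and the function name is hypothetical.

```python
def early_stopping_epoch(val_losses, patience=5):
    """Return (stop_epoch, best_epoch), both 1-indexed."""
    best_loss = float("inf")
    best_epoch = 0
    waited = 0
    for epoch, loss in enumerate(val_losses, start=1):
        if loss < best_loss:
            # New best: remember it and reset the patience counter
            best_loss, best_epoch, waited = loss, epoch, 0
        else:
            waited += 1
            if waited >= patience:
                return epoch, best_epoch  # training halts here
    return len(val_losses), best_epoch  # ran through every epoch

# Invented validation losses: improvement stalls after epoch 7
losses = [0.40, 0.31, 0.25, 0.22, 0.20, 0.19, 0.18,
          0.185, 0.19, 0.20, 0.21, 0.22, 0.23]
stop, best = early_stopping_epoch(losses, patience=5)
print(f"stopped after epoch {stop}, best weights from epoch {best}")
```

With these made-up losses, training stops after epoch 12 and the weights from epoch 7 would be restored. The run logged in this notebook also ends at epoch 12 of 50, which is consistent with early stopping having triggered, though the log does not show the per-epoch losses.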

In [6]:
class TwoLayerMLP:
    """Two-layer MLP for spam detection"""
    def __init__(self, input_dim):
        # Initialize sequential model with exactly two layers as per requirements
        self.model = Sequential([
            # First layer
            Dense(128, activation='relu', input_shape=(input_dim,)),
            BatchNormalization(),
            Dropout(0.3),
            
            # Second layer
            Dense(64, activation='relu'),
            BatchNormalization(),
            Dropout(0.2),
            
            # Output layer (not counted as a layer since it's just the prediction layer)
            Dense(1, activation='sigmoid')
        ])
        self.evaluator = ModelEvaluator('Two-Layer MLP')
    
    def train_and_evaluate(self, X_train, X_test, y_train, y_test):
        # Convert to numpy arrays if needed
        if hasattr(X_train, 'to_numpy'):
            X_train = X_train.to_numpy()
        if hasattr(X_test, 'to_numpy'):
            X_test = X_test.to_numpy()
        if hasattr(y_train, 'to_numpy'):
            y_train = y_train.to_numpy()
        if hasattr(y_test, 'to_numpy'):
            y_test = y_test.to_numpy()
        
        # Compile model
        self.model.compile(
            optimizer='adam',
            loss='binary_crossentropy',
            metrics=['accuracy', 'AUC']
        )
        
        # Training callbacks
        callbacks = [
            # Early stopping to prevent overfitting
            tf.keras.callbacks.EarlyStopping(
                monitor='val_loss', 
                patience=5, 
                restore_best_weights=True
            ),
            # Reduce learning rate when training plateaus
            tf.keras.callbacks.ReduceLROnPlateau(
                monitor='val_loss',
                factor=0.5,
                patience=3,
                min_lr=1e-6
            ),
            # Save best model during training
            tf.keras.callbacks.ModelCheckpoint(
                'results/best_mlp_model.h5',
                monitor='val_loss',
                save_best_only=True
            )
        ]
        
        # Create results directory if it doesn't exist
        os.makedirs('results', exist_ok=True)
        
        # Train model
        history = self.model.fit(
            X_train, y_train,
            epochs=50,
            batch_size=32,
            validation_split=0.2,
            callbacks=callbacks,
            verbose=1
        )
        
        # Save training history plot
        plt.figure(figsize=(10, 6))
        plt.plot(history.history['loss'], label='Training Loss')
        plt.plot(history.history['val_loss'], label='Validation Loss')
        plt.title('Two-Layer MLP Training History')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.savefig('results/mlp_training_history.png')
        plt.close()
        
        # Save training metrics
        metrics_df = pd.DataFrame(history.history)
        metrics_df.to_csv('results/mlp_training_metrics.csv')
        
        # Generate predictions (predict once, then threshold the probabilities)
        y_pred_proba = self.model.predict(X_test).ravel()
        y_pred = (y_pred_proba > 0.5).astype(int)
        
        # Save model architecture
        with open('results/mlp_architecture.txt', 'w') as f:
            self.model.summary(print_fn=lambda x: f.write(x + '\n'))
        
        # Evaluate model and save results
        evaluation_results = self.evaluator.evaluate_and_plot(y_test, y_pred, y_pred_proba)
        
        # Save model performance metrics
        with open('results/mlp_performance.txt', 'w') as f:
            f.write("Two-Layer MLP Performance Metrics\n")
            f.write("================================\n")
            f.write(f"F1 Score: {evaluation_results['f1_score']:.4f}\n")
            f.write(f"AUC Score: {evaluation_results['auc_score']:.4f}\n")
            f.write("\nConfusion Matrix:\n")
            f.write(str(evaluation_results['confusion_matrix']))
        
        return evaluation_results

# Main Execution Pipeline

## Key Components:
1. `save_final_report(results)`: Writes a comparison report with each model's F1 score, AUC score, and confusion matrix
2. `main()`: Orchestrates the entire training and evaluation process:
   - Data loading and preparation
   - Model training and evaluation
   - Results compilation and saving
   
The pipeline creates a structured output in the 'results' directory containing:
- Model performance metrics
- Visualization plots
- Trained models
- Detailed comparison reports

In [7]:
def save_final_report(results):
    """Generate and save final comparison report"""
    os.makedirs('results', exist_ok=True)
    
    with open('results/final_comparison_report.txt', 'w') as f:
        f.write("SPAM DETECTION SYSTEM - FINAL RESULTS\n")
        f.write("=" * 50 + "\n\n")
        
        # Compare model performances
        f.write("MODEL COMPARISON\n")
        f.write("-" * 20 + "\n\n")
        
        for model_name, metrics in results.items():
            f.write(f"\n{model_name} Results:\n")
            f.write(f"F1 Score: {metrics['f1_score']:.4f}\n")
            if metrics['auc_score'] is not None:
                f.write(f"AUC Score: {metrics['auc_score']:.4f}\n")
            
            # Add confusion matrix
            f.write("\nConfusion Matrix:\n")
            f.write(str(metrics['confusion_matrix']))
            f.write("\n" + "-" * 40 + "\n")
        
        # Add timestamp
        f.write(f"\nReport generated at: {pd.Timestamp.now()}")

def main():
    """Main execution function"""
    # Create results directory
    os.makedirs('results', exist_ok=True)
    
    # Load and prepare data
    print("Step 1: Loading and preparing data...")
    X, y = load_and_enhance_data('data/emails.csv')
    scaled_data, class_weights = prepare_data(X, y)
    
    # Store results for each model
    results = {}
    
    # Train and evaluate Naive Bayes
    print("\nStep 2: Training Naive Bayes Model...")
    nb_model = NaiveBayesSpam(class_weights=class_weights)
    X_train_nb, X_test_nb, y_train_nb, y_test_nb = scaled_data['nb']
    results['Naive Bayes'] = nb_model.train_and_evaluate(
        X_train_nb, X_test_nb, y_train_nb, y_test_nb
    )
    
    # Train and evaluate Two-Layer MLP
    print("\nStep 3: Training Two-Layer MLP Model...")
    input_dim = scaled_data['input_dim']
    mlp_model = TwoLayerMLP(input_dim)
    X_train_nn, X_test_nn, y_train_nn, y_test_nn = scaled_data['nn']
    results['Two-Layer MLP'] = mlp_model.train_and_evaluate(
        X_train_nn, X_test_nn, y_train_nn, y_test_nn
    )
    
    # Generate final report
    print("\nStep 4: Generating final report...")
    save_final_report(results)
    
    print("\nTraining complete! Results saved in 'results' directory.")
    return results

# Execute main function if running as script
if __name__ == "__main__":
    results = main()

Step 1: Loading and preparing data...
Loading dataset...

Dataset Statistics:
Total rows: 5172
Total columns: 3002

Processing text features...
100%|██████████| 5172/5172 [00:10<00:00, 471.87it/s]

Final feature set:
Number of features: 3005
Added features: ['domain_score', 'text_length', 'word_count', 'avg_word_length', 'unique_word_ratio']

Data Split Information:
Training set size: 4137
Test set size: 1035
Class weights: {0: 0.704248366013072, 1: 1.724}
Input dimension: 3005

Step 2: Training Naive Bayes Model...

Step 3: Training Two-Layer MLP Model...
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50

Step 4: Generating final report...

Training complete! Results saved in 'results' directory.
