In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import time
import logging
import re
import os

from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.svm import LinearSVC
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    confusion_matrix,
    roc_curve,
    auc,
    RocCurveDisplay,
    ConfusionMatrixDisplay
)
from IPython.display import display


In [2]:
def log_time(message, start_time):
    """Print how long a step took and return the elapsed seconds.

    Args:
        message: Label for the step; printed in front of the timing.
        start_time: Timestamp taken with ``time.time()`` when the step began.

    Returns:
        Seconds elapsed between ``start_time`` and now, as a float.
    """
    duration = time.time() - start_time
    print("{} took {:.4f} seconds".format(message, duration))
    return duration

def preview_file(file_path, num_lines=5):
    """Print the first few lines of a UTF-8 text file for a quick look.

    This is a best-effort helper: any exception raised while opening or
    reading the file is reported with ``print`` and not propagated.

    Args:
        file_path: Path of the file to preview.
        num_lines: Maximum number of lines to print (default 5).
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            print(f"Previewing first {num_lines} lines of {file_path}:")
            for index, raw_line in enumerate(handle):
                # index is 0-based; stop once num_lines lines have been shown.
                if index >= num_lines:
                    break
                print(f"Line {index + 1}: {raw_line.strip()}")
    except Exception as e:
        print(f"Error previewing file: {e}")

def clean_dataset(file_path, output_path='data/cleaned_dataset.csv'):
    """Normalize a tab-separated ``label<TAB>text`` file and write the result.

    For every non-blank input line: runs of tabs are collapsed to a single
    tab, all double-quote characters are removed, and everything after the
    first field is joined with spaces into one text field. Lines that end up
    with fewer than two fields are reported with ``print`` and skipped.

    Args:
        file_path: Path of the raw UTF-8 input file.
        output_path: Destination of the cleaned file. Its parent directory is
            created if needed; a bare filename (no directory part) is written
            to the current working directory.

    Returns:
        ``output_path``, unchanged.

    Raises:
        Exception: any error raised while reading or writing is printed and
            then re-raised.
    """
    print(f"Cleaning dataset: {file_path}")
    start = time.time()
    cleaned_lines = []
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                if not line.strip():
                    continue
                line = re.sub(r'\t+', '\t', line)
                line = line.replace('"', '')
                parts = line.strip().split('\t')
                if len(parts) >= 2:
                    label, text = parts[0], ' '.join(parts[1:])
                    cleaned_lines.append(f"{label}\t{text}")
                else:
                    print(f"Skipping malformed line: {line.strip()}")
        # os.path.dirname() returns '' for a bare filename, and
        # os.makedirs('') raises FileNotFoundError, so only create the
        # directory when the path actually has one.
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(cleaned_lines))
        print(f"Cleaned dataset saved to {output_path}")
        log_time("Dataset cleaning", start)
        return output_path
    except Exception as e:
        print(f"Error cleaning dataset: {e}")
        raise


In [3]:
def load_and_preprocess_data(file_path='data/dataset.csv'):
    """Preview, clean and load the dataset into a two-column DataFrame.

    The raw file is previewed with ``preview_file``, rewritten by
    ``clean_dataset`` (to that function's default output path) and the cleaned
    file is parsed as tab-separated ``label``/``text`` rows. The label ``ham``
    is renamed to ``not_spam``; rows whose label is neither ``ham`` nor
    ``spam``, or that have a missing label or text, are dropped.

    Args:
        file_path: Path of the raw dataset file.

    Returns:
        DataFrame with columns ``label`` (``'not_spam'`` or ``'spam'``) and
        ``text``. The index of surviving rows is not reset.

    Raises:
        Exception: any error raised while loading is printed and re-raised.
    """
    started_at = time.time()
    preview_file(file_path)
    cleaned_file_path = clean_dataset(file_path)
    label_names = {'ham': 'not_spam', 'spam': 'spam'}
    try:
        samples = (
            pd.read_csv(
                cleaned_file_path,
                sep='\t',
                header=None,
                names=['label', 'text'],
                engine='python',
                encoding='utf-8',
                quoting=3,  # 3 == csv.QUOTE_NONE: quote characters are plain text
            )
            # Labels outside the mapping become NaN and are removed below.
            .assign(label=lambda frame: frame['label'].map(label_names))
            .dropna(subset=['label', 'text'])
            .loc[lambda frame: frame['label'].isin(['not_spam', 'spam'])]
        )
        print(f"Loaded {len(samples)} valid samples")
        class_counts = samples['label'].value_counts().to_dict()
        print(f"Class distribution: {class_counts}")
        log_time("Data loading and preprocessing", started_at)
        return samples
    except Exception as e:
        print(f"Error loading dataset: {e}")
        raise


In [4]:
def vectorize_text(X_train, X_test):
    """Fit a TF-IDF vectorizer on the training texts and transform both splits.

    The vectorizer is fitted on ``X_train`` only; ``X_test`` is transformed
    with the already-fitted vocabulary. Both matrices are cast to float32.

    Args:
        X_train: Training documents.
        X_test: Test documents.

    Returns:
        Tuple ``(vectorizer, X_train_vect, X_test_vect)``.
    """
    began = time.time()
    tfidf_options = {
        'stop_words': 'english',
        'max_features': 20000,
        'ngram_range': (1, 3),
        'min_df': 2,
    }
    vectorizer = TfidfVectorizer(**tfidf_options)
    train_matrix = vectorizer.fit_transform(X_train)
    test_matrix = vectorizer.transform(X_test)
    X_train_vect = train_matrix.astype(np.float32)
    X_test_vect = test_matrix.astype(np.float32)
    print(f"Vocabulary size: {len(vectorizer.vocabulary_)}")
    log_time("Text vectorization", began)
    return vectorizer, X_train_vect, X_test_vect


In [5]:
def train_naive_bayes(X_train, y_train):
    """Fit a multinomial Naïve Bayes classifier (``alpha=0.5``).

    Args:
        X_train: Training feature matrix.
        y_train: Training labels.

    Returns:
        The fitted ``MultinomialNB`` model.
    """
    began = time.time()
    classifier = MultinomialNB(alpha=0.5)
    classifier.fit(X_train, y_train)
    print("Naïve Bayes model trained successfully")
    log_time("Naïve Bayes training", began)
    return classifier

def train_svm(X_train, y_train):
    """Fit a linear SVM classifier with balanced class weights.

    Args:
        X_train: Training feature matrix.
        y_train: Training labels.

    Returns:
        The fitted ``LinearSVC`` model.
    """
    began = time.time()
    svm_options = {
        'C': 1.0,
        'max_iter': 1000,
        'dual': False,
        'class_weight': 'balanced',
    }
    classifier = LinearSVC(**svm_options)
    classifier.fit(X_train, y_train)
    print("SVM model trained successfully")
    log_time("SVM training", began)
    return classifier


In [6]:
def evaluate_model(model, model_name, X_test, y_test):
    """Evaluate a fitted spam classifier and display its diagnostics.

    Prints accuracy and the spam-class precision/recall/F1, displays the
    confusion matrix as a table and as a plot, and plots the ROC curve.

    Args:
        model: Fitted classifier exposing ``predict`` and either
            ``predict_proba`` or ``decision_function``.
        model_name: Display name used in printed output and plot titles.
        X_test: Test feature matrix.
        y_test: True labels (``'not_spam'`` / ``'spam'``). Any array-like is
            accepted; it does not have to be a pandas Series.

    Returns:
        Tuple ``(preds, accuracy, report, cm)``: the predictions, the accuracy,
        the ``classification_report`` dict and the confusion matrix with rows
        and columns ordered ``['not_spam', 'spam']``.
    """
    start = time.time()
    class_labels = ['not_spam', 'spam']

    preds = model.predict(X_test)
    accuracy = accuracy_score(y_test, preds)
    # Passing labels explicitly guarantees a 'spam' entry in the report even
    # when no spam appears in y_test or preds, so the lookups below cannot
    # raise KeyError.
    report = classification_report(
        y_test, preds, labels=class_labels, output_dict=True
    )

    # Compute confusion matrix
    cm = confusion_matrix(y_test, preds, labels=class_labels)
    cm_df = pd.DataFrame(
        cm,
        index=['Actual: not_spam', 'Actual: spam'],
        columns=['Predicted: not_spam', 'Predicted: spam']
    )

    # Display results
    spam_metrics = report['spam']
    print(f"\n{model_name} Results:")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision (spam): {spam_metrics['precision']:.4f}")
    print(f"Recall (spam): {spam_metrics['recall']:.4f}")
    print(f"F1-score (spam): {spam_metrics['f1-score']:.4f}")
    print("\nConfusion Matrix:")
    display(cm_df)

    # Plot confusion matrix
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_labels)
    disp.plot(cmap=plt.cm.Blues)
    disp.ax_.set_title(f"{model_name} Confusion Matrix")
    plt.show()

    # Scores for the ROC curve: probabilities when available, otherwise the
    # signed distance to the decision boundary.
    if hasattr(model, "predict_proba"):
        # Assumes column 1 is the 'spam' class (scikit-learn sorts classes_,
        # and 'not_spam' < 'spam').
        y_score = model.predict_proba(X_test)[:, 1]
    else:
        y_score = model.decision_function(X_test)

    # pos_label lets roc_curve binarize the string labels itself, so y_test
    # does not need a pandas .map() method.
    fpr, tpr, _ = roc_curve(y_test, y_score, pos_label='spam')
    roc_auc = auc(fpr, tpr)

    # Plot ROC curve on explicit axes rather than the implicit pyplot state.
    _, ax = plt.subplots()
    ax.plot(fpr, tpr, label=f'ROC curve (area = {roc_auc:.2f})')
    ax.plot([0, 1], [0, 1], 'k--', label='No Skill')
    ax.set_xlim([0.0, 1.0])
    ax.set_ylim([0.0, 1.05])
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    ax.set_title(f'{model_name} ROC Curve')
    ax.legend(loc="lower right")
    plt.show()

    log_time(f"{model_name} evaluation", start)
    return preds, accuracy, report, cm


In [7]:
def compare_models(nb_accuracy, svm_accuracy, nb_report, svm_report, nb_cm, svm_cm):
    """Print a side-by-side comparison of the Naïve Bayes and SVM results.

    Shows both accuracies and their difference, a table of the spam-class
    precision/recall/F1 for each model, and both confusion matrices.

    Args:
        nb_accuracy: Accuracy of the Naïve Bayes model.
        svm_accuracy: Accuracy of the SVM model.
        nb_report: Classification report dict for Naïve Bayes; must contain
            ``['spam']`` with ``precision``, ``recall`` and ``f1-score``.
        svm_report: Same structure, for the SVM.
        nb_cm: 2x2 confusion matrix for Naïve Bayes.
        svm_cm: 2x2 confusion matrix for the SVM.
    """
    accuracy_gap = svm_accuracy - nb_accuracy
    print("\nModel Comparison:")
    print(f"Naïve Bayes Accuracy: {nb_accuracy:.4f}")
    print(f"SVM Accuracy: {svm_accuracy:.4f}")
    print(f"Accuracy Difference (SVM - NB): {accuracy_gap:.4f}")

    print("\nSpam Class Metrics Comparison:")
    print(f"{'Metric':<15} {'Naïve Bayes':<15} {'SVM':<15}")
    print('-' * 45)
    nb_spam, svm_spam = nb_report['spam'], svm_report['spam']
    metric_rows = (
        ('Precision', 'precision'),
        ('Recall', 'recall'),
        ('F1-score', 'f1-score'),
    )
    for heading, key in metric_rows:
        print(f"{heading:<15} {nb_spam[key]:<15.4f} {svm_spam[key]:<15.4f}")

    # Display confusion matrices; both frames are built before anything is shown.
    row_names = ['Actual: not_spam', 'Actual: spam']
    column_names = ['Predicted: not_spam', 'Predicted: spam']
    tables = [
        ("\nNaïve Bayes Confusion Matrix:",
         pd.DataFrame(nb_cm, index=row_names, columns=column_names)),
        ("\nSVM Confusion Matrix:",
         pd.DataFrame(svm_cm, index=row_names, columns=column_names)),
    ]
    for caption, table in tables:
        print(caption)
        display(table)


In [8]:

def classify_text(model, model_name, vectorizer, text):
    """Classify a single text and report how long the prediction took.

    Args:
        model: Fitted classifier exposing ``predict``.
        model_name: Display name used in the timing message.
        vectorizer: Fitted vectorizer exposing ``transform``.
        text: The raw text to classify.

    Returns:
        Tuple ``(pred, elapsed)``: the predicted label and the value returned
        by ``log_time`` for vectorization plus prediction.
    """
    began = time.time()
    features = vectorizer.transform([text]).astype(np.float32)
    predictions = model.predict(features)
    label = predictions[0]
    duration = log_time(f"{model_name} single text prediction", began)
    return label, duration

def explain_prediction(model, model_name, vectorizer, text):
    """Return the (up to) five features of ``text`` that weigh most in the model.

    Only features with a positive TF-IDF weight in ``text`` are considered.

    * Naïve Bayes style models (exposing ``feature_log_prob_``) are ranked by
      ``exp(feature_log_prob_[1])``, highest first. Row 1 is assumed to be the
      spam class — confirm against ``model.classes_`` if labels change.
    * Linear models (exposing ``coef_``) are ranked by the absolute value of
      ``coef_[0]``; the signed weight is returned.

    The branch is chosen from ``model_name`` when it matches the model's
    fitted attributes, and otherwise from the attributes alone. A Naïve Bayes
    model passed under a differently spelled name (for example ASCII
    "Naive Bayes") is therefore still explained correctly.

    Args:
        model: Fitted classifier with ``feature_log_prob_`` or ``coef_``.
        model_name: Display name; ``"Naïve Bayes"`` selects the Naïve Bayes
            ranking when the model supports it.
        vectorizer: Fitted vectorizer exposing ``transform`` and
            ``get_feature_names_out``.
        text: The raw text to explain.

    Returns:
        Tuple ``(top_features, elapsed)`` where ``top_features`` is a list of
        ``(feature_name, score)`` pairs and ``elapsed`` is the value returned
        by ``log_time``.

    Raises:
        AttributeError: if the model exposes neither ``feature_log_prob_`` nor
            ``coef_``.
    """
    start = time.time()
    text_vect = vectorizer.transform([text]).toarray()[0]
    feature_names = vectorizer.get_feature_names_out()
    active_indices = np.where(text_vect > 0)[0]

    has_nb_params = hasattr(model, "feature_log_prob_")
    has_linear_params = hasattr(model, "coef_")
    # Honour the name when the model can satisfy it; otherwise fall back to
    # whichever parameters the model actually has.
    use_naive_bayes = has_nb_params and (
        model_name == "Naïve Bayes" or not has_linear_params
    )

    if use_naive_bayes:
        scores = np.exp(model.feature_log_prob_[1])
        rank_key = lambda pair: pair[1]
    else:  # linear model such as the SVM
        scores = model.coef_[0]
        rank_key = lambda pair: abs(pair[1])

    top_features = sorted(
        [(feature_names[i], scores[i]) for i in active_indices],
        key=rank_key,
        reverse=True
    )[:5]

    elapsed = log_time(f"{model_name} prediction explanation", start)
    return top_features, elapsed