In [None]:
"""
===================================================
 Email Spam Detection Prediction Pipeline
---------------------------------------------------
 Author : Devashish
 Purpose: Deployment-ready inference pipeline
 Notes  : Optimized for API / Microservice integration
===================================================
"""

import os
import re
import joblib
import numpy as np
import pandas as pd
from scipy.sparse import hstack
from nltk.corpus import stopwords
import nltk
from typing import List, Dict, Union

# ===============================
# Initialization (Run Once)
# ===============================
# Make sure the NLTK stopword corpus is available, then keep the English
# list as a set so clean_text can do fast membership checks per token.
nltk.download('stopwords', quiet=True)
stop_words = set(stopwords.words('english'))

# Directory that holds the serialized artifacts. Defaults to "models"
# (relative to the working directory); set the SPAM_MODEL_DIR environment
# variable to point a deployment at a different location.
MODEL_DIR = os.environ.get("SPAM_MODEL_DIR", "models")

# Load all models once at startup so individual predictions pay no I/O cost.
# SECURITY NOTE: joblib.load unpickles the files, which can execute arbitrary
# code. Only point MODEL_DIR at artifacts from a trusted source.
print("ðŸš€ Loading models and vectorizers...")
nb_model = joblib.load(os.path.join(MODEL_DIR, "nb_model.pkl"))
lr_model = joblib.load(os.path.join(MODEL_DIR, "lr_model.pkl"))
lgb_model = joblib.load(os.path.join(MODEL_DIR, "lgb_model.pkl"))
tfidf = joblib.load(os.path.join(MODEL_DIR, "tfidf_vectorizer.pkl"))
le = joblib.load(os.path.join(MODEL_DIR, "label_encoder.pkl"))

print("âœ… Models loaded successfully!")


# ===============================
# Utility Functions
# ===============================
def clean_text(text: str) -> str:
    """Normalize raw email text before it is handed to the TF-IDF vectorizer.

    Steps, applied in order:
      1. lower-case the text;
      2. replace URL-like tokens (``http...`` / ``www...``) with ``<URL>``;
      3. replace tokens containing ``@`` with ``<EMAIL>``;
      4. drop every character that is not a lower-case letter, digit,
         whitespace, ``<`` or ``>``;
      5. remove English stopwords and re-join the remaining tokens with
         single spaces.

    NOTE(review): the placeholders inserted in steps 2-3 are upper-case, so
    step 4 strips their letters and leaves only ``<>``. Behaviour is kept
    as-is here because the fitted vectorizer presumably saw the same
    cleaning; confirm against the training code before changing it.
    """
    substitutions = (
        (r'http\S+|www\S+', '<URL>'),
        (r'\S+@\S+', '<EMAIL>'),
        (r'[^a-z0-9\s<>]', ''),
    )

    cleaned = text.lower()
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)

    kept_tokens = (token for token in cleaned.split() if token not in stop_words)
    return ' '.join(kept_tokens)


def compute_features(df: pd.DataFrame) -> np.ndarray:
    """Compute the handcrafted numeric and keyword features for each email.

    The input frame is only read, never modified: features are built in a
    separate frame so callers do not get extra columns added behind their
    back.

    Args:
        df: DataFrame with a ``content`` column holding the raw (uncleaned)
            email text. Missing values are treated as empty strings.

    Returns:
        A 2-D array with one row per input row and ten columns, in this
        order: ``num_links``, ``num_exclamations``, ``num_uppercase_words``,
        ``text_length``, ``num_special_chars``, ``has_free``, ``has_win``,
        ``has_click``, ``has_prize``, ``has_buy_now``.
    """
    # Guard against NaN/None so the string operations below cannot fail.
    content = df['content'].fillna('').astype(str)

    # Built with an explicit dtype so an empty input still yields integers.
    uppercase_counts = pd.Series(
        [sum(1 for w in text.split() if w.isupper()) for text in content],
        index=content.index,
        dtype='int64',
    )

    features = pd.DataFrame({
        'num_links': content.str.count(r'http\S+|www\S+'),
        'num_exclamations': content.str.count('!'),
        'num_uppercase_words': uppercase_counts,
        'text_length': content.str.len(),
        'num_special_chars': content.str.count(r'[^a-zA-Z0-9\s]'),
    })

    # Case-insensitive substring flags for common spam vocabulary.
    spam_words = ['free', 'win', 'click', 'prize', 'buy now']
    for word in spam_words:
        column = f'has_{word.replace(" ", "_")}'
        features[column] = content.str.contains(word, case=False).astype(int)

    return features.to_numpy()


def ensemble_predict(X_combined):
    """Blend the three classifiers' probabilities and pick the top class.

    Each model's ``predict_proba`` output is weighted (Naive Bayes 0.3,
    Logistic Regression 0.4, LightGBM 0.3) and summed. The column with the
    highest blended probability is decoded through the label encoder.

    NOTE(review): this assumes all three models emit their class columns in
    the same order and that column indices match the label encoder's codes
    -- confirm against the training code.

    Args:
        X_combined: feature matrix accepted by all three models.

    Returns:
        ``(labels, confidence)`` where ``labels`` are the decoded class
        labels and ``confidence`` is the winning blended probability
        expressed as a percentage (0-100).
    """
    proba_nb = nb_model.predict_proba(X_combined)
    proba_lr = lr_model.predict_proba(X_combined)
    proba_lgb = lgb_model.predict_proba(X_combined)

    blended = 0.3 * proba_nb + 0.4 * proba_lr + 0.3 * proba_lgb

    winning_index = np.argmax(blended, axis=1)
    confidence_pct = np.max(blended, axis=1) * 100

    return le.inverse_transform(winning_index), confidence_pct


# ===============================
# Core Prediction Methods
# ===============================
def predict_single(subject: str, body: str) -> Dict[str, Union[str, float]]:
    """Classify a single email.

    Args:
        subject: email subject line; ``None`` is treated as empty.
        body: email body text; ``None`` is treated as empty.

    Returns:
        ``{"label": <predicted class>, "confidence": <percentage>}`` with
        plain Python ``str`` / ``float`` values, confidence rounded to two
        decimals.
    """
    # Without this, a missing field would be formatted as the text "None"
    # and leak into both the TF-IDF input and the handcrafted features.
    subject = "" if subject is None else str(subject)
    body = "" if body is None else str(body)

    # NOTE(review): the joined text is stripped here, whereas predict_batch
    # joins subject and body without stripping, so text_length can differ
    # slightly between the two paths. Confirm which matches training.
    content = f"{subject} {body}".strip()
    clean_content = clean_text(content)

    X_text = tfidf.transform([clean_content])

    # Handcrafted numeric features, in the same column order that
    # compute_features produces for the batch path.
    num_links = len(re.findall(r'http\S+|www\S+', content))
    num_exclamations = content.count('!')
    num_uppercase_words = sum(1 for w in content.split() if w.isupper())
    text_length = len(content)
    num_special_chars = len(re.findall(r'[^a-zA-Z0-9\s]', content))

    # Lower-case once rather than once per keyword.
    lowered = content.lower()
    spam_words_vals = [int(word in lowered)
                       for word in ('free', 'win', 'click', 'prize', 'buy now')]

    X_hand = np.array([[num_links, num_exclamations, num_uppercase_words,
                        text_length, num_special_chars] + spam_words_vals])
    # Append the dense feature row to the sparse TF-IDF row.
    X_combined = hstack([X_text, X_hand])

    labels, confidence = ensemble_predict(X_combined)
    # Convert numpy scalars to built-in types so the result serializes
    # cleanly in an API response.
    return {"label": str(labels[0]), "confidence": round(float(confidence[0]), 2)}


def predict_batch(df: pd.DataFrame) -> pd.DataFrame:
    """
    Predict multiple emails from a DataFrame (CSV or API batch input).
    Expected columns: 'subject', 'body'

    The input frame is not modified; all intermediate columns live in
    scratch objects.

    Args:
        df: DataFrame with at least 'subject' and 'body' columns. Missing
            values are treated as empty strings and non-string values are
            converted with ``str``.

    Returns:
        A new DataFrame with columns 'subject', 'predicted_label' and
        'confidence' (percentage rounded to two decimals), one row per
        input row. An empty input yields an empty result with the same
        columns.

    Raises:
        ValueError: if 'subject' or 'body' is missing from ``df``.
    """
    required_cols = {'subject', 'body'}
    if not required_cols.issubset(df.columns):
        raise ValueError(f"CSV must contain columns: {required_cols}")

    # Nothing to score: return the expected schema instead of letting the
    # models fail on a zero-row matrix.
    if df.empty:
        return df[['subject']].assign(
            predicted_label=np.array([], dtype=object),
            confidence=np.array([], dtype=float),
        )

    # NOTE(review): predict_single strips the joined text while this path
    # does not, so text_length can differ slightly between the two paths.
    # Confirm which matches training.
    content = (df['subject'].fillna('').astype(str) + ' '
               + df['body'].fillna('').astype(str))
    clean_content = content.apply(clean_text)

    X_text = tfidf.transform(clean_content)
    # A scratch frame keeps any columns compute_features adds away from
    # the caller's data.
    X_hand = compute_features(pd.DataFrame({'content': content}))
    X_combined = hstack([X_text, X_hand])

    labels, confidence = ensemble_predict(X_combined)

    return df[['subject']].assign(
        predicted_label=labels,
        confidence=np.round(confidence, 2),
    )


def predict_from_csv(csv_path: str, output_path: str = "csv_predictions.csv"):
    """
    Score every email in a CSV file and write the predictions to disk.

    Args:
        csv_path: path of the input CSV; it is passed to predict_batch, so
            it needs 'subject' and 'body' columns.
        output_path: where the prediction CSV is written (no index column).

    Returns:
        The prediction DataFrame produced by predict_batch.

    Side effects:
        Prints the input path, the HAM / SPAM counts and the output path.
    """
    print(f"\nðŸ“„ Reading data from: {csv_path}")
    results_df = predict_batch(pd.read_csv(csv_path))
    results_df.to_csv(output_path, index=False)

    # Compare case-insensitively so "Spam" / "SPAM" are counted as well.
    normalized_labels = results_df["predicted_label"].str.lower()
    spam_count = normalized_labels.eq("spam").sum()
    ham_count = normalized_labels.eq("ham").sum()

    summary_lines = (
        f"\nðŸ“Š HAM: {ham_count}",
        f"ðŸ“¬ SPAM: {spam_count}",
        f"âœ… Saved predictions to: {output_path}",
    )
    for line in summary_lines:
        print(line)

    return results_df


# ===============================
# Main Entry (Local Testing)
# ===============================
if __name__ == "__main__":
    # Local smoke test: score the sample CSV (resolved relative to the
    # current working directory) and write predictions to the default
    # output path of predict_from_csv.
    csv_path = "spam_emails_robust.csv"
    predict_from_csv(csv_path)

ðŸš€ Loading models and vectorizers...
âœ… Models loaded successfully!

ðŸ“„ Reading data from: spam_emails_robust.csv

ðŸ“Š HAM: 0
ðŸ“¬ SPAM: 4255
âœ… Saved predictions to: csv_predictions.csv


