<a href="https://colab.research.google.com/github/fsevkli/phish-ai/blob/main/Project_432.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:

# ===============================================================
# Phishing Email Detection with Hybrid Model & Explanations (Colab-ready)
# Caches data/splits/embeddings/models to avoid re-running on Colab.
# ===============================================================

import os
import sys
import re
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Optional: lightweight installs when running in Colab
IN_COLAB = "google.colab" in sys.modules
if IN_COLAB:
    try:
        import IPython
        IPython.get_ipython().run_line_magic(
            "pip",
            "install -q datasets sentence-transformers scikit-learn google-generativeai kagglehub joblib faiss-gpu",
        )
    except Exception as e:
        print("pip install skipped:", e)

from datasets import load_dataset
from sentence_transformers import SentenceTransformer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    brier_score_loss,
    confusion_matrix,
    ConfusionMatrixDisplay,
)
from sklearn.calibration import CalibratedClassifierCV, calibration_curve
import google.generativeai as genai
import kagglehub
import joblib
import torch

plt.rcParams["figure.figsize"] = (10, 4)

# ===============================================================
# 1. CONFIG + GEMINI API SETUP (env-based)
# ===============================================================

#GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
GEMINI_API_KEY = userdata.get("GEMINI_API_KEY")
if GEMINI_API_KEY:
    genai.configure(api_key=GEMINI_API_KEY)
    gemini_model = genai.GenerativeModel("gemini-2.5-flash-lite")
    print("Gemini configured from env")
else:
    gemini_model = None
    print("WARNING: GEMINI_API_KEY not set; explanations will use a fallback string.")

DRIVE_ROOT = Path("/content/drive/MyDrive") if Path("/content/drive").exists() else Path(".")
ARTIFACT_DIR = Path(os.getenv("ARTIFACT_DIR", DRIVE_ROOT / "phishing_ai")).expanduser()
ARTIFACT_DIR.mkdir(parents=True, exist_ok=True)
VERSION = "v1"

hf_cache = ARTIFACT_DIR / f"hf_clean_{VERSION}.csv"
kaggle_cache = ARTIFACT_DIR / f"kaggle_clean_{VERSION}.csv"
split_cache = ARTIFACT_DIR / f"splits_{VERSION}.npz"
model_path = ARTIFACT_DIR / f"calibrated_model_{VERSION}.joblib"
train_emb_path = ARTIFACT_DIR / f"train_emb_{VERSION}.npy"
val_emb_path = ARTIFACT_DIR / f"val_emb_{VERSION}.npy"
test_emb_path = ARTIFACT_DIR / f"test_emb_{VERSION}.npy"
kaggle_emb_path = ARTIFACT_DIR / f"kaggle_emb_{VERSION}.npy"

# ===============================================================
# 2. DATA LOAD + CLEAN (with caching)
# ===============================================================

def load_hf_dataset():
    if hf_cache.exists():
        return pd.read_csv(hf_cache)
    ds = load_dataset("zefang-liu/phishing-email-dataset")
    df = pd.DataFrame(ds["train"])
    if "Unnamed: 0" in df.columns:
        df = df.drop(columns=["Unnamed: 0"])
    if "Email Text" in df.columns:
        df = df.rename(columns={"Email Text": "email"})
    elif "text" in df.columns:
        df = df.rename(columns={"text": "email"})
    if "Email Type" in df.columns:
        df = df.rename(columns={"Email Type": "label"})
    df["email"] = df["email"].astype(str).fillna("")
    if df["label"].dtype == object:
        df["label"] = df["label"].str.contains("phish", case=False).astype(int)
    df = df[["email", "label"]]
    df.to_csv(hf_cache, index=False)
    return df

def load_kaggle_dataset():
    if kaggle_cache.exists():
        return pd.read_csv(kaggle_cache)
    path = kagglehub.dataset_download("subhajournal/phishingemails")
    csv_files = [f for f in os.listdir(path) if f.lower().endswith(".csv")]
    csv_path = os.path.join(path, csv_files[0])
    raw = pd.read_csv(csv_path)
    if "Email Text" in raw.columns and "Email Type" in raw.columns:
        df = pd.DataFrame({
            "email": raw["Email Text"].astype(str),
            "label": raw["Email Type"].str.contains("phish", case=False).astype(int),
        })
    else:
        text_col, label_col = raw.columns[:2]
        df = pd.DataFrame({
            "email": raw[text_col].astype(str),
            "label": raw[label_col].astype(int),
        })
    df["email"] = df["email"].fillna("")
    df.to_csv(kaggle_cache, index=False)
    return df

df_hf = load_hf_dataset()
df_kaggle = load_kaggle_dataset()
print("HF shape:", df_hf.shape)
print("Kaggle shape:", df_kaggle.shape)

# ===============================================================
# 3. FEATURE ENGINEERING
# ===============================================================

class PhishingFeatureExtractor:
    def __init__(self):
        self.urgency_keywords = [
            "urgent", "immediately", "action required", "verify now",
            "suspended", "expired", "limited time", "act now",
            "confirm", "click here", "final notice", "warning",
            "alert", "attention", "important", "asap",
        ]
        self.cred_keywords = [
            "password", "login", "username", "account", "verify",
            "update payment", "billing", "credit card", "ssn",
            "social security", "bank", "paypal", "gift card",
            "confirm identity", "security question",
        ]

    def extract_all(self, text: str):
        if text is None or (isinstance(text, float) and np.isnan(text)):
          text = ""
        text = str(text)
        t = text.lower()
        urg_hits = [kw for kw in self.urgency_keywords if kw in t]
        cred_hits = [kw for kw in self.cred_keywords if kw in t]
        urls = re.findall(r"https?://[^\s]+", text)
        ip_links = re.findall(r"https?://\d{1,3}(?:\.\d{1,3}){3}", text)
        long_urls = [u for u in urls if len(u) > 50]
        encoded_urls = [u for u in urls if "%" in u]
        sus_tlds = [".tk", ".ml", ".ga", ".cf", ".gq", ".xyz"]
        sus_urls = [u for u in urls for tld in sus_tlds if tld in u.lower()]
        caps_words = re.findall(r"\b[A-Z]{3,}\b", text)
        feats = {
            "urgency_count": len(urg_hits),
            "exclamations": text.count("!"),
            "caps_count": len(caps_words),
            "multi_exclam": len(re.findall(r"!{2,}", text)),
            "cred_count": len(cred_hits),
            "has_form_language": int(bool(re.search(r"enter your|provide your|update your|fill out", t))),
            "url_count": len(urls),
            "ip_url_count": len(ip_links),
            "long_url_count": len(long_urls),
            "encoded_url_count": len(encoded_urls),
            "sus_tld_count": len(sus_urls),
        }
        details = {
            "urgency_hits": urg_hits,
            "cred_hits": cred_hits,
            "urls": urls[:3],
            "ip_links": ip_links,
            "caps_words": caps_words[:5],
        }
        return feats, details

    def transform(self, texts):
        feat_list, details_list = [], []
        for txt in texts:
            f, d = self.extract_all(txt)
            feat_list.append(f)
            details_list.append(d)
        return pd.DataFrame(feat_list), details_list

feature_extractor = PhishingFeatureExtractor()
feat_df_hf, feat_details_hf = feature_extractor.transform(df_hf["email"].values)
feat_df_k, feat_details_k = feature_extractor.transform(df_kaggle["email"].values)

# ===============================================================
# 4. TRAIN/VAL/TEST SPLIT (cached indices)
# ===============================================================

def get_splits(labels):
    if split_cache.exists():
        data = np.load(split_cache, allow_pickle=True)
        return data["train_idx"], data["val_idx"], data["test_idx"]
    idx = np.arange(len(labels))
    train_idx, temp_idx, y_train_temp, y_temp = train_test_split(
        idx, labels, test_size=0.30, stratify=labels, random_state=42
    )
    val_idx, test_idx, _, _ = train_test_split(
        temp_idx, y_temp, test_size=0.50, stratify=y_temp, random_state=42
    )
    np.savez(split_cache, train_idx=train_idx, val_idx=val_idx, test_idx=test_idx)
    return train_idx, val_idx, test_idx

train_idx, val_idx, test_idx = get_splits(df_hf["label"].values)

X_train_email = df_hf["email"].values[train_idx]
X_val_email = df_hf["email"].values[val_idx]
X_test_email = df_hf["email"].values[test_idx]

y_train = df_hf["label"].values[train_idx]
y_val = df_hf["label"].values[val_idx]
y_test = df_hf["label"].values[test_idx]

X_train_feat = feat_df_hf.values[train_idx]
X_val_feat = feat_df_hf.values[val_idx]
X_test_feat = feat_df_hf.values[test_idx]

print("Train/Val/Test sizes:", len(y_train), len(y_val), len(y_test))

# ===============================================================
# 5. MINI LM EMBEDDINGS (cached to disk)
# ===============================================================

device = "cuda" if torch.cuda.is_available() else "cpu"
sentence_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2", device=device)
print(f"Using device: {device}")

def encode_batch(texts, batch_size=128):
    return sentence_model.encode(
        list(texts),
        batch_size=batch_size,
        show_progress_bar=True,
        convert_to_numpy=True,
        normalize_embeddings=False,
    )

def encode_cached(name, texts, path):
    if path.exists():
        return np.load(path)
    arr = encode_batch(texts)
    np.save(path, arr)
    return arr

X_train_emb = encode_cached("train", X_train_email, train_emb_path)
X_val_emb = encode_cached("val", X_val_email, val_emb_path)
X_test_emb = encode_cached("test", X_test_email, test_emb_path)
X_k_emb = encode_cached("kaggle", df_kaggle["email"].values, kaggle_emb_path)

print("Embedding shapes - Train:", X_train_emb.shape, "Test:", X_test_emb.shape)

# ===============================================================
# 6. TRAIN HYBRID MODEL (with calibration) + LOAD IF AVAILABLE
# ===============================================================

X_train_hybrid = np.hstack([X_train_feat, X_train_emb])
X_test_hybrid = np.hstack([X_test_feat, X_test_emb])
X_k_hybrid = np.hstack([feat_df_k.values, X_k_emb])

if model_path.exists():
    best_model = joblib.load(model_path)
    print("Loaded calibrated model from", model_path)
else:
    print("\n" + "="*60)
    print("TRAINING HYBRID MODEL (Features + MiniLM)")
    print("="*60)
    base_clf = LogisticRegression(max_iter=2000, random_state=42)
    base_clf.fit(X_train_hybrid, y_train)
    y_pred = base_clf.predict(X_test_hybrid)
    y_prob = base_clf.predict_proba(X_test_hybrid)[:, 1]

    acc = accuracy_score(y_test, y_pred)
    prec = precision_score(y_test, y_pred)
    rec = recall_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    cm = confusion_matrix(y_test, y_pred)

    print("\nTest set performance (uncalibrated):")
    print(f"  Accuracy : {acc:.3f} ({acc*100:.1f}%)")
    print(f"  Precision: {prec:.3f}")
    print(f"  Recall   : {rec:.3f}")
    print(f"  F1 Score : {f1:.3f}")
    print(f"  Confusion: TN={cm[0,0]}, FP={cm[0,1]}, FN={cm[1,0]}, TP={cm[1,1]}")

    print("\n" + "="*60)
    print("CALIBRATING MODEL")
    print("="*60)
    calibrated_model = CalibratedClassifierCV(
        LogisticRegression(max_iter=2000, random_state=42),
        cv=5,
        method="sigmoid",
    )
    calibrated_model.fit(X_train_hybrid, y_train)

    y_prob_cal = calibrated_model.predict_proba(X_test_hybrid)[:, 1]
    brier_uncal = brier_score_loss(y_test, y_prob)
    brier_cal = brier_score_loss(y_test, y_prob_cal)
    print(f"Brier Score (uncalibrated): {brier_uncal:.4f}")
    print(f"Brier Score (calibrated)  : {brier_cal:.4f}")

    # Calibration curves
    pt_u, pp_u = calibration_curve(y_test, y_prob, n_bins=10)
    pt_c, pp_c = calibration_curve(y_test, y_prob_cal, n_bins=10)

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
    ax1.plot(pp_u, pt_u, "o-", label="Uncalibrated", linewidth=2)
    ax1.plot(pp_c, pt_c, "s-", label="Calibrated", linewidth=2)
    ax1.plot([0, 1], [0, 1], "k--", label="Perfect", alpha=0.5)
    ax1.set_xlabel("Mean predicted probability")
    ax1.set_ylabel("Fraction of positives")
    ax1.set_title("Reliability Diagram")
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    ConfusionMatrixDisplay(cm, display_labels=["Benign", "Phishing"]).plot(ax=ax2, cmap="Blues")
    ax2.set_title("Confusion Matrix")
    plt.tight_layout()

    best_model = calibrated_model
    joblib.dump(best_model, model_path)
    print("Saved calibrated model to", model_path)

# ===============================================================
# 7. ROBUSTNESS TEST ON KAGGLE DATASET
# ===============================================================

y_k_pred = best_model.predict(X_k_hybrid)
y_k_prob = best_model.predict_proba(X_k_hybrid)[:, 1]
acc_k = accuracy_score(df_kaggle["label"].values, y_k_pred)
prec_k = precision_score(df_kaggle["label"].values, y_k_pred)
rec_k = recall_score(df_kaggle["label"].values, y_k_pred)
f1_k = f1_score(df_kaggle["label"].values, y_k_pred)

print("\nKaggle dataset performance:")
print(f"  Accuracy : {acc_k:.3f} ({acc_k*100:.1f}%)")
print(f"  Precision: {prec_k:.3f}")
print(f"  Recall   : {rec_k:.3f}")
print(f"  F1 Score : {f1_k:.3f}")

# ===============================================================
# 8. GEMINI EXPLANATIONS + INFERENCE WRAPPERS
# ===============================================================

def gemini_explain(email_text: str, prob_phish: float, details: dict, label: int):
    if gemini_model is None:
        return "Gemini API key not set; skipping LLM explanation."
    risk = "HIGH" if prob_phish > 0.7 else "MODERATE" if prob_phish > 0.4 else "LOW"
    evidence_lines = []
    if details.get("urgency_hits"):
        evidence_lines.append(f"Urgency words: {', '.join(details['urgency_hits'][:3])}")
    if details.get("cred_hits"):
        evidence_lines.append(f"Credential/payment words: {', '.join(details['cred_hits'][:3])}")
    if details.get("urls"):
        evidence_lines.append(f"Links: {', '.join(details['urls'][:2])}")
    if details.get("ip_links"):
        evidence_lines.append("Contains IP-based link(s)")
    if details.get("caps_words"):
        evidence_lines.append(f"ALL CAPS words like {', '.join(details['caps_words'][:3])}")
    if not evidence_lines:
        evidence_lines.append("No obvious phishing indicators detected.")
    prompt = (
        "You are a cybersecurity assistant explaining phishing detections.\n\n"
        f"Email text (truncated):\n\"{email_text[:500]}\"\n\n"
        f"Model decision: {'PHISHING' if label == 1 else 'BENIGN'}\n"
        f"Model probability of phishing: {prob_phish:.2f}\n"
        f"Risk level: {risk}\n\n"
        "Evidence:\n- " + "\n- ".join(evidence_lines) + "\n\n"
        "Write 1-2 short sentences for a non-technical user:\n"
        "1) Summarize the risk with emoji (red/yellow/green).\n"
        "2) Briefly explain why (urgency, credential asks, suspicious links).\n"
        "3) Give one clear recommended action.\n\n"
        "Stay under 80 words."
    )
    try:
        resp = gemini_model.generate_content(prompt)
        return resp.text.strip()
    except Exception as e:
        return f"Could not generate explanation: {e}"


def classify_and_explain(email_text: str):
    f_vec, details = feature_extractor.extract_all(email_text)
    f_arr = np.array(list(f_vec.values()), dtype=float).reshape(1, -1)
    emb_vec = sentence_model.encode([email_text])
    x_hybrid = np.hstack([f_arr, emb_vec])
    prob_phish = best_model.predict_proba(x_hybrid)[0, 1]
    label = int(prob_phish >= 0.5)
    return {
        "label": "phishing" if label == 1 else "benign",
        "prob_phish": prob_phish,
        "risk": "HIGH" if prob_phish > 0.7 else "MODERATE" if prob_phish > 0.4 else "LOW",
        "details": details,
        "explanation": gemini_explain(email_text, prob_phish, details, label),
    }

# ===============================================================
# 9. EXAMPLE EXPLANATIONS ON TEST SET
# ===============================================================

test_df = pd.DataFrame({"email": X_test_email, "label": y_test})
phish_examples = test_df[test_df["label"] == 1].head(3)
benign_examples = test_df[test_df["label"] == 0].head(2)
examples = pd.concat([phish_examples, benign_examples])

for idx, row in examples.iterrows():
    print("\n" + "-" * 60)
    print("EMAIL PREVIEW:\n", row["email"][:400], "...\n")
    res = classify_and_explain(row["email"])
    print(f"True label : {'phishing' if row['label'] == 1 else 'benign'}")
    print(f"Predicted  : {res['label']} (p={res['prob_phish']:.2f}, risk={res['risk']})")
    print(f"\nGemini explanation:\n{res['explanation']}")

print("\nPipeline complete (cached artifacts in", ARTIFACT_DIR, ")")


[31mERROR: Could not find a version that satisfies the requirement faiss-gpu (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for faiss-gpu[0m[31m
[0mGemini configured from env
HF shape: (18650, 2)
Kaggle shape: (18650, 2)
Train/Val/Test sizes: 13055 2797 2798
Using device: cuda
Embedding shapes - Train: (13055, 384) Test: (2798, 384)
Loaded calibrated model from phishing_ai/calibrated_model_v1.joblib

Kaggle dataset performance:
  Accuracy : 0.951 (95.1%)
  Precision: 0.931
  Recall   : 0.946
  F1 Score : 0.938

------------------------------------------------------------
EMAIL PREVIEW:
 a permanent solution to penis enlargement limited offer : increase atleast 3 inches or get your money back ! - - - - > click here to learn more no more offers ...

True label : phishing
Predicted  : phishing (p=0.96, risk=HIGH)

Gemini explanation:
ðŸš© **HIGH RISK**: This email is a phishing attempt designed to trick you. It uses urgency to pressure you into clicking 



True label : phishing
Predicted  : phishing (p=0.90, risk=HIGH)

Gemini explanation:
Could not generate explanation: 403 POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-lite:generateContent?%24alt=json%3Benum-encoding%3Dint: Your API key was reported as leaked. Please use another API key.

------------------------------------------------------------
EMAIL PREVIEW:
 re [ 1 ] : ave ! hello my dear ! i am a lovely and lonely lady who is looking for the man who will make me happy and whom i want to feel like in paradise with ! if you want to be my beautiful hero who will save me from this loneliness find me http : / / www . yqryjv 5 iqxc . hellomylove . net / and wake me up with a warm kiss . goodbye . . . . . juliana ...





True label : phishing
Predicted  : phishing (p=0.93, risk=HIGH)

Gemini explanation:
Could not generate explanation: 403 POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-lite:generateContent?%24alt=json%3Benum-encoding%3Dint: Your API key was reported as leaked. Please use another API key.

------------------------------------------------------------
EMAIL PREVIEW:
 replacing our people who have left louise - - is it appropriate to replace the people who have left ( steve walton and susan scott lindberg ) with other people that can do the work ? jim ...





True label : benign
Predicted  : benign (p=0.02, risk=LOW)

Gemini explanation:
Could not generate explanation: 403 POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-lite:generateContent?%24alt=json%3Benum-encoding%3Dint: Your API key was reported as leaked. Please use another API key.

------------------------------------------------------------
EMAIL PREVIEW:
 From: Matt Kettler > Hmm, I think that Marc, being one of the most active and prolific posters 
> to this list, certainly understands SA much better than most. Certainly Frequency of messages does _not_ let you draw valid conclusions about the
understanding of the issues at hand of the sender.
IMHO it's sometimes even inversely proportional.> Marc is an active and prolific contributor to SA (gee,  ...





True label : benign
Predicted  : benign (p=0.02, risk=LOW)

Gemini explanation:
Could not generate explanation: 403 POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-lite:generateContent?%24alt=json%3Benum-encoding%3Dint: Your API key was reported as leaked. Please use another API key.

Pipeline complete (cached artifacts in phishing_ai )


Phase 2


## Deployment + caching (Colab + FAISS)
- Mount Google Drive (e.g., `/content/drive`) and set `ARTIFACT_DIR = Path("/content/drive/MyDrive/phishing_ai")` to persist models, FAISS index, and splits.
- Set `GEMINI_API_KEY` in the env (`%env GEMINI_API_KEY=...`) before running explanation cells; do not hard-code keys.
- Install extras if needed: `pip install -q faiss-gpu joblib google-generativeai` (Colab has CUDA wheels).
- Run the FAISS cell after embeddings are created; it saves `minilm.index` and id maps so reruns can reload instead of rebuilding.
- Header analysis + adversarial tests below can be run independently after training; they do not require dataset reload.
- For Gmail forwarding tests, copy the raw headers from Gmail (Show original) and paste into the `raw_headers` string when calling `classify_email_with_headers`.


### Header / sender analysis (Reply-To, domain similarity, SPF/DMARC)
Use these helpers with pasted raw headers from Gmail/Outlook. They augment the text model with header heuristics for higher-fidelity phishing checks.


In [6]:
# ===============================================================
# HEADER / SENDER ANALYSIS (Reply-To mismatch, domain similarity, SPF/DMARC)
# ===============================================================
import re
import difflib
from email.utils import parseaddr

def _domain_from(addr: str) -> str:
    name, email_addr = parseaddr(addr)
    if "@" in email_addr:
        return email_addr.split("@")[-1].lower().strip()
    return ""

def _similarity(a: str, b: str) -> float:
    a = a.lower().strip()
    b = b.lower().strip()
    if not a or not b:
        return 0.0
    return difflib.SequenceMatcher(None, a, b).ratio()

def analyze_headers(raw_headers: str):
    hdr_map = {}
    for line in raw_headers.splitlines():
        if ":" in line:
            k, v = line.split(":", 1)
            hdr_map[k.strip()] = v.strip()

    from_hdr = hdr_map.get("From", "")
    reply_hdr = hdr_map.get("Reply-To") or hdr_map.get("Reply-to", "")
    return_hdr = hdr_map.get("Return-Path", "")
    auth_results = hdr_map.get("Authentication-Results", "")
    auth_blob = auth_results + "\n" + raw_headers

    from_domain = _domain_from(from_hdr)
    reply_domain = _domain_from(reply_hdr)
    return_domain = _domain_from(return_hdr)

    evidence = []
    risk = 0

    if reply_domain and from_domain and reply_domain != from_domain:
        risk += 1
        sim = _similarity(from_domain, reply_domain)
        evidence.append(f"Reply-To domain {reply_domain} differs from From {from_domain} (sim={sim:.2f})")

    if return_domain and from_domain and return_domain != from_domain:
        risk += 1
        evidence.append(f"Return-Path domain {return_domain} differs from From {from_domain}")

    spf_fail = bool(re.search(r"spf=(fail|softfail|neutral|none|permerror|temperror)", auth_blob, re.I) or re.search(r"Received-SPF:\s*(fail|softfail|neutral|none|permerror|temperror)", auth_blob, re.I))
    spf_pass = bool(re.search(r"spf=pass", auth_blob, re.I) or re.search(r"Received-SPF:\s*pass", auth_blob, re.I))
    dmarc_fail = bool(re.search(r"dmarc=(fail|quarantine|reject)", auth_blob, re.I))
    dmarc_pass = bool(re.search(r"dmarc=pass", auth_blob, re.I))

    if spf_fail:
        risk += 1
        evidence.append("SPF failed or is neutral/none")
    if dmarc_fail:
        risk += 1
        evidence.append("DMARC failed or is in quarantine/reject")

    verdict = "low"
    if risk >= 2:
        verdict = "high"
    elif risk == 1:
        verdict = "medium"

    return {
        "from_domain": from_domain,
        "reply_to_domain": reply_domain,
        "return_path_domain": return_domain,
        "spf_pass": spf_pass,
        "dmarc_pass": dmarc_pass,
        "risk_level": verdict,
        "risk_score": risk,
        "evidence": evidence,
        "auth_results_snippet": auth_blob[:400],
    }

def classify_email_with_headers(email_text: str, raw_headers: str = ""):
    """
    Wrapper for end-to-end inference + header heuristics.
    Paste raw headers from Gmail/Outlook into `raw_headers` for extra signals.
    """
    base = classify_and_explain(email_text)
    hdr = analyze_headers(raw_headers or "")
    combined_prob = min(1.0, base["prob_phish"] + 0.08 * hdr["risk_score"])
    combined_label = "phishing" if combined_prob >= 0.5 else "benign"
    base.update({
        "header_analysis": hdr,
        "combined_prob_phish": combined_prob,
        "combined_label": combined_label,
    })
    return base


### Adversarial low-signal phishing tests
Use these to sanity-check the model on grammatically clean, low-signal phish (no typos, minimal urgency cues).


In [7]:
# ===============================================================
# ADVERSARIAL LOW-SIGNAL PHISHING TESTS
# ===============================================================
adversarial_samples = [
    "Quick heads up: payroll could not process your details. Please confirm your direct deposit info before 5pm today.",
    "We could not verify your Microsoft 365 account. Review the document in the secure portal before access is suspended: http://contoso-verify.com/login",
    "Reminder: HR policy update requires your acknowledgment. Open the attached PDF to keep access active.",
    "Travel receipt pending: upload the attached CSV to clear your card hold.",
    "Security notice: unusual login from a new device. If this was you, ignore; otherwise confirm at https://accounts-security-check.com",
]

print("\nAdversarial spot-checks:")
for txt in adversarial_samples:
    res = classify_email_with_headers(txt)
    print("\n---")
    print(txt)
    print(f"Model: {res['label']} (p={res['prob_phish']:.2f}, risk={res['risk']}) -> combined={res['combined_label']} (p={res['combined_prob_phish']:.2f})")
    print("Header flags:", res["header_analysis"]["evidence"])



Adversarial spot-checks:





---
Quick heads up: payroll could not process your details. Please confirm your direct deposit info before 5pm today.
Model: benign (p=0.35, risk=LOW) -> combined=benign (p=0.35)
Header flags: []





---
We could not verify your Microsoft 365 account. Review the document in the secure portal before access is suspended: http://contoso-verify.com/login
Model: phishing (p=0.90, risk=HIGH) -> combined=phishing (p=0.90)
Header flags: []





---
Reminder: HR policy update requires your acknowledgment. Open the attached PDF to keep access active.
Model: benign (p=0.05, risk=LOW) -> combined=benign (p=0.05)
Header flags: []





---
Travel receipt pending: upload the attached CSV to clear your card hold.
Model: benign (p=0.37, risk=LOW) -> combined=benign (p=0.37)
Header flags: []





---
Security notice: unusual login from a new device. If this was you, ignore; otherwise confirm at https://accounts-security-check.com
Model: phishing (p=0.81, risk=HIGH) -> combined=phishing (p=0.81)
Header flags: []


### FAISS neighbor index (persist to Drive)
Build or load a FAISS index on MiniLM embeddings for fast nearest-neighbor lookups in explanations.


In [None]:
# ===============================================================
# FAISS NEIGHBOR INDEX (build/save/load) - optional
# ===============================================================
try:
    import faiss
    FAISS_AVAILABLE = True
except ImportError:
    FAISS_AVAILABLE = False
    print("Install faiss-gpu to enable FAISS neighbor search and run this cell again.")

from pathlib import Path
import joblib

ARTIFACT_DIR = Path("/content/drive/MyDrive/phishing_ai").expanduser()
ARTIFACT_DIR.mkdir(parents=True, exist_ok=True)
faiss_index_path = ARTIFACT_DIR / "minilm.index"
train_ids_path = ARTIFACT_DIR / "train_ids.npy"
model_path = ARTIFACT_DIR / "calibrated_model.joblib"

if FAISS_AVAILABLE:
    if faiss_index_path.exists():
        faiss_index = faiss.read_index(str(faiss_index_path))
        print("Loaded FAISS index from", faiss_index_path, "(ntotal=", faiss_index.ntotal, ")")
    elif "X_train_emb" in globals():
        train_emb = X_train_emb.astype("float32").copy()
        faiss.normalize_L2(train_emb)
        faiss_index = faiss.IndexFlatIP(train_emb.shape[1])
        faiss_index.add(train_emb)
        faiss.write_index(faiss_index, str(faiss_index_path))
        np.save(train_ids_path, np.arange(train_emb.shape[0], dtype=np.int32))
        print("FAISS index built and saved to", ARTIFACT_DIR)
    else:
        print("Embeddings not in memory; run the encoding cell before building FAISS.")

def faiss_neighbors(query_texts, k=5):
    if not FAISS_AVAILABLE:
        raise RuntimeError("FAISS not available; install faiss-gpu and rerun the build cell.")
    if "faiss_index" not in globals():
        raise RuntimeError("FAISS index not loaded; build or load it first.")
    q_emb = sentence_model.encode(list(query_texts), convert_to_numpy=True, normalize_embeddings=True)
    scores, idx = faiss_index.search(q_emb.astype("float32"), k)
    return scores, idx

if "best_model" in globals():
    joblib.dump(best_model, model_path)
    print("Saved calibrated model to", model_path)
else:
    print("best_model not in memory; train first to save.")
