# Spam / Phishing / URL Detection — HD Notebook

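This notebook loads your three datasets (SMS, emails, URLs), cleans and merges them into final.csv, then trains models and evaluates them with PR-AUC and F2. It also saves the final model artifacts and writes a small Streamlit app.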

In [None]:
import os, re, json, warnings, joblib
warnings.filterwarnings("ignore")
import numpy as np, pandas as pd
from sklearn.model_selection import StratifiedKFold, cross_val_predict
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.calibration import CalibratedClassifierCV
from sklearn.ensemble import RandomForestClassifier, IsolationForest
from sklearn.cluster import KMeans
from sklearn.metrics import precision_recall_curve, average_precision_score, precision_recall_fscore_support, confusion_matrix
from urllib.parse import urlparse
from scipy.sparse import hstack

# Project layout: everything the notebook reads or writes lives under BASE_DIR.
BASE_DIR = "spam_malware_hd_v2"
RAW_DIR = os.path.join(BASE_DIR, "data", "raw")          # input CSVs
PROC_DIR = os.path.join(BASE_DIR, "data", "processed")   # merged/cleaned output
MODEL_DIR = os.path.join(BASE_DIR, "models")             # serialized artifacts
APP_DIR = os.path.join(BASE_DIR, "app")                  # generated app script

# Create the folder tree up front; exist_ok makes re-running the cell harmless.
for d in (RAW_DIR, PROC_DIR, MODEL_DIR, APP_DIR):
    os.makedirs(d, exist_ok=True)

print(BASE_DIR, RAW_DIR, PROC_DIR, MODEL_DIR, APP_DIR)

In [None]:
def normalise_label(series):
    """Convert a column of textual or numeric class labels to 0/1 integers.

    String labels are matched case-insensitively and with surrounding
    whitespace ignored, so "Spam", " ham " and "PHISH" are all recognised.
    Values that are not known label words (for example 0/1 integers or the
    strings "0"/"1") are passed through and cast with ``astype(int)``.

    Parameters
    ----------
    series : pandas.Series
        Raw label column.

    Returns
    -------
    pandas.Series
        Integer series where 1 marks the malicious class and 0 the benign one.

    Raises
    ------
    ValueError
        If a value is neither a known label word nor convertible to int
        (this includes missing values).
    """
    malicious_words = ("spam", "malicious", "phish", "phishing", "malware", "defacement")
    benign_words = ("ham", "benign", "legit", "legitimate")
    lookup = {**dict.fromkeys(malicious_words, 1), **dict.fromkeys(benign_words, 0)}

    def _canonical(value):
        # Only strings need normalising; numeric labels go straight to astype(int).
        if isinstance(value, str):
            key = value.strip().lower()
            # Unknown words are returned (stripped) so numeric strings still cast.
            return lookup.get(key, key)
        return value

    return series.map(_canonical).astype(int)

def load_sms(path):
    """Load the SMS spam CSV into a ``text`` / ``label`` / ``source`` frame.

    Column names are lower-cased before lookup. The label column is the first
    of ``label``, ``category``, ``spam`` that exists. The text column is
    ``text`` or ``message`` when present, otherwise the first object-dtype
    column that is not the label column.

    Parameters
    ----------
    path : str
        Location of the CSV file (read with latin-1 encoding).

    Returns
    -------
    pandas.DataFrame
        Columns ``text``, ``label`` (0/1 via ``normalise_label``) and
        ``source`` (always "sms"), with duplicate texts removed.

    Raises
    ------
    ValueError
        If no label column or no usable text column can be found.
    """
    df = pd.read_csv(path, encoding="latin-1")
    df.columns = [c.lower() for c in df.columns]

    lab = next((c for c in ("label", "category", "spam") if c in df.columns), None)
    if lab is None:
        raise ValueError("SMS dataset must contain a label/category/spam column")

    if "text" in df.columns:
        txt = "text"
    elif "message" in df.columns:
        txt = "message"
    else:
        # Never fall back onto the label column itself: string labels are
        # object-dtype too and would otherwise be mistaken for the message text.
        candidates = [c for c in df.select_dtypes("object").columns if c != lab]
        if not candidates:
            raise ValueError("SMS dataset must contain a text/message column")
        txt = candidates[0]

    # Drop incomplete rows before normalising; a missing label cannot be cast
    # to int and a missing text would later turn into the literal string "nan".
    rows = df.dropna(subset=[txt, lab])
    out = pd.DataFrame({"text": rows[txt], "label": normalise_label(rows[lab])})
    out = out.drop_duplicates(subset=["text"])
    out["source"] = "sms"
    return out

def load_emails(path):
    """Load the e-mail CSV into a ``text`` / ``label`` / ``source`` frame.

    Column names are lower-cased before lookup. The label column is the first
    of ``label``, ``spam``, ``is_phish``, ``target``, ``class`` that exists.
    The text column is ``text`` when present, otherwise the first object-dtype
    column that is not the label column.

    Parameters
    ----------
    path : str
        Location of the CSV file.

    Returns
    -------
    pandas.DataFrame
        Columns ``text``, ``label`` (0/1 via ``normalise_label``) and
        ``source`` (always "phish"); rows with a missing text or label are
        dropped.

    Raises
    ------
    ValueError
        If no label column or no usable text column can be found.
    """
    df = pd.read_csv(path)
    df.columns = [c.lower() for c in df.columns]

    label_names = ("label", "spam", "is_phish", "target", "class")
    lab = next((c for c in label_names if c in df.columns), None)
    if lab is None:
        raise ValueError("Emails dataset missing a label-like column")

    if "text" in df.columns:
        txt = "text"
    else:
        # Exclude the label column: with string labels it is object-dtype and
        # could otherwise be selected as the text column.
        candidates = [c for c in df.select_dtypes("object").columns if c != lab]
        if not candidates:
            raise ValueError("Emails dataset missing a text column")
        txt = candidates[0]

    # Remove incomplete rows first, so a missing label is dropped rather than
    # crashing the integer cast inside normalise_label.
    rows = df.dropna(subset=[txt, lab])
    out = pd.DataFrame({"text": rows[txt], "label": normalise_label(rows[lab])})
    out["source"] = "phish"
    return out

def load_urls(path):
    """Load the URL CSV into a ``text`` / ``label`` / ``source`` frame.

    Column names are lower-cased before lookup. The URL column is ``url`` when
    present, otherwise the first column whose name contains "url". The label
    column is the first of ``label``, ``is_malicious``, ``spam``, ``target``,
    ``class`` that exists.

    Parameters
    ----------
    path : str
        Location of the CSV file.

    Returns
    -------
    pandas.DataFrame
        Columns ``text`` (the URL), ``label`` (0/1 via ``normalise_label``)
        and ``source`` (always "url"); rows with a missing URL or label are
        dropped.

    Raises
    ------
    ValueError
        If no URL-like column or no label-like column can be found.
    """
    df = pd.read_csv(path)
    df.columns = [c.lower() for c in df.columns]

    if "url" in df.columns:
        ucol = "url"
    else:
        url_like = [c for c in df.columns if "url" in c]
        if not url_like:
            # Explicit error instead of an opaque IndexError from [...][0].
            raise ValueError("URLs dataset missing a url-like column")
        ucol = url_like[0]

    label_names = ("label", "is_malicious", "spam", "target", "class")
    lab = next((c for c in label_names if c in df.columns), None)
    if lab is None:
        raise ValueError("URLs dataset missing a label-like column")

    # Remove incomplete rows first, so a missing label is dropped rather than
    # crashing the integer cast inside normalise_label.
    rows = df.dropna(subset=[ucol, lab])
    out = pd.DataFrame({"text": rows[ucol], "label": normalise_label(rows[lab])})
    out["source"] = "url"
    return out

In [None]:
# Read the three raw CSVs from RAW_DIR, each through its dedicated loader.
sms_df, email_df, url_df = (
    loader(os.path.join(RAW_DIR, filename))
    for loader, filename in (
        (load_sms, "sms_spam.csv"),
        (load_emails, "emails.csv"),
        (load_urls, "urls.csv"),
    )
)
print("Loaded:", *(frame.shape for frame in (sms_df, email_df, url_df)))

In [None]:
def clean_text(s):
    """Return *s* as a lower-cased string with whitespace normalised.

    The value is converted with ``str()``, lower-cased, trimmed at both ends,
    and every run of whitespace (spaces, tabs, line breaks, ...) is squeezed
    into a single space.
    """
    # split() without a separator trims the ends and breaks on any whitespace
    # run, so re-joining with one space yields the normalised form.
    return " ".join(str(s).lower().split())

# Destination of the merged, cleaned dataset.
final_path = os.path.join(PROC_DIR, "final.csv")

# Stack the three sources into one frame and normalise the text column.
df = pd.concat((sms_df, email_df, url_df), ignore_index=True)
df = df.assign(text=df["text"].map(clean_text))

df.to_csv(final_path, index=False)
print("Saved final:", final_path, "shape:", df.shape)
df.head(3)

In [None]:
def evaluate_probabilities(y_true, prob_pos, beta=2.0):
    """Find the decision threshold that maximises F-beta on a PR curve.

    Parameters
    ----------
    y_true : array-like
        Ground-truth binary labels (1 = positive class).
    prob_pos : array-like
        Scores or probabilities for the positive class.
    beta : float, default 2.0
        Weight of recall relative to precision in the F-beta score.

    Returns
    -------
    dict
        ``best_threshold``, ``pr_auc`` (average precision), and the
        ``best_precision`` / ``best_recall`` / ``best_fbeta`` obtained when
        predicting positive for scores ``>= best_threshold``.
    """
    precision, recall, thresholds = precision_recall_curve(y_true, prob_pos)
    beta_sq = beta ** 2
    # The small epsilon keeps the ratio finite when precision and recall are both 0.
    fbeta = (1 + beta_sq) * (precision * recall) / (beta_sq * precision + recall + 1e-12)

    # precision[i] / recall[i] describe predictions with score >= thresholds[i].
    # Both arrays carry one extra trailing point (precision=1, recall=0) that has
    # no threshold, so the search is limited to the points that do have one and
    # the returned threshold is the one that actually produces the reported
    # precision and recall.
    best_idx = int(np.nanargmax(fbeta[:-1]))
    best_thr = float(thresholds[best_idx])

    return {"best_threshold": best_thr, "pr_auc": float(average_precision_score(y_true, prob_pos)),
            "best_precision": float(precision[best_idx]), "best_recall": float(recall[best_idx]), "best_fbeta": float(fbeta[best_idx])}

def report_at_threshold(y_true, prob_pos, thr):
    """Summarise malicious-class performance at a fixed decision threshold.

    Scores greater than or equal to *thr* are predicted as class 1. The result
    holds the threshold, the precision / recall / F1 of class 1, and the 2x2
    confusion matrix laid out with labels ordered [0, 1].
    """
    y_pred = (prob_pos >= thr).astype(int)

    # labels=[1] with average=None yields one-element arrays for class 1 only.
    prec, rec, f1_score, _support = precision_recall_fscore_support(
        y_true, y_pred, labels=[1], average=None
    )
    matrix = confusion_matrix(y_true, y_pred, labels=[0, 1])

    report = {"threshold": float(thr)}
    report["precision_malicious"] = float(prec[0])
    report["recall_malicious"] = float(rec[0])
    report["f1_malicious"] = float(f1_score[0])
    report["confusion_matrix[[TN,FP],[FN,TP]]"] = matrix.tolist()
    return report

In [None]:
# Text models with TF-IDF word + char 3-5
# Only the free-text sources (SMS and e-mails) are used here; URLs get their
# own feature set further down.
text_df = df[df["source"].isin(["sms","phish"])].reset_index(drop=True)
X_text = text_df["text"].values; y_text = text_df["label"].values

tfidf_word = TfidfVectorizer(analyzer="word", ngram_range=(1,2), min_df=1)
tfidf_char = TfidfVectorizer(analyzer="char_wb", ngram_range=(3,5), min_df=1)
Xw = tfidf_word.fit_transform(X_text); Xc = tfidf_char.fit_transform(X_text)
# Ask for CSR explicitly: the cross-validation helper slices rows with integer
# index arrays, which COO matrices (a possible default hstack output) do not
# support.
Xwc = hstack([Xw, Xc], format="csr")
# NOTE(review): both vectorisers are fitted on the full corpus before the
# cross-validation split, so vocabulary/IDF statistics see the held-out folds;
# the CV scores below are therefore somewhat optimistic.

logreg = LogisticRegression(max_iter=2000, class_weight="balanced")
# LinearSVC has no predict_proba; the sigmoid calibration wrapper provides it.
svm_cal = CalibratedClassifierCV(LinearSVC(class_weight="balanced"), method="sigmoid", cv=3)

def cv_probs(estimator, X, y, cv=5):
    """Return out-of-fold positive-class probabilities for *estimator*.

    Parameters
    ----------
    estimator : scikit-learn classifier
        Must implement ``fit`` and ``predict_proba``. It is cloned for every
        fold, so the object passed in is left untouched.
    X : array-like or scipy sparse matrix
        Feature matrix; sparse input is converted to CSR so rows can be
        selected with the fold index arrays.
    y : array-like
        Binary labels.
    cv : int, default 5
        Number of stratified, shuffled folds (fixed seed 42).

    Returns
    -------
    numpy.ndarray
        Float array with one probability per sample, each predicted by a model
        that did not see that sample during fitting.
    """
    # Local import keeps this cell self-contained; scikit-learn is already a
    # dependency of the notebook.
    from sklearn.base import clone

    if hasattr(X, "tocsr"):
        # COO (and some other sparse formats) cannot be indexed by row arrays.
        X = X.tocsr()
    y = np.asarray(y)

    skf = StratifiedKFold(n_splits=cv, shuffle=True, random_state=42)
    out = np.zeros_like(y, dtype=float)
    for tr, te in skf.split(X, y):
        # A fresh, unfitted copy per fold avoids mutating the caller's estimator.
        model = clone(estimator)
        model.fit(X[tr], y[tr])
        out[te] = model.predict_proba(X[te])[:, 1]
    return out

# Logistic regression: out-of-fold probabilities, best F2 threshold, report.
prob_log = cv_probs(logreg, Xwc, y_text, cv=5)
best_log = evaluate_probabilities(y_text, prob_log, beta=2.0)
rep_log = report_at_threshold(y_text, prob_log, best_log["best_threshold"])

# Calibrated linear SVM: same three steps.
prob_svm = cv_probs(svm_cal, Xwc, y_text, cv=5)
best_svm = evaluate_probabilities(y_text, prob_svm, beta=2.0)
rep_svm = report_at_threshold(y_text, prob_svm, best_svm["best_threshold"])

# Cell output: both summaries side by side.
(best_log, rep_log, best_svm, rep_svm)

In [None]:
# URL features + RandomForest
def shannon_entropy(s):
    """Return the Shannon entropy of the characters in *s*, in bits.

    Parameters
    ----------
    s : str
        Text whose character distribution is measured.

    Returns
    -------
    float
        ``-sum(p * log2(p))`` over the relative frequency ``p`` of each
        distinct character; 0.0 for an empty (falsy) input.
    """
    # Local import: Counter is not among the notebook's top-level imports.
    from collections import Counter

    if not s:
        return 0.0
    # One pass over the string instead of one s.count() scan per distinct character.
    counts = np.fromiter(Counter(s).values(), dtype=float)
    p = counts / counts.sum()
    # Every p is strictly positive here; the epsilon is only a defensive guard
    # kept so the feature values stay the same as before.
    return float(-(p * np.log2(p + 1e-12)).sum())

import re
from urllib.parse import urlparse

def url_features(u):
    """Compute simple lexical features for one URL.

    Parameters
    ----------
    u : str
        The URL; any other value is converted with ``str()`` first.

    Returns
    -------
    dict
        ``len``, ``dots``, ``dashes``, ``digits``, ``specials`` (counted on the
        whole string), ``entropy`` (character entropy of host + path + query),
        ``num_subdomains`` (dots in the host), ``has_ip`` (1 if the host
        contains a dotted-quad pattern), ``tld_len`` (length of the last
        dot-separated host part, 0 when the host has no dot) and ``path_len``
        (length of path plus "?query").
    """
    u = str(u)
    # urlparse only recognises a host when it is introduced by "//". URLs
    # written without a scheme ("example.com/login") would otherwise end up
    # entirely in .path, leaving every host-based feature at zero.
    has_scheme = re.match(r"[a-z][a-z0-9+.\-]*://", u, re.IGNORECASE) is not None
    target = u if has_scheme or u.startswith("//") else "//" + u
    try:
        p = urlparse(target)
        host = p.netloc or ""
        pathq = (p.path or "") + ("?" + p.query if p.query else "")
        full = (p.netloc or "") + (p.path or "") + (p.query or "")
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. a broken IPv6 literal);
        # fall back to string-level features only.
        host = ""
        pathq = ""
        full = u
    return {"len":len(u), "dots":u.count("."), "dashes":u.count("-"), "digits":sum(ch.isdigit() for ch in u),
            "specials":sum(ch in "!@#$%^&*()_+=[]{}|;:'\",<>?/" for ch in u), "entropy":shannon_entropy(full),
            "num_subdomains":host.count("."), "has_ip":int(bool(re.search(r"\b\d{1,3}(?:\.\d{1,3}){3}\b", host))),
            "tld_len":len(host.split(".")[-1]) if "." in host else 0, "path_len":len(pathq)}

from sklearn.ensemble import RandomForestClassifier

# Keep only the URL rows and turn every URL into its lexical feature vector.
url_only = df.loc[df["source"] == "url"].reset_index(drop=True)
X_url = pd.DataFrame(list(map(url_features, url_only["text"].tolist())))
y_url = url_only["label"].values

# Out-of-fold probabilities from a class-balanced random forest (5 stratified folds).
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
rf = RandomForestClassifier(
    n_estimators=400,
    class_weight="balanced",
    random_state=42,
    n_jobs=-1,
)
oof_proba = cross_val_predict(rf, X_url, y_url, cv=skf, method="predict_proba")
prob_rf = oof_proba[:, 1]

# Best F2 threshold and the matching report; shown as the cell output.
best_rf = evaluate_probabilities(y_url, prob_rf, beta=2.0)
rep_rf = report_at_threshold(y_url, prob_rf, best_rf["best_threshold"])
(best_rf, rep_rf)

In [None]:
# K-Means clustering (themes)
# Word-level TF-IDF of the text corpus; note this rebinds the name Xc.
tfidf_clu = TfidfVectorizer(analyzer="word", ngram_range=(1, 2), min_df=1)
Xc = tfidf_clu.fit_transform(text_df["text"].values)

# Cluster count heuristic: floor(sqrt(n) / 2), clamped to the range 2..6.
k = int(np.sqrt(len(text_df)) // 2)
k = max(2, k)
k = min(6, k)

km = KMeans(n_clusters=k, random_state=42, n_init="auto")
labs = km.fit_predict(Xc)

# For every centroid, list the ten terms with the largest weights.
terms = np.array(tfidf_clu.get_feature_names_out())
order = np.argsort(km.cluster_centers_)[:, ::-1]
{i: terms[row[:10]].tolist() for i, row in enumerate(order)}

In [None]:
# IsolationForest anomaly detection (train on benign)
tfidf_an = TfidfVectorizer(analyzer="char_wb", ngram_range=(3,5), min_df=1)
Xa = tfidf_an.fit_transform(text_df["text"].values); ya = text_df["label"].values
benign_mask = (ya==0)
iso = IsolationForest(n_estimators=400, random_state=42, contamination="auto")
# Keep the TF-IDF matrix sparse: IsolationForest accepts sparse input, while
# densifying a char n-gram matrix (rows x vocabulary) can exhaust memory.
iso.fit(Xa[benign_mask])
scores = iso.decision_function(Xa)
# decision_function is lower for more abnormal samples; flip it and rescale to
# [0, 1] so larger values mean "more likely malicious".
prob_like = (scores.min() - scores)
prob_like = (prob_like - prob_like.min())/(prob_like.max()-prob_like.min()+1e-12)
# NOTE(review): the benign rows used for fitting are also scored here, so this
# evaluation is in-sample for the benign class.
best_iso = evaluate_probabilities(ya, prob_like, beta=2.0)
rep_iso = report_at_threshold(ya, prob_like, best_iso["best_threshold"])
best_iso, rep_iso

In [None]:
# Fit final SVM on all text and save artifacts + threshold
from scipy.sparse import hstack
tfidf_word_f = TfidfVectorizer(analyzer="word", ngram_range=(1,2), min_df=1)
tfidf_char_f = TfidfVectorizer(analyzer="char_wb", ngram_range=(3,5), min_df=1)
Xw_f = tfidf_word_f.fit_transform(X_text); Xc_f = tfidf_char_f.fit_transform(X_text)
# Word and char features side by side, in CSR so the result is row-indexable.
Xwc_f = hstack([Xw_f, Xc_f], format="csr")
svm_final = CalibratedClassifierCV(LinearSVC(class_weight="balanced"), method="sigmoid", cv=3)
svm_final.fit(Xwc_f, y_text)
joblib.dump(tfidf_word_f, os.path.join(MODEL_DIR, "tfidf_word.joblib"))
joblib.dump(tfidf_char_f, os.path.join(MODEL_DIR, "tfidf_char.joblib"))
joblib.dump(svm_final, os.path.join(MODEL_DIR, "svm_calibrated.joblib"))
# Use previously found best_svm threshold from CV
thr = 0.5
try:
    thr = float(best_svm["best_threshold"])
except (NameError, KeyError, TypeError, ValueError) as err:
    # Best effort: the CV cell may not have been run in this session. Keep the
    # 0.5 default, but say so instead of failing silently.
    print("CV threshold unavailable, falling back to 0.5:", repr(err))
with open(os.path.join(MODEL_DIR, "threshold.json"), "w", encoding="utf-8") as f:
    json.dump({"f2_threshold": thr}, f, indent=2)
"Saved artifacts to " + MODEL_DIR

In [None]:
# Write Streamlit app
# The app source is assembled from a list of lines and written to APP_DIR/app.py.
# It loads the saved vectorisers, the calibrated SVM and the F2 threshold, then
# classifies whatever the user pastes into the text area.
app_py = "\n".join([
"import os, json, joblib, numpy as np, pandas as pd, streamlit as st",
"from scipy.sparse import hstack",
"from sklearn.feature_extraction.text import TfidfVectorizer",
"from sklearn.svm import LinearSVC",
"from sklearn.calibration import CalibratedClassifierCV",
"BASE_DIR = os.path.dirname(os.path.dirname(__file__))",
"MODEL_DIR = os.path.join(BASE_DIR, 'models')",
"st.set_page_config(page_title='Spam/Malware Detector', layout='centered')",
"st.title('Spam / Phishing / URL Detector (HD Demo)')",
"@st.cache_resource",
"def load_artifacts():",
"    tw = joblib.load(os.path.join(MODEL_DIR, 'tfidf_word.joblib'))",
"    tc = joblib.load(os.path.join(MODEL_DIR, 'tfidf_char.joblib'))",
"    clf = joblib.load(os.path.join(MODEL_DIR, 'svm_calibrated.joblib'))",
"    thr = 0.5",
"    p = os.path.join(MODEL_DIR, 'threshold.json')",
"    if os.path.exists(p):",
"        with open(p,'r') as f: thr = json.load(f).get('f2_threshold', 0.5)",
"    return tw, tc, clf, float(thr)",
"tw, tc, clf, F2_THR = load_artifacts()",
"txt = st.text_area('Paste text or URL', height=160)",
"if st.button('Analyze'):",
"    if not txt.strip():",
"        st.warning('Please paste something')",
"    else:",
# Same normalisation as the training-time clean_text: lower-case, trim, and
# collapse every whitespace run, so the app sees text shaped like the training data.
"        cleaned = ' '.join(txt.lower().split())",
"        Xw = tw.transform([cleaned])",
"        Xc = tc.transform([cleaned])",
"        X = hstack([Xw, Xc])",
"        prob = clf.predict_proba(X)[:,1][0]",
"        label = 'Malicious' if prob >= F2_THR else 'Benign'",
"        st.subheader('Malicious probability: {:.3f} → Label: {} (thr={:.2f})'.format(prob, label, F2_THR))",
])
# The source contains a non-ASCII arrow; write it as UTF-8 explicitly so the
# result does not depend on the platform's default encoding and stays a valid
# (UTF-8) Python source file.
with open(os.path.join(APP_DIR, "app.py"), "w", encoding="utf-8") as f:
    f.write(app_py)
"App written to " + os.path.join(APP_DIR, "app.py")