# End-to-End Phishing URL/Email Detection (Colab-Ready)

This notebook walks you through an end-to-end pipeline: data loading, preprocessing, feature engineering, feature selection, model training and tuning, evaluation, explainability, export of a reusable inference pipeline, and a Streamlit UI that loads the saved model without retraining.

- You can run this as-is in Google Colab. Where relevant, Colab-specific guidance is provided.
- Artifacts are saved to the `models/` and `models/reports/` folders in the repository.
- The exported pipeline can be used directly by the included Streamlit app.



## 1) Set Up Environment and Install Dependencies

- Installs the core packages with minimum-version bounds (numpy is additionally capped below 2.0): numpy, pandas, scikit-learn, imbalanced-learn, xgboost, shap, matplotlib, seaborn, tldextract, urllib3, joblib. These are lower bounds, not exact pins.
- Works in Colab. If running locally, you can instead run `pip install -r requirements.txt`.


In [None]:
# If running in Colab, uncomment the next line to upgrade pip first for wheel compatibility
# !pip install -qU pip

# Install core libraries (safe to run multiple times)
packages = [
    'numpy>=1.23,<2.0',
    'pandas>=1.5',
    'scikit-learn>=1.2',
    'imbalanced-learn>=0.10',
    'xgboost>=1.7',
    'shap>=0.43',
    'matplotlib>=3.7',
    'seaborn>=0.12',
    'tldextract>=5.1',
    'urllib3>=2.0',
    'joblib>=1.3'
]

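import sys, subprocess

# Some pip names differ from their import names
IMPORT_NAMES = {'scikit-learn': 'sklearn', 'imbalanced-learn': 'imblearn'}
for p in packages:
    dist = p.split('>=')[0].split('<')[0].split('=')[0]
    try:
        # Checks only that the module imports, not that the version bound is met
        __import__(IMPORT_NAMES.get(dist, dist))
    except Exception:
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', p])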

import numpy as np, pandas as pd, sklearn, imblearn, xgboost, shap, matplotlib, seaborn as sns, tldextract, urllib3, joblib
print('Python', sys.version)
print('numpy', np.__version__)
print('pandas', pd.__version__)
print('scikit-learn', sklearn.__version__)
print('imbalanced-learn', imblearn.__version__)
print('xgboost', xgboost.__version__)
print('shap', shap.__version__)
print('matplotlib', matplotlib.__version__)
print('seaborn', sns.__version__)
print('tldextract', tldextract.__version__)
print('urllib3', urllib3.__version__)
print('joblib', joblib.__version__)


## 2) Import Libraries and Set Global Config

Set a global seed and plotting style. Centralize constants here.

In [None]:
import os, re, math, time, random, json, hashlib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from urllib.parse import urlparse
import tldextract

SEED = 42
np.random.seed(SEED)
random.seed(SEED)

sns.set(style='whitegrid', palette='muted')
plt.rcParams['figure.figsize'] = (8,5)

import warnings
warnings.filterwarnings('ignore')

print('SEED =', SEED)
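
The point of a fixed seed is that the same seed reproduces the same pseudo-random sequence. A minimal standard-library sketch of that property follows; the helper name `draw` is hypothetical and not part of the notebook.

```python
import random

def draw(seed, n=5):
    """Return n pseudo-random integers from a generator seeded with `seed`."""
    rng = random.Random(seed)  # local generator, leaves the global state untouched
    return [rng.randint(0, 99) for _ in range(n)]

first_run = draw(42)
second_run = draw(42)
print('Same seed, same sequence:', first_run == second_run)
```

Libraries that take their own `random_state` argument (as the scikit-learn estimators later in this notebook do) still need the seed passed explicitly.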


## 3) Mount Google Drive (optional) and Define Paths

If running in Colab, mount your Drive to persist artifacts. Otherwise, artifacts will be saved under the repo `models/`.

In [None]:
# Detect Colab
try:
    import google.colab  # type: ignore
    IN_COLAB = True
except Exception:
    IN_COLAB = False

if IN_COLAB:
    from google.colab import drive
    drive.mount('/content/drive')
    ROOT = '/content'
else:
    ROOT = '.'

DATA_DIR = os.path.join(ROOT, 'data')
MODELS_DIR = os.path.join(ROOT, 'models')
REPORTS_DIR = os.path.join(MODELS_DIR, 'reports')

os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(MODELS_DIR, exist_ok=True)
os.makedirs(REPORTS_DIR, exist_ok=True)

PIPELINE_PATH = os.path.join(MODELS_DIR, 'phishing_pipeline.joblib')
METRICS_JSON = os.path.join(MODELS_DIR, 'metrics.json')
METRICS_CSV = os.path.join(REPORTS_DIR, 'metrics_table.csv')
ROC_PNG = os.path.join(REPORTS_DIR, 'roc_curve.png')
PR_PNG = os.path.join(REPORTS_DIR, 'pr_curve.png')
CM_PNG = os.path.join(REPORTS_DIR, 'confusion_matrix.png')
SHAP_SUMMARY_PNG = os.path.join(REPORTS_DIR, 'shap_summary.png')

print('Artifacts will be saved to:')
print('MODELS_DIR =', MODELS_DIR)
print('REPORTS_DIR =', REPORTS_DIR)

## A) Train using your datasets in `data/` (Repo scripts)

This path uses the repository's feature engineering and training scripts, producing `model.pkl`, `scaler.pkl`, and `feature_columns.json` that the included Streamlit app can use directly without retraining.

In [None]:
# Ensure we can import from the repo's src package
import sys
if ROOT not in sys.path:
    sys.path.insert(0, ROOT)

from src import data_preprocessing as dp
from src import train_models as tm

# Auto-detect your datasets in data/ folder (override if needed)
import glob, re

def pick_csv(patterns):
    files = glob.glob(os.path.join(DATA_DIR, '*.csv'))
    for f in files:
        name = os.path.basename(f).lower()
        if any(re.search(p, name) for p in patterns):
            return f
    return None

url_csv = pick_csv([r'url', r'web', r'site', r'domain'])
email_csv = pick_csv([r'email', r'mail', r'subject', r'body'])
print('Detected URL CSV:', url_csv)
print('Detected Email CSV:', email_csv)

assert url_csv and email_csv, "Couldn't auto-detect both datasets. Set url_csv/email_csv manually to paths in data/."

# Run feature extraction and save unified cleaned dataset
url_df = dp.load_url_dataset(url_csv)
email_df = dp.load_email_dataset(email_csv)
combined_df = pd.concat([url_df, email_df], ignore_index=True).fillna(0)
clean_path = os.path.join(DATA_DIR, 'cleaned_phishing_dataset.csv')
combined_df.to_csv(clean_path, index=False)
print('Saved cleaned dataset ->', clean_path, combined_df.shape)
combined_df.head(3)

### Train and save artifacts (model, scaler, feature columns, metrics)

In [None]:
X, y = tm.split_features_target(combined_df)
best_name, best_model, scaler, reports, feature_columns = tm.train_and_evaluate(X, y, random_state=SEED)
tm.save_artifacts(best_name, best_model, scaler, reports, feature_columns, out_dir=MODELS_DIR)

print('Artifacts:')
print(' - model:', os.path.join(MODELS_DIR, 'phishing_detector_model.pkl'))
print(' - scaler:', os.path.join(MODELS_DIR, 'scaler.pkl'))
print(' - feature columns:', os.path.join(MODELS_DIR, 'feature_columns.json'))
print(' - metrics:', os.path.join(MODELS_DIR, 'metrics.json'))
print(' - reports dir:', REPORTS_DIR)

### Quick prediction demo using the saved artifacts

In [None]:
from src import predict as pred

model, scaler, feature_columns = pred.load_artifacts(models_dir=MODELS_DIR)

demo_url = 'http://secure-login-update.example.com/verify?acc=123'
demo_subject = 'URGENT: Verify your account now'
demo_body = 'Dear user, your account will be suspended. Click here to verify: http://bit.ly/verify-now'

pred_label_url, prob_url, feats_url = pred.predict_url(demo_url, model, scaler, feature_columns)
pred_label_email, prob_email, feats_email = pred.predict_email(demo_subject, demo_body, model, scaler, feature_columns)

print('URL prediction -> label:', pred_label_url, 'prob:', prob_url)
print('Email prediction -> label:', pred_label_email, 'prob:', prob_email)

### Run the Streamlit app locally (no retraining)

Run these commands in a local PowerShell terminal from the repo root:

In [None]:
print(r'''# Windows PowerShell
python -m venv .venv
.\.venv\Scripts\Activate.ps1
pip install -r requirements.txt
streamlit run streamlit_app/app.py
''')


## 4) Load Dataset (URL download or Colab upload)

Pick one of the two options:
- Download a sample dataset programmatically
- Upload your own CSV(s) via the UI in Colab

In [None]:
import io
SAMPLE_URL = 'https://raw.githubusercontent.com/jaimeps/URL-Classifier/master/url_spam.csv'  # simple demo dataset

# Option A: Download sample dataset
try:
    df = pd.read_csv(SAMPLE_URL)
    # Try to normalize expected columns
    possible_url_cols = ['url', 'URL', 'text', 'Text']
    possible_label_cols = ['label', 'Label', 'is_phishing', 'phishing', 'class']
    url_col = next((c for c in possible_url_cols if c in df.columns), None)
    label_col = next((c for c in possible_label_cols if c in df.columns), None)
    if url_col is None or label_col is None:
        raise ValueError('Could not find URL/label columns in sample dataset.')
    df = df[[url_col, label_col]].rename(columns={url_col:'url', label_col:'label'})
    print('Loaded sample dataset:', df.shape)
except Exception as e:
    print('Sample download failed or schema mismatch:', e)
    df = None

# Option B: Upload via Colab
if IN_COLAB and df is None:
    from google.colab import files
    uploaded = files.upload()
    for name, content in uploaded.items():
        tmp = pd.read_csv(io.BytesIO(content))
        df = tmp.copy()
        break
    print('Uploaded dataset:', df.shape if df is not None else None)

assert df is not None, 'No dataset available. Please re-run one of the options above.'
df.head(3)
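
The schema normalization above relies on picking the first known alias that appears in the CSV header. A minimal sketch of that lookup in isolation, using a hypothetical header list and a hypothetical helper named `pick_column`:

```python
def pick_column(columns, candidates):
    """Return the first candidate name present in `columns`, or None."""
    return next((c for c in candidates if c in columns), None)

header = ['URL', 'class', 'source']  # hypothetical CSV header, for illustration only
url_col = pick_column(header, ['url', 'URL', 'text', 'Text'])
label_col = pick_column(header, ['label', 'Label', 'is_phishing', 'phishing', 'class'])

# Mapping that would be passed to DataFrame.rename(columns=...)
rename_map = {url_col: 'url', label_col: 'label'}
print(rename_map)
```

Because the candidate order decides ties, list the most specific alias first when a file could contain more than one match.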

## 5) Quick Data Audit and Schema Validation

In [None]:
print('Shape:', df.shape)
print('Columns:', df.columns.tolist())
print('Nulls:\n', df.isna().sum())
print('Duplicates:', df.duplicated().sum())

# Ensure required columns exist
assert 'url' in df.columns, 'Expected column "url" not found.'
assert 'label' in df.columns, 'Expected column "label" not found.'

# Show class balance
print('Class balance:\n', df['label'].value_counts(dropna=False))
df.tail(3)

## 6) Clean and Normalize Data

In [None]:
def normalize_label(x):
    if pd.isna(x):
        return np.nan
    s = str(x).strip().lower()
    if s in {'1', 'phishing', 'spam', 'malicious', 'bad'}:
        return 1
    if s in {'0', 'legit', 'benign', 'good', 'ham'}:
        return 0
    # try to coerce to int
    try:
        return int(float(s))
    except Exception:
        return np.nan

# Basic cleaning
before = len(df)
df['url'] = df['url'].astype(str).str.strip()
df['label'] = df['label'].map(normalize_label)

# Drop invalids
df = df.dropna(subset=['url', 'label'])
df = df.drop_duplicates()

# Optional cap of extreme URL length (rarely needed)
MAX_URL_LEN = 2048
df = df[df['url'].str.len() <= MAX_URL_LEN]

print(f'Removed {before - len(df)} rows. Final shape: {df.shape}')
print('Class balance after clean:\n', df['label'].value_counts())
df.head(3)

## 7) Train/Validation/Test Split with Stratification

In [None]:
from sklearn.model_selection import train_test_split

X_all = df[['url']].copy()
y_all = df['label'].astype(int).copy()

X_train, X_tmp, y_train, y_tmp = train_test_split(
    X_all, y_all, test_size=0.3, random_state=SEED, stratify=y_all
)
X_val, X_test, y_val, y_test = train_test_split(
    X_tmp, y_tmp, test_size=0.5, random_state=SEED, stratify=y_tmp
)

print('Train/Val/Test sizes:', len(X_train), len(X_val), len(X_test))
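
The two chained splits yield roughly 70% train, 15% validation, and 15% test: 30% is held out first, then halved. The arithmetic can be sketched with the standard library. The helper `split_sizes` is hypothetical, and it assumes the held-out share is rounded up with the remainder going to the other side, which is how I understand `train_test_split` to size its test set.

```python
import math
from fractions import Fraction

def split_sizes(n, first_test=Fraction(3, 10), second_test=Fraction(1, 2)):
    """Sizes produced by holding out `first_test`, then splitting that part again."""
    n_tmp = math.ceil(first_test * n)        # rows held out by the first split
    n_train = n - n_tmp
    n_test = math.ceil(second_test * n_tmp)  # second split: test share of the held-out rows
    n_val = n_tmp - n_test
    return n_train, n_val, n_test

print(split_sizes(1000))
```

Exact fractions are used here only to avoid floating-point rounding in the illustration.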

## 8) URL Feature Engineering (lexical and heuristic signals)

Implement robust feature functions and a vectorized transformer.

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import FunctionTransformer

suspicious_tokens = ['login','verify','update','secure','account','bank','click','confirm','password']

_ip_regex = re.compile(r'^(?:http[s]?://)?(?:\d{1,3}\.){3}\d{1,3}')

def url_heuristics(u: str) -> dict:
    try:
        s = str(u).strip()
        parsed = urlparse(s)
        ext = tldextract.extract(s)
        host = parsed.hostname or ''
        path = parsed.path or ''
        query = parsed.query or ''
        scheme = parsed.scheme or ''
        domain = ext.domain or ''
        suffix = ext.suffix or ''
        subdomain = ext.subdomain or ''
        features = {}
        features['url_len'] = len(s)
        features['host_len'] = len(host)
        features['path_len'] = len(path)
        features['query_len'] = len(query)
        features['num_digits'] = sum(ch.isdigit() for ch in s)
        features['num_dots'] = s.count('.')
        features['num_hyphen'] = s.count('-')
        features['num_slash'] = s.count('/')
        features['has_at'] = int('@' in s)
        features['has_double_slash'] = int('//' in s[8:])
        features['has_https_token'] = int('https' in path)
        features['is_ip_url'] = int(bool(_ip_regex.match(s)))
        features['subdomain_count'] = len([p for p in subdomain.split('.') if p])
        features['domain_len'] = len(domain)
        features['suffix_len'] = len(suffix)
        features['scheme_https'] = int(scheme == 'https')
        features['suspect_keyword_count'] = sum(tok in s.lower() for tok in suspicious_tokens)
        # entropy
        probs = [s.count(c)/len(s) for c in set(s)] if s else [1]
        features['url_entropy'] = -sum(p*math.log(p+1e-12) for p in probs)
        return features
    except Exception:
        return {'url_len':0,'host_len':0,'path_len':0,'query_len':0,'num_digits':0,'num_dots':0,
                'num_hyphen':0,'num_slash':0,'has_at':0,'has_double_slash':0,'has_https_token':0,
                'is_ip_url':0,'subdomain_count':0,'domain_len':0,'suffix_len':0,'scheme_https':0,
                'suspect_keyword_count':0,'url_entropy':0.0}

heuristic_feature_names = list(url_heuristics('https://example.com').keys())

class URLHeuristicTransformer(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    def transform(self, X):
        vals = [list(url_heuristics(u).values()) for u in X['url'].astype(str).tolist()]
        return np.array(vals)

# Use the transformer instance directly: a lambda wrapped in FunctionTransformer cannot be pickled by joblib.dump
heuristic_tf = URLHeuristicTransformer()
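
The `url_entropy` feature is the Shannon entropy of the URL's character distribution, computed with the natural logarithm. The standalone sketch below shows the same quantity without the small `1e-12` stabilizer used above; the function name `shannon_entropy` is an illustrative choice.

```python
import math
from collections import Counter

def shannon_entropy(s):
    """Shannon entropy (in nats) of the character distribution of s."""
    if not s:
        return 0.0
    n = len(s)
    # Sum of -p * ln(p) over the distinct characters
    return sum(-(c / n) * math.log(c / n) for c in Counter(s).values())

for text in ['aaaa', 'abab', 'http://192.168.1.10/verify?acc=123']:
    print(text, round(shannon_entropy(text), 4))
```

A string made of one repeated character has zero entropy, and two equally frequent characters give ln 2. Random-looking hosts and query strings score higher, which is why the feature can help separate generated phishing URLs from readable ones.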


## 9) Optional Content-Based Features (disabled by default)

For stability and speed in Colab, this step is off by default. Set `ENABLE_CONTENT = True` to enable it. The transformer below is a placeholder that outputs zeros.

In [None]:
ENABLE_CONTENT = False

# Placeholder transformer that outputs zeros to keep ColumnTransformer shape consistent when disabled
class ContentFeatureTransformer(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    def transform(self, X):
        arr = np.zeros((len(X), 3))  # e.g., status class, content length bin, has_forms
        return arr

# Use the transformer instance directly so the fitted pipeline stays picklable
content_tf = ContentFeatureTransformer()

## 10) TF-IDF Vectorization for URL/Text Tokens

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

# Tokenizers
def token_words(url: str):
    s = str(url)
    tokens = re.split(r'[/:.\-_?=&]+', s)
    return [t for t in tokens if t]

def identity(x):
    return x

# Vectorizers
char_vectorizer = TfidfVectorizer(
    analyzer='char', ngram_range=(3,5), min_df=2, max_features=30000)
word_vectorizer = TfidfVectorizer(
    tokenizer=token_words, preprocessor=identity, token_pattern=None,
    ngram_range=(1,2), min_df=2, max_features=30000)
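
To make the two views concrete, the sketch below applies the same delimiter split to a demo URL and lists the character n-grams of a short token. `char_ngrams` is a hypothetical helper that only illustrates the sliding window. It is not scikit-learn's analyzer, which also normalizes case and whitespace by default.

```python
import re

def token_words(url):
    # Same delimiter set as the notebook's word tokenizer
    return [t for t in re.split(r'[/:.\-_?=&]+', str(url)) if t]

def char_ngrams(text, n_min=3, n_max=5):
    """All contiguous character n-grams of text for n in [n_min, n_max]."""
    return [text[i:i + n]
            for n in range(n_min, n_max + 1)
            for i in range(len(text) - n + 1)]

demo = 'http://secure-login.example.com/verify?acc=123'
print(token_words(demo))
print(char_ngrams('login'))
```

Word tokens capture whole suspicious terms such as `verify`, while character n-grams still fire on obfuscated variants that share a substring.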


## 11) Combine Features with ColumnTransformer and Pipeline

In [None]:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

numeric_features = heuristic_feature_names

preprocess = ColumnTransformer(
    transformers=[
        ('heuristics', Pipeline([
            ('heur', heuristic_tf),
            ('scaler', StandardScaler(with_mean=False))
        ]), ['url']),
        ('char_tfidf', TfidfVectorizer(analyzer='char', ngram_range=(3,5), min_df=2, max_features=20000), 'url'),
        ('word_tfidf', TfidfVectorizer(tokenizer=token_words, preprocessor=identity, token_pattern=None, ngram_range=(1,2), min_df=2, max_features=20000), 'url'),
        # 'drop' contributes no columns and, unlike a lambda, keeps the pipeline picklable
        ('content', content_tf, ['url']) if ENABLE_CONTENT else ('content_off', 'drop', ['url'])
    ], remainder='drop', verbose_feature_names_out=False
)

print('Preprocessor ready.')

## 12) Address Class Imbalance

In [None]:
from imblearn.pipeline import Pipeline as ImbPipeline
from imblearn.over_sampling import SMOTE

USE_SMOTE = False  # set True to enable SMOTE for imbalance
smote_step = ('smote', SMOTE(random_state=SEED)) if USE_SMOTE else None
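
With SMOTE off, imbalance is handled mainly through `class_weight='balanced'` on the candidate models that support it. scikit-learn documents that heuristic as `n_samples / (n_classes * count_per_class)`. The sketch below reproduces the formula on hypothetical labels; `balanced_class_weights` is an illustrative helper, not a library function.

```python
from collections import Counter

def balanced_class_weights(labels):
    """Weight each class by n_samples / (n_classes * class_count)."""
    counts = Counter(labels)
    n_samples, n_classes = len(labels), len(counts)
    return {cls: n_samples / (n_classes * cnt) for cls, cnt in counts.items()}

labels = [0] * 90 + [1] * 10  # hypothetical 90/10 imbalance
weights = balanced_class_weights(labels)
print(weights)
```

The minority class gets a proportionally larger weight, so misclassifying a phishing sample costs more during training without synthesizing any new rows.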


## 13) Feature Selection (VarianceThreshold and SelectKBest)

In [None]:
from sklearn.feature_selection import VarianceThreshold, SelectKBest, chi2

USE_VAR_THRESH = True
USE_SELECT_KBEST = True
K_BEST = 20000  # adjust to your dataset size and runtime

feature_select_steps = []
if USE_VAR_THRESH:
    feature_select_steps.append(('var', VarianceThreshold(threshold=1e-6)))
if USE_SELECT_KBEST:
    feature_select_steps.append(('kbest', SelectKBest(chi2, k=min(K_BEST, 50000))))


## 14) Define Candidate Models

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.calibration import CalibratedClassifierCV
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier

candidates = {
    'logreg': LogisticRegression(max_iter=2000, solver='saga', n_jobs=-1, class_weight='balanced', random_state=SEED),
    'linsvc': CalibratedClassifierCV(LinearSVC(C=1.0, class_weight='balanced', random_state=SEED), method='sigmoid', cv=3),
    'rf': RandomForestClassifier(n_estimators=300, max_depth=None, n_jobs=-1, class_weight='balanced_subsample', random_state=SEED),
    'xgb': XGBClassifier(n_estimators=400, learning_rate=0.1, max_depth=6, subsample=0.8, colsample_bytree=0.8, eval_metric='logloss', n_jobs=-1, random_state=SEED, reg_lambda=1.0)
}
print('Candidates:', list(candidates.keys()))

## 15) Baseline Cross-Validation

In [None]:
from sklearn.model_selection import StratifiedKFold, cross_validate
from sklearn.metrics import make_scorer, precision_score, recall_score, f1_score, roc_auc_score, average_precision_score

scoring = {
    'accuracy': 'accuracy',
    'precision': make_scorer(precision_score),
    'recall': make_scorer(recall_score),
    'f1': make_scorer(f1_score),
    'roc_auc': 'roc_auc',
    'pr_auc': 'average_precision'  # built-in scorer; make_scorer's needs_threshold flag is deprecated in recent scikit-learn
}

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED)

results = []
for name, est in candidates.items():
    steps = [('preprocess', preprocess)]
    if smote_step:
        steps.append(smote_step)
    steps.extend(feature_select_steps)
    steps.append(('clf', est))
    pipe = ImbPipeline(steps)
    scores = cross_validate(pipe, X_train, y_train, cv=cv, scoring=scoring, n_jobs=-1, return_train_score=False)
    row = {'model': name}
    for k, v in scores.items():
        if k.startswith('test_'):
            row[k.replace('test_', 'mean_')] = np.mean(v)
            row[k.replace('test_', 'std_')] = np.std(v)
    results.append(row)

cv_df = pd.DataFrame(results).sort_values('mean_f1', ascending=False)
cv_df
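
Each row of the table summarizes five fold scores as a mean and a standard deviation. `np.std` defaults to the population formula (`ddof=0`), which can be reproduced with the standard library. The fold scores below are hypothetical.

```python
import statistics

fold_f1 = [0.91, 0.89, 0.93, 0.90, 0.92]  # hypothetical per-fold F1 scores

mean_f1 = statistics.fmean(fold_f1)
std_f1 = statistics.pstdev(fold_f1)        # population std, same convention as np.std's default
sample_std_f1 = statistics.stdev(fold_f1)  # ddof=1 variant, slightly larger on few folds

print(f'F1 = {mean_f1:.3f} +/- {std_f1:.3f} (sample std {sample_std_f1:.3f})')
```

With only five folds the two conventions differ noticeably, so it helps to state which one a reported spread uses.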

## 16) Hyperparameter Tuning with RandomizedSearchCV

In [None]:
from sklearn.model_selection import RandomizedSearchCV

# Example param grids (keep small for Colab runtime)
param_grids = {
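    'logreg': {
        'clf__penalty': ['elasticnet'],  # l1_ratio is ignored under the default 'l2' penalty
        'clf__C': np.logspace(-2, 2, 10),
        'clf__l1_ratio': np.linspace(0, 1, 5)
    },
    'linsvc': {
        # CalibratedClassifierCV exposes the wrapped model as `estimator` (scikit-learn >= 1.2)
        'clf__estimator__C': np.logspace(-2, 2, 8)
    },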
    'rf': {
        'clf__n_estimators': [200, 300, 500],
        'clf__max_depth': [None, 10, 20],
        'clf__max_features': ['sqrt', 'log2', None]
    },
    'xgb': {
        'clf__n_estimators': [200, 400, 600],
        'clf__max_depth': [4, 6, 8],
        'clf__learning_rate': [0.05, 0.1, 0.2],
        'clf__subsample': [0.7, 0.9, 1.0],
        'clf__colsample_bytree': [0.7, 0.9, 1.0],
        'clf__reg_lambda': [0.5, 1.0, 2.0]
    }
}

best_name = None
best_estimator = None
best_score = -np.inf

for name, est in candidates.items():
    steps = [('preprocess', preprocess)]
    if smote_step:
        steps.append(smote_step)
    steps.extend(feature_select_steps)
    steps.append(('clf', est))
    pipe = ImbPipeline(steps)
    params = param_grids.get(name, {})
    if not params:
        print(f'No param grid for {name}, skipping tuning.')
        continue
    search = RandomizedSearchCV(pipe, params, n_iter=16, scoring='roc_auc', cv=cv, n_jobs=-1, random_state=SEED, refit=True, verbose=1)
    search.fit(X_train, y_train)
    print(name, 'best ROC-AUC:', search.best_score_)
    if search.best_score_ > best_score:
        best_score = search.best_score_
        best_name = name
        best_estimator = search.best_estimator_

print('Best model from tuning:', best_name, 'ROC-AUC:', best_score)


## 17) Fit Best Model and Calibrate Probabilities

In [None]:
from sklearn.calibration import CalibratedClassifierCV

# If tuning was skipped or failed, fallback to a simple strong baseline
if best_estimator is None:
    steps = [('preprocess', preprocess)]
    if smote_step:
        steps.append(smote_step)
    steps.extend(feature_select_steps)
    steps.append(('clf', candidates['xgb']))
    best_estimator = ImbPipeline(steps)
    best_name = 'xgb'

# Calibrate probabilities if underlying estimator lacks predict_proba
clf = best_estimator.named_steps.get('clf')
if not hasattr(clf, 'predict_proba'):
    # Wrapping the whole pipeline in a calibrated classifier is non-trivial; replace the final clf instead
    base = clf
    calibrated = CalibratedClassifierCV(estimator=base, method='sigmoid', cv=3)  # `base_estimator` was renamed to `estimator` in scikit-learn 1.2
    best_estimator.steps[-1] = ('clf', calibrated)

best_estimator.fit(pd.concat([X_train, X_val]), pd.concat([y_train, y_val]))

print('Final model fitted:', best_name)

## 18) Evaluation on Holdout Set (metrics, ROC/PR, confusion matrix)

In [None]:
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc, precision_recall_curve

proba = best_estimator.predict_proba(X_test)[:,1]
preds = (proba >= 0.5).astype(int)

report = classification_report(y_test, preds, output_dict=True)
print(pd.DataFrame(report).T)

cm = confusion_matrix(y_test, preds)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title('Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.tight_layout()
plt.savefig(CM_PNG)
plt.show()

fpr, tpr, _ = roc_curve(y_test, proba)
roc_auc = auc(fpr, tpr)
plt.plot(fpr, tpr, label=f'ROC AUC = {roc_auc:.3f}')
plt.plot([0,1],[0,1],'k--')
plt.xlabel('FPR'); plt.ylabel('TPR'); plt.title('ROC Curve'); plt.legend(); plt.tight_layout();
plt.savefig(ROC_PNG)
plt.show()

prec, rec, thr = precision_recall_curve(y_test, proba)
pr_auc = auc(rec, prec)
plt.plot(rec, prec, label=f'PR AUC = {pr_auc:.3f}')
plt.xlabel('Recall'); plt.ylabel('Precision'); plt.title('Precision-Recall Curve'); plt.legend(); plt.tight_layout();
plt.savefig(PR_PNG)
plt.show()
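
The report and confusion matrix above use a fixed 0.5 cutoff, while the ROC and PR curves sweep every cutoff. The sketch below shows how moving the threshold trades precision against recall. The labels and scores are hypothetical, and both helpers are illustrative.

```python
def confusion_at(y_true, scores, threshold):
    """Return (tp, fp, fn, tn) when predicting 1 for scores >= threshold."""
    tp = fp = fn = tn = 0
    for y, s in zip(y_true, scores):
        pred = int(s >= threshold)
        if pred == 1 and y == 1:
            tp += 1
        elif pred == 1 and y == 0:
            fp += 1
        elif pred == 0 and y == 1:
            fn += 1
        else:
            tn += 1
    return tp, fp, fn, tn

def precision_recall(tp, fp, fn):
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall

y_true = [1, 1, 1, 0, 0, 0, 0, 1]                  # hypothetical labels
scores = [0.9, 0.7, 0.4, 0.6, 0.2, 0.1, 0.3, 0.8]  # hypothetical phishing probabilities

for t in (0.3, 0.5, 0.7):
    tp, fp, fn, tn = confusion_at(y_true, scores, t)
    p, r = precision_recall(tp, fp, fn)
    print(f'threshold={t}: precision={p:.2f} recall={r:.2f}')
```

For phishing detection a lower threshold is often preferred when missing an attack costs more than a false alarm. The validation split is a natural place to choose it.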


## 19) Comparative Analysis Across Models

In [None]:
# Save CV table and plot a quick comparison
cv_path = os.path.join(REPORTS_DIR, 'cv_baselines.csv')
cv_df.to_csv(cv_path, index=False)

ax = cv_df.plot(x='model', y=['mean_f1','mean_roc_auc'], kind='bar')
plt.title('Baseline CV comparison')
plt.xticks(rotation=0)
plt.tight_layout()
plt.show()

print('Saved CV baselines to:', cv_path)


## 20) Explainability (Permutation Importance and SHAP)

In [None]:
# Permutation importance (on a small sample for speed)
from sklearn.inspection import permutation_importance

sample_idx = np.random.choice(len(X_test), size=min(1000, len(X_test)), replace=False)
perm = permutation_importance(best_estimator, X_test.iloc[sample_idx], y_test.iloc[sample_idx], n_repeats=5, random_state=SEED, n_jobs=-1)
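# The pipeline receives one raw input column, so importance is reported per input column ('url'), not per engineered feature
pi = pd.DataFrame({'feature': X_test.columns, 'importance': perm.importances_mean}).sort_values('importance', ascending=False).head(20)
print(pi)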

# SHAP summary (works best for tree-based models)
try:
    import shap
    # Explainer expects raw model and raw features; we can use KernelExplainer on the pipeline proba function
    bg = X_train.sample(min(100, len(X_train)), random_state=SEED)
    explainer = shap.KernelExplainer(best_estimator.predict_proba, bg)
    shap_vals = explainer.shap_values(X_test.sample(min(100, len(X_test)), random_state=SEED))
    shap.summary_plot(shap_vals[1], features=None, show=False)
    plt.tight_layout(); plt.savefig(SHAP_SUMMARY_PNG); plt.show()
except Exception as e:
    print('SHAP explanation skipped:', e)


## 21) Export Trained Pipeline and Metadata (joblib)

In [None]:
import joblib, platform

joblib.dump(best_estimator, PIPELINE_PATH)

meta = {
    'model_name': best_name,
    'python': platform.python_version(),
    'numpy': np.__version__,
    'pandas': pd.__version__,
    'sklearn': sklearn.__version__,
    'xgboost': xgboost.__version__,
    'seed': SEED,
    'train_rows': int(len(X_train)+len(X_val)),
    'test_rows': int(len(X_test)),
    'roc_auc_test': float(roc_auc),
    'pr_auc_test': float(pr_auc),
}
with open(METRICS_JSON, 'w') as f:
    json.dump(meta, f, indent=2)

print('Saved pipeline to:', PIPELINE_PATH)
print('Saved metadata to:', METRICS_JSON)


## 22) Batch Inference Script (no retraining)

In [None]:
from typing import List

infer_pipe = joblib.load(PIPELINE_PATH)

def score_urls(urls: List[str], threshold: float = 0.5):
    df_in = pd.DataFrame({'url': urls})
    probs = infer_pipe.predict_proba(df_in)[:,1]
    labels = (probs >= threshold).astype(int)
    return pd.DataFrame({'url': urls, 'proba_phishing': probs, 'label_pred': labels})

# Demo
score_urls([
    'https://example.com/login',
    'http://192.168.1.10/verify?acc=123',
    'https://secure-bank-auth.com/update',
]).head()

## 23) Streamlit UI App (loads saved pipeline and predicts)

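Below is a minimal Streamlit app that loads the saved `phishing_pipeline.joblib` and predicts for single or batch URLs. Save it as `streamlit_app/pipeline_app.py` if you want a separate app from the heuristic-based one. No training occurs at runtime. Because pickle stores custom classes and functions by reference, the app must be able to import the definitions the pipeline uses (`URLHeuristicTransformer`, `token_words`, `identity`) before calling `joblib.load`.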

In [None]:
streamlit_snippet = r'''
import streamlit as st
import pandas as pd
import joblib

st.set_page_config(page_title="Phishing URL Detector (Pipeline)")
st.title("Phishing URL Detector (Pipeline)")

@st.cache_resource
def load_pipeline():
    return joblib.load("models/phishing_pipeline.joblib")

pipe = load_pipeline()

st.subheader("Single URL")
url = st.text_input("Enter URL", value="https://example.com/login")
if st.button("Predict"):
    proba = pipe.predict_proba(pd.DataFrame({'url':[url]}))[:,1][0]
    label = int(proba >= 0.5)
    st.write(f"Prediction: {'Phishing' if label==1 else 'Legit'} (prob={proba:.3f})")

st.subheader("Batch CSV Upload")
file = st.file_uploader("Upload CSV with a 'url' column", type=['csv'])
if file:
    df_in = pd.read_csv(file)
    assert 'url' in df_in.columns, "CSV must contain a 'url' column"
    probs = pipe.predict_proba(df_in[['url']])[:,1]
    df_out = df_in.copy()
    df_out['proba_phishing'] = probs
    df_out['label_pred'] = (probs >= 0.5).astype(int)
    st.dataframe(df_out.head())
'''
print(streamlit_snippet)


## 24) Optional: CLI and FastAPI Inference Endpoints

Below is a minimal FastAPI snippet you can use after saving `phishing_pipeline.joblib`. Run it with `uvicorn` locally (not in Colab).

In [None]:
fastapi_snippet = r'''
from fastapi import FastAPI
from pydantic import BaseModel
import pandas as pd
import joblib

app = FastAPI()
pipe = joblib.load('models/phishing_pipeline.joblib')

class Item(BaseModel):
    url: str

@app.post('/predict')
def predict(item: Item):
    proba = pipe.predict_proba(pd.DataFrame({'url':[item.url]}))[:,1][0]
    label = int(proba >= 0.5)
    return {'proba_phishing': float(proba), 'label_pred': label}
'''
print(fastapi_snippet)
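
Once the service is running, a client only needs to POST a JSON body with a `url` field. The sketch below builds that request with the standard library. The base address `http://127.0.0.1:8000` is an assumption (uvicorn's default host and port), and `build_predict_request` is a hypothetical helper.

```python
import json
import urllib.request

def build_predict_request(url_to_score, base='http://127.0.0.1:8000'):
    """Build (but do not send) a POST request for the /predict endpoint."""
    payload = json.dumps({'url': url_to_score}).encode('utf-8')
    return urllib.request.Request(
        base + '/predict',
        data=payload,
        headers={'Content-Type': 'application/json'},
        method='POST',
    )

req = build_predict_request('http://192.168.1.10/verify?acc=123')
print(req.get_method(), req.full_url)

# With the server running locally, uncomment to send the request and read the reply:
# with urllib.request.urlopen(req, timeout=10) as resp:
#     print(json.load(resp))
```

The same function could back a small command-line wrapper that reads URLs from arguments and prints the returned probability.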


## 25) Reproducibility, Seeds, and Run Logs

We log versions, a dataset hash, and key hyperparameters to the metadata JSON saved earlier.

In [None]:
# Compute a quick dataset hash (on URL and label)
h = hashlib.md5(pd.util.hash_pandas_object(df[['url','label']], index=False).values).hexdigest()
print('Dataset hash:', h)

# Update metadata with hash and params
with open(METRICS_JSON, 'r') as f:
    meta2 = json.load(f)
meta2['dataset_md5'] = h
meta2['best_model_name'] = best_name
meta2['use_smote'] = USE_SMOTE
meta2['use_var_thresh'] = USE_VAR_THRESH
meta2['use_select_kbest'] = USE_SELECT_KBEST
with open(METRICS_JSON, 'w') as f:
    json.dump(meta2, f, indent=2)

print('Updated metadata at:', METRICS_JSON)
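
The hash above depends on row order, since it digests the per-row hashes in sequence. If a fingerprint that ignores shuffling is preferred, one alternative is to sort the rows before hashing. This is a standard-library sketch of that variant, not the notebook's method, and `dataset_fingerprint` is a hypothetical helper.

```python
import hashlib

def dataset_fingerprint(rows):
    """MD5 over sorted 'url<TAB>label' lines, so row order does not matter."""
    lines = sorted(f'{url}\t{label}' for url, label in rows)
    return hashlib.md5('\n'.join(lines).encode('utf-8')).hexdigest()

rows = [('https://example.com/login', 0), ('http://192.168.1.10/verify?acc=123', 1)]
print(dataset_fingerprint(rows))
print(dataset_fingerprint(list(reversed(rows))))  # same digest after reordering
```

MD5 is acceptable here because the digest only detects accidental dataset changes and is not a security control.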


## 26) Save Artifacts and Verify

Write all artifacts to `models/` and `models/reports/`, list files, and optionally zip for download in Colab.

In [None]:
import glob
print('Model file exists:', os.path.exists(PIPELINE_PATH))
print('Metrics file exists:', os.path.exists(METRICS_JSON))
print('Reports:', glob.glob(os.path.join(REPORTS_DIR, '*')))

if IN_COLAB:
    print('Optionally zip artifacts for download...')
    # from google.colab import files
    # !zip -r artifacts.zip models/
    # files.download('artifacts.zip')
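
Outside Colab, where the `!zip` shell command may not be available, the same archive can be produced with the standard library. The sketch below runs against a temporary directory with a dummy file so it does not touch the real `models/` folder; `zip_dir` is a hypothetical helper.

```python
import os
import shutil
import tempfile
import zipfile

def zip_dir(src_dir, archive_base):
    """Zip the contents of src_dir into archive_base + '.zip' and return the path."""
    return shutil.make_archive(archive_base, 'zip', root_dir=src_dir)

with tempfile.TemporaryDirectory() as tmp:
    models_dir = os.path.join(tmp, 'models')
    os.makedirs(os.path.join(models_dir, 'reports'))
    with open(os.path.join(models_dir, 'metrics.json'), 'w') as f:
        f.write('{}')  # dummy artifact for the demonstration

    # Write the archive next to, not inside, the folder being zipped
    archive_path = zip_dir(models_dir, os.path.join(tmp, 'artifacts'))
    with zipfile.ZipFile(archive_path) as zf:
        archived = zf.namelist()

print(os.path.basename(archive_path), archived)
```

In the notebook itself the call would be `zip_dir(MODELS_DIR, 'artifacts')`, with the resulting file then passed to `files.download` when running in Colab.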