# In [None]:
# train_and_run_xgb.py
"""
Phishing detection — training + interactive test using XGBoost.
Reads CSV at DATA_FILE (default: your provided path).
"""

import os
import sys
import html
import re
import numpy as np
import pandas as pd
from urllib.parse import urlparse
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, accuracy_score, roc_auc_score, confusion_matrix
from scipy.sparse import hstack, csr_matrix
import joblib

# Use XGBoost if installed; fallback to sklearn RandomForest if not.
try:
    from xgboost import XGBClassifier
    XGB_AVAILABLE = True
except Exception:
    from sklearn.ensemble import RandomForestClassifier
    XGB_AVAILABLE = False

# ================ CONFIG ================
# Path of the training CSV; the script raises FileNotFoundError if it is missing.
DATA_FILE = '/Users/yahyamohnd/Downloads/URL_final.csv'   # <-- edit here if needed
# Output paths (relative to the working directory) for the joblib-dumped artifacts.
OUT_MODEL = 'xgb_phish_model.pkl'      # trained classifier
OUT_VECT_WORD = 'tfidf_word.pkl'       # word-level TF-IDF vectorizer
OUT_VECT_CHAR = 'tfidf_char.pkl'       # character-level TF-IDF vectorizer
OUT_SCALER = 'url_scaler.pkl'          # StandardScaler for the URL features
# Fraction of rows held out for evaluation by train_test_split.
TEST_SIZE = 0.2
# Seed shared by the train/test split and the classifier.
RANDOM_STATE = 42
# ========================================

# Fail early with a clear message when the configured CSV is absent.
if not os.path.exists(DATA_FILE):
    raise FileNotFoundError(f"Data file not found: {DATA_FILE}")

print("Loading data:", DATA_FILE)
df = pd.read_csv(DATA_FILE)
print("Raw columns:", list(df.columns))

# Guarantee a 'text' column: prefer an existing one, then 'URL', then the
# first column whose dtype is object.
if 'text' not in df.columns:
    if 'URL' in df.columns:
        source_col = 'URL'
    else:
        source_col = next((c for c in df.columns if df[c].dtype == object), None)
        if source_col is None:
            raise ValueError("No text-like column found in CSV (expecting 'text' or 'URL' or other string column).")
    df['text'] = df[source_col].astype(str)

# The target column is mandatory and is coerced to integers.
if 'label' not in df.columns:
    raise ValueError("CSV must contain a 'label' column with 0 (legit) / 1 (phish).")

df['label'] = df['label'].astype(int)

# ---------- text cleaning ----------
def clean_text_basic(s):
    """Return *s* as a string with HTML entities decoded and tags removed.

    Missing values (as judged by ``pd.isna``) become the empty string.
    Each ``<...>`` tag is replaced by a single space, and the result is
    stripped of leading/trailing whitespace.
    """
    if pd.isna(s):
        return ''
    unescaped = html.unescape(str(s))
    without_tags = re.sub(r'<[^>]+>', ' ', unescaped)
    return without_tags.strip()

# Advanced normalization for text (keeps <URL> token)
def clean_text_advanced(s):
    """Normalize text for vectorization.

    Runs ``clean_text_basic``, lowercases the result, then applies the
    substitutions below in order: URLs -> ``<URL>``, digit runs -> ``<NUM>``,
    every character that is not a word character, whitespace, ``<`` or ``>``
    -> space, and finally whitespace runs -> a single space.
    """
    normalized = clean_text_basic(s).lower()
    # Order matters: URLs must be collapsed before digits and punctuation.
    substitutions = (
        (r'http\S+|www\.\S+', '<URL>'),
        (r'\d+', '<NUM>'),
        (r'[^\w\s<>]', ' '),
        (r'\s+', ' '),
    )
    for pattern, replacement in substitutions:
        normalized = re.sub(pattern, replacement, normalized)
    return normalized.strip()

# Normalize every row of the text column in place.
df['text'] = df['text'].map(clean_text_advanced)

# ---------- URL feature extraction ----------
def extract_url_features(url_text):
    """Derive numeric lexical features from a URL-like string.

    Parameters
    ----------
    url_text : object
        Value to inspect; it is converted with ``str`` and stripped.

    Returns
    -------
    dict
        Maps each of the 16 feature names to a number. All values are 0 when
        the input does not look like a URL or cannot be parsed.

    Notes
    -----
    * Host-derived features use the parsed hostname, so a port or userinfo
      (``user@host:8080``) does not distort the IP, TLD or length features.
    * ``url_entropy`` is a unique-character ratio scaled by 10, not Shannon
      entropy.
    * ``query_param_count`` counts the non-empty ``&``-separated parameters.
    * ``suspicious_file_extension`` looks at the URL path only, so a trailing
      query string does not hide the extension.
    """
    feature_names = (
        "url_length", "has_ip_address", "dot_count", "https_flag",
        "url_entropy", "token_count", "subdomain_count", "query_param_count",
        "tld_length", "path_length", "has_hyphen_in_domain",
        "number_of_digits", "tld_popularity", "suspicious_file_extension",
        "domain_name_length", "percentage_numeric_chars",
    )
    zero_features = dict.fromkeys(feature_names, 0)

    s = str(url_text).strip()
    # Treat strings with a '<url>' token, an http/www prefix or '://' as URLs.
    is_url = ('<url>' in s) or s.startswith(('http', 'www')) or '://' in s
    if not is_url:
        return zero_features

    # Give scheme-less input a default scheme so urlparse fills in netloc.
    full = s if '://' in s else 'http://' + s
    try:
        p = urlparse(full)
    except ValueError:
        # urlparse rejects malformed bracketed (IPv6-style) hosts.
        return zero_features

    scheme = p.scheme
    host = p.hostname or ''          # netloc without userinfo/port, lowercased
    path = p.path or ''
    query = p.query or ''

    url_length = len(full)
    # Simplistic IPv4 check: host is four all-digit parts.
    parts = host.split('.')
    has_ip_address = 1 if len(parts) == 4 and all(pt.isdigit() for pt in parts) else 0
    dot_count = host.count('.') + path.count('.')
    https_flag = 1 if scheme == 'https' else 0
    uniq_chars = len(set(full))
    url_entropy = round((uniq_chars / (len(full) + 1)) * 10, 3)
    path_and_query = path + ('?' + query if query else '')
    token_count = max(1, len([t for t in path_and_query.split('/') if t]))
    subdomain_count = max(0, host.count('.') - 1)
    query_param_count = sum(1 for param in query.split('&') if param)
    tld = host.rsplit('.', 1)[-1] if '.' in host else ''
    tld_length = len(tld)
    path_length = len(path)
    has_hyphen_in_domain = 1 if '-' in host else 0
    number_of_digits = sum(c.isdigit() for c in full)
    # Hand-assigned popularity weights; unknown TLDs get 10.
    tld_pop_map = {'com': 1000, 'org': 300, 'net': 200, 'io': 100, 'co': 150, 'info': 50}
    tld_popularity = tld_pop_map.get(tld, 10)
    suspicious_file_extension = 1 if path.lower().endswith(('.exe', '.zip', '.php', '.asp')) else 0
    domain_name_length = len(host)
    percentage_numeric_chars = round((number_of_digits / (len(full) + 1)) * 100, 3)

    return {
        "url_length": url_length,
        "has_ip_address": has_ip_address,
        "dot_count": dot_count,
        "https_flag": https_flag,
        "url_entropy": url_entropy,
        "token_count": token_count,
        "subdomain_count": subdomain_count,
        "query_param_count": query_param_count,
        "tld_length": tld_length,
        "path_length": path_length,
        "has_hyphen_in_domain": has_hyphen_in_domain,
        "number_of_digits": number_of_digits,
        "tld_popularity": tld_popularity,
        "suspicious_file_extension": suspicious_file_extension,
        "domain_name_length": domain_name_length,
        "percentage_numeric_chars": percentage_numeric_chars
    }

# Names of the URL-derived feature columns, in the order they are fed to the scaler.
url_feature_cols = [
    'url_length',
    'has_ip_address',
    'dot_count',
    'https_flag',
    'url_entropy',
    'token_count',
    'subdomain_count',
    'query_param_count',
    'tld_length',
    'path_length',
    'has_hyphen_in_domain',
    'number_of_digits',
    'tld_popularity',
    'suspicious_file_extension',
    'domain_name_length',
    'percentage_numeric_chars',
]

# If CSV already has these columns, use them; otherwise compute
existing_url_features = [c for c in url_feature_cols if c in df.columns]
missing_features = [c for c in url_feature_cols if c not in df.columns]
if missing_features:
    print("Computing missing URL features:", missing_features)
    # df['text'] has already been passed through clean_text_advanced, which
    # replaces URLs with a placeholder token, so URL parsing on it yields no
    # usable features. Parse the untouched 'URL' column when the CSV has one;
    # this also matches classify_input, which extracts features from raw input.
    # NOTE(review): when only a 'text' column exists the raw URLs are no longer
    # available at this point and the normalized text is used as a fallback.
    url_source = df['URL'].astype(str) if 'URL' in df.columns else df['text']
    # One dict per row -> one DataFrame; avoids building a Series per row.
    feats_df = pd.DataFrame(
        [extract_url_features(t) for t in url_source],
        columns=url_feature_cols,
    )
    # append only missing
    for c in missing_features:
        df[c] = feats_df[c].values
    existing_url_features = url_feature_cols

# Build X (TF-IDF + URL features)
y = df['label'].astype(int).values

# Split row indices first (stratified) so that the vectorizers and the scaler
# are fit on training rows only; fitting them on all rows would leak test-set
# statistics into the features and inflate the evaluation metrics.
all_idx = np.arange(len(df))
train_idx, test_idx = train_test_split(all_idx, test_size=TEST_SIZE,
                                       random_state=RANDOM_STATE, stratify=y)

# Word-level TF-IDF + char-level TF-IDF (use both)
vect_word = TfidfVectorizer(analyzer='word', ngram_range=(1,2), max_features=4000)
vect_char = TfidfVectorizer(analyzer='char_wb', ngram_range=(3,5), max_features=3000)

train_text = df['text'].iloc[train_idx]
vect_word.fit(train_text)
vect_char.fit(train_text)

Xw = vect_word.transform(df['text'])
Xc = vect_char.transform(df['text'])

# URL dense features -> standardize (statistics from training rows only)
X_url = df[existing_url_features].fillna(0).astype(float).values
scaler = StandardScaler()
scaler.fit(X_url[train_idx])
X_url_scaled = scaler.transform(X_url)

# Final X; CSR so rows can be selected by index
X = hstack([Xw, Xc, csr_matrix(X_url_scaled)]).tocsr()

print("Feature matrix shape:", X.shape)
print("Labels distribution:", np.bincount(y))

# Train/test matrices from the index split above
X_train, X_test = X[train_idx], X[test_idx]
y_train, y_test = y[train_idx], y[test_idx]

# Ratio of negatives to positives in the training split; passed to XGBoost as
# scale_pos_weight to counter class imbalance (1.0 when there are no positives).
pos = (y_train==1).sum()
neg = (y_train==0).sum()
if pos > 0:
    scale_pos_weight = neg/pos
else:
    scale_pos_weight = 1.0
print("Train pos/neg:", pos, neg, "scale_pos_weight:", round(scale_pos_weight,3))

# Pick the estimator: XGBoost when importable, otherwise a balanced RandomForest.
if not XGB_AVAILABLE:
    print("XGBoost not available — falling back to RandomForest")
    from sklearn.ensemble import RandomForestClassifier
    clf = RandomForestClassifier(n_estimators=300, class_weight='balanced', n_jobs=-1, random_state=RANDOM_STATE)
else:
    print("Training XGBoost classifier...")
    xgb_params = dict(
        n_estimators=300,
        max_depth=6,
        learning_rate=0.1,
        use_label_encoder=False,
        eval_metric='logloss',
        scale_pos_weight=scale_pos_weight,
        random_state=RANDOM_STATE,
        n_jobs=-1,
    )
    clf = XGBClassifier(**xgb_params)

clf.fit(X_train, y_train)

# Score the held-out split: hard predictions always, probabilities when the
# estimator exposes predict_proba.
y_pred = clf.predict(X_test)
y_prob = None
if hasattr(clf, "predict_proba"):
    y_prob = clf.predict_proba(X_test)[:,1]

print("\n=== Evaluation on test set ===")
print("Accuracy:", accuracy_score(y_test, y_pred))
if y_prob is not None:
    # ROC AUC needs scores, so it is reported only when probabilities exist.
    print("ROC AUC:", roc_auc_score(y_test, y_prob))
print(classification_report(y_test, y_pred))
print("Confusion matrix:\n", confusion_matrix(y_test, y_pred))

# Persist the classifier and the fitted preprocessing objects with joblib.
for artifact, out_path in (
    (clf, OUT_MODEL),
    (vect_word, OUT_VECT_WORD),
    (vect_char, OUT_VECT_CHAR),
    (scaler, OUT_SCALER),
):
    joblib.dump(artifact, out_path)
print("Saved model & vectorizers:", OUT_MODEL, OUT_VECT_WORD, OUT_VECT_CHAR, OUT_SCALER)

# ---------------- interactive classifier ----------------
def classify_input(text, threshold=0.5):
    """Classify one text or URL with the fitted module-level model.

    The input is normalized and vectorized with both TF-IDF vectorizers, URL
    features are extracted from the raw input and scaled, and the three parts
    are stacked in the same order used for training.

    Parameters
    ----------
    text : str
        Raw text or URL to classify.
    threshold : float
        Minimum score for the "Phishing" label.

    Returns
    -------
    tuple
        ``(label, score)`` where label is ``"Phishing"`` or ``"Legit"``. The
        score is the class-1 probability when the model has ``predict_proba``,
        otherwise the predicted label itself.
    """
    cleaned = clean_text_advanced(text)
    text_blocks = [vectorizer.transform([cleaned]) for vectorizer in (vect_word, vect_char)]

    # URL features come from the raw input, not the normalized text.
    url_feats = extract_url_features(text)
    url_row = np.array([url_feats[name] for name in existing_url_features]).reshape(1,-1)
    url_block = csr_matrix(scaler.transform(url_row))

    X_in = hstack(text_blocks + [url_block])
    if hasattr(clf, "predict_proba"):
        prob = clf.predict_proba(X_in)[:,1][0]
    else:
        prob = clf.predict(X_in)[0]

    pred = "Phishing" if prob >= threshold else "Legit"
    return pred, prob

# Interactive prompt: classify each entered line until 'exit', Ctrl-C or EOF.
if __name__ == "__main__":
    print("\nInteractive mode — enter text or URL (type 'exit' to quit).")
    while True:
        try:
            s = input("Enter text or URL: ").strip()
        except (KeyboardInterrupt, EOFError):
            print("\nExiting.")
            break
        if not s:
            # Nothing to classify; prompt again instead of scoring an empty string.
            continue
        if s.lower() == 'exit':
            print("Bye.")
            break
        pred, prob = classify_input(s, threshold=0.5)
        print(f"Prediction: {pred} | Prob(phish) = {prob:.3f}")