## First commit to GitHub

In [None]:
# Mount Google Drive at /content/drive; force_remount=True asks for a remount
# even when the drive is already mounted.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


#  DATA CLEANING & PREPROCESSING



Set up & Imports

In [None]:
from __future__ import annotations
from pathlib import Path
from typing import Optional, Tuple, Dict
import sys, json, glob, re

import numpy as np
import pandas as pd

# Print the installed pandas and numpy versions.
print("Versions -> pandas:", pd.__version__, "| numpy:", np.__version__)

Config

In [None]:
# Input discovery
USE_DRIVE = True                            # Mount Google Drive to search for the file
DRIVE_SEARCH_DIR = "/content/drive/MyDrive" # Root folder to search
FILE_PATTERN = "**/Phishing_Mendeley*.csv"  # Pattern to locate your CSV in Drive
FALLBACK_PROMPT_UPLOAD = True               # If not found, open an upload dialog

# Behaviors
DROP_DUPLICATES = True                      # Only drop full-row duplicates (incl. 'id'); if id differs -> keep
DROP_HIGH_MISSING_COLS = False              # If True, drop cols with missing rate > HIGH_MISSING_THRESHOLD
HIGH_MISSING_THRESHOLD = 0.40               # Missing-rate cutoff (fraction of rows); also recorded in the report

# Save to Drive too?
SAVE_BACK_TO_DRIVE = False                  # If True, also copy outputs to DRIVE_OUT_DIR
DRIVE_OUT_DIR = "/content/drive/MyDrive/phishing_cleaned_outputs"  # Destination folder for that optional copy

Drive/Load Utilities

In [None]:
def mount_drive_if_needed():
    """Mount Google Drive at /content/drive when USE_DRIVE is enabled.

    Best-effort: any failure (for example, running outside Colab) is printed
    instead of being raised.
    """
    if not USE_DRIVE:
        return
    try:
        from google.colab import drive as colab_drive
        colab_drive.mount('/content/drive')
        print("Drive mounted.")
    except Exception as err:
        print("Drive mount failed or not in Colab:", err)

def find_csv_in_drive(search_dir: str, pattern: str) -> Optional[str]:
    """Return the most recently modified file matching ``pattern`` under ``search_dir``.

    Args:
        search_dir: Directory the glob pattern is joined onto.
        pattern: Glob pattern; ``**`` is expanded recursively.

    Returns:
        The matching path with the largest modification time, or None when
        nothing matches.
    """
    full_pattern = str(Path(search_dir) / pattern)
    matches = glob.glob(full_pattern, recursive=True)
    if not matches:
        return None

    def _modified_at(candidate: str) -> float:
        return Path(candidate).stat().st_mtime

    return max(matches, key=_modified_at)

def upload_dialog() -> Optional[str]:
    """Open the Colab upload dialog and return the path of the first uploaded file.

    Returns:
        ``/content/<name>`` for the first uploaded file name, or None when
        nothing was uploaded or the dialog could not be used (the reason is
        printed).
    """
    try:
        from google.colab import files as colab_files
        print("Please upload your CSV file…")
        uploaded = colab_files.upload()
        if uploaded:
            first_name = next(iter(uploaded.keys()))
            print("Uploaded:", first_name)
            return str(Path("/content") / first_name)
        return None
    except Exception as err:
        print("Upload dialog not available (not in Colab?):", err)
        return None

def read_raw(p: str | Path) -> pd.DataFrame:
    for enc in ("utf-8", "utf-8-sig", "latin-1"):
        try:
            return pd.read_csv(p, encoding=enc, engine="python")
        except Exception:
            pass
    raise RuntimeError("Failed to read CSV with utf-8 / utf-8-sig / latin-1.")

Locate & Load the CSV

In [None]:
mount_drive_if_needed()

# Locate the dataset: search Drive first, then fall back to a manual upload.
csv_path = None
if USE_DRIVE:
    csv_path = find_csv_in_drive(DRIVE_SEARCH_DIR, FILE_PATTERN)
    print("Drive search:", "FOUND" if csv_path else "Not found")

if not csv_path and FALLBACK_PROMPT_UPLOAD:
    csv_path = upload_dialog()

if not csv_path:
    raise FileNotFoundError(
        "Could not locate a dataset. Set USE_DRIVE=True with correct DRIVE_SEARCH_DIR/FILE_PATTERN "
        "or enable FALLBACK_PROMPT_UPLOAD."
    )

print("Using dataset:", csv_path)

raw_df = read_raw(csv_path)
orig_shape = (int(raw_df.shape[0]), int(raw_df.shape[1]))

# Preserve CamelCase names; trim whitespace inside string cells.
# Only genuine strings are stripped. Casting the whole column with astype(str)
# would turn missing values into the literal text "nan", hiding them from the
# missingness report and from imputation.
df = raw_df.copy()
for c in df.select_dtypes(include=[object]).columns:
    df[c] = df[c].map(lambda v: v.strip() if isinstance(v, str) else v)

print("Loaded shape:", df.shape)

Dataset Info

In [None]:
import io
from textwrap import indent

# Overview of the loaded data: location, shape, dtypes, info(), head, missingness.
print("\n=== DATASET INFO ===")
print("Path:", csv_path)
print("Original shape:", orig_shape)

print("\n.dtypes (first 50):")
print(df.dtypes.head(50))

# df.info() writes to a stream, so route it through a buffer and print the text.
print("\n.info():")
buffer = io.StringIO()
df.info(buf=buffer)
info_str = buffer.getvalue()
print(info_str)

print("\n.head(5):")
display(df.head(5))

# Share of missing cells per column, worst first.
print("\nMissingness (top 20):")
miss = df.isna().mean().sort_values(ascending=False)
display(miss.head(20).to_frame("missing_rate"))

# Early guess of target column (just for info; final alignment happens later)
target_guess = None
for _cand in ["CLASS_LABEL", "class_label", "Class", "Label", "Result", "label", "result", "target", "Target"]:
    if _cand in df.columns:
        target_guess = _cand
        break
print("\nTarget column guess:", target_guess)

if target_guess is not None:
    # Show a small sample of unique values
    try:
        uniques = pd.unique(df[target_guess].dropna())
        print("Sample target values:", uniques[:10])
    except Exception as e:
        print("Could not preview target values:", e)

Helper Functions for Cleaning

In [None]:
def coerce_numeric_like(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with numeric-looking object columns made numeric.

    An object column is converted when at least 80% of its values, rendered as
    stripped text, match a plain or scientific-notation number. Values that
    fail the conversion become NaN. The input frame is not modified.
    """
    numeric_pattern = r'^[+-]?(?:\d+\.?\d*|\.\d+)(?:[eE][+-]?\d+)?$'
    out = df.copy()
    object_cols = [name for name in out.columns if out[name].dtype == object]
    for name in object_cols:
        as_text = out[name].astype(str).str.strip()
        share_numeric = as_text.str.match(numeric_pattern).mean()
        if share_numeric >= 0.8:
            out[name] = pd.to_numeric(out[name], errors="coerce")
    return out

def handle_infinities(df: pd.DataFrame) -> pd.DataFrame:
    """Return a new frame in which +inf and -inf are replaced by NaN."""
    infinite_values = [np.inf, -np.inf]
    cleaned = df.replace(infinite_values, np.nan)
    return cleaned

def drop_low_variance(df: pd.DataFrame, target_col: Optional[str]) -> Tuple[pd.DataFrame, list]:
    """Drop columns holding at most one distinct value (NaN counts as a value).

    Args:
        df: Input frame.
        target_col: Column that is never dropped, even if constant.

    Returns:
        ``(frame, dropped)`` where ``dropped`` lists the removed column names.
        The input frame itself is returned when nothing is dropped.
    """
    distinct_counts = df.nunique(dropna=False)
    constant_cols = distinct_counts[distinct_counts <= 1].index.tolist()
    try:
        constant_cols.remove(target_col)
    except ValueError:
        # Target was not among the constant columns.
        pass
    if not constant_cols:
        return df, constant_cols
    return df.drop(columns=constant_cols), constant_cols

def impute_missing(df: pd.DataFrame, target_col: Optional[str]) -> Tuple[pd.DataFrame, Dict]:
    """Fill missing values and report what was done.

    Numeric columns are filled with their median and the remaining columns
    with their most frequent value (``"__missing__"`` when no mode exists).
    When DROP_HIGH_MISSING_COLS is set, columns whose missing rate exceeds
    HIGH_MISSING_THRESHOLD are dropped first; the target is never dropped.
    Only a non-numeric target is excluded from filling; a numeric target is
    treated like any other numeric column.

    Args:
        df: Input frame (left unmodified).
        target_col: Name of the label column, or None.

    Returns:
        ``(frame, report)`` where ``report`` holds the dropped columns and the
        fill value used for every imputed column.
    """
    out = df.copy()

    # Optional pruning of sparse columns.
    dropped = []
    if DROP_HIGH_MISSING_COLS:
        rates = out.isna().mean().sort_values(ascending=False)
        dropped = rates[rates > HIGH_MISSING_THRESHOLD].index.tolist()
        if target_col in dropped:
            dropped.remove(target_col)
        if dropped:
            out = out.drop(columns=dropped)
    summary: Dict = {
        "dropped_columns_missing_gt_threshold": {
            "threshold": HIGH_MISSING_THRESHOLD,
            "columns": dropped,
        }
    }

    numeric_cols = out.select_dtypes(include=[np.number]).columns.tolist()
    other_cols = [name for name in out.columns if name not in numeric_cols]
    if target_col and target_col in other_cols:
        other_cols.remove(target_col)

    # Median for numeric columns; None is recorded when the median is NaN.
    numeric_fills = {}
    for name in numeric_cols:
        series = out[name]
        if not series.isna().any():
            continue
        median = series.median()
        out[name] = series.fillna(median)
        numeric_fills[name] = None if pd.isna(median) else float(median)

    # Mode for everything else.
    categorical_fills = {}
    for name in other_cols:
        series = out[name]
        if not series.isna().any():
            continue
        modes = series.mode(dropna=True)
        fill = "__missing__" if modes.empty else modes.iloc[0]
        out[name] = series.fillna(fill)
        categorical_fills[name] = fill

    summary["imputations"] = {"numeric": numeric_fills, "categorical": categorical_fills}
    return out, summary

def finalize_int_casts(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with whole-number float columns cast to ``Int64``.

    A float column is converted when all of its non-missing values are
    (numerically close to) integers. If pandas rejects the cast, the column is
    left as it was.
    """
    out = df.copy()
    float_cols = [name for name in out.columns if pd.api.types.is_float_dtype(out[name])]
    for name in float_cols:
        values = out[name]
        fractional_part = values.dropna() % 1
        if not np.allclose(fractional_part, 0):
            continue
        try:
            out[name] = values.astype("Int64")
        except Exception:
            # Keep the float column when the nullable-integer cast is refused.
            continue
    return out

def make_jsonable(obj):
    """Recursively convert numpy/pandas containers and scalars to plain Python.

    Dict values and list/tuple/set items are converted recursively (tuples and
    sets become lists; dict keys are left as they are). Numpy integer, floating
    and boolean scalars become ``int``, ``float`` and ``bool``; arrays and
    Series become lists; DataFrames become ``{column: [values]}`` dicts.
    Anything else is returned unchanged.
    """
    import numpy as _np
    import pandas as _pd

    if isinstance(obj, dict):
        return {key: make_jsonable(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple, set)):
        return [make_jsonable(item) for item in obj]

    # Numpy scalar family -> matching builtin constructor.
    scalar_casts = ((_np.integer, int), (_np.floating, float), (_np.bool_, bool))
    for numpy_type, builtin in scalar_casts:
        if isinstance(obj, numpy_type):
            return builtin(obj)

    if isinstance(obj, (_np.ndarray, _pd.Series)):
        return obj.tolist()
    if isinstance(obj, _pd.DataFrame):
        return obj.to_dict(orient="list")
    return obj

Cleaning Pipeline

In [None]:
# Work on a copy and start the report that later steps fill in.
work_df = df.copy()
report = {
    "source": csv_path,
    "original_shape": tuple(int(dim) for dim in work_df.shape),
    "settings": {
        "DROP_DUPLICATES": bool(DROP_DUPLICATES),
        "DROP_HIGH_MISSING_COLS": bool(DROP_HIGH_MISSING_COLS),
        "HIGH_MISSING_THRESHOLD": float(HIGH_MISSING_THRESHOLD),
    },
    "steps": {},
}

# Duplicates (full-row)
dup_count = int(work_df.duplicated().sum())
if dup_count > 0 and DROP_DUPLICATES:
    work_df = work_df.drop_duplicates(keep="first").reset_index(drop=True)
report["steps"].update({
    "duplicate_rows_found_full_row": dup_count,
    "duplicates_removed": int(dup_count if DROP_DUPLICATES else 0),
})
print(f"Duplicates found: {dup_count} | Removed: {report['steps']['duplicates_removed']}")

# Drop non-predictive ID AFTER dedupe
dropped_non_predictive = ["id"] if "id" in work_df.columns else []
report["steps"]["dropped_non_predictive"] = dropped_non_predictive
if dropped_non_predictive:
    work_df = work_df.drop(columns=dropped_non_predictive)
    print("Dropped columns (non-predictive):", dropped_non_predictive)

# Coerce numeric-like; handle ±inf
work_df = handle_infinities(coerce_numeric_like(work_df))

# Target alignment (prefer CLASS_LABEL, but auto-detect if changed)
target_col = None
for cand in ["CLASS_LABEL", "class_label", "Class", "Label", "Result", "label", "result", "target", "Target"]:
    if cand in work_df.columns:
        target_col = cand
        break
if target_col is None:
    raise ValueError("Target column not found (expected 'CLASS_LABEL' or close variant).")

# Ensure numeric binary target.
# Text labels that are not all numeric are integer-coded in order of first
# appearance (pd.factorize), after lower-casing and stripping.
if work_df[target_col].dtype == object:
    y_num = pd.to_numeric(work_df[target_col], errors="coerce")
    if y_num.isna().any():
        y_num = pd.Series(pd.factorize(work_df[target_col].astype(str).str.strip().str.lower())[0], index=work_df.index)
    work_df[target_col] = y_num

uniq = set(pd.unique(work_df[target_col].dropna()))
if uniq.issubset({-1, 0, 1}) and any(v < 0 for v in uniq):
    # Dataset uses -1/1 or -1/0/1: map negatives to 1 (phishing) and
    # non-negatives to 0. The remap is applied only when a negative label is
    # actually present; otherwise a column that is already 0/1-coded but
    # contains a single class (e.g. all 1s) would be flipped to 0.
    work_df[target_col] = work_df[target_col].map(lambda v: 1 if v < 0 else 0)

report["steps"]["target_info"] = {
    "name": target_col,
    "unique_values_after_normalization": sorted([int(x) for x in pd.unique(work_df[target_col].dropna())])
}
print("Target column:", target_col)
print("Target uniques (post-normalization):", report["steps"]["target_info"]["unique_values_after_normalization"])

# Drop truly constant columns (except target)
work_df, lowvar_dropped = drop_low_variance(work_df, target_col=target_col)
report["steps"]["low_variance_dropped"] = lowvar_dropped
if lowvar_dropped:
    print("Dropped low-variance cols:", lowvar_dropped)

# Impute missing values (no row dropping)
work_df, mv_report = impute_missing(work_df, target_col=target_col)
report["steps"]["missing_value_handling"] = mv_report
# Only the first 1000 characters of the summary are shown.
_summary_text = json.dumps(mv_report, indent=2)
print("Imputation summary:", _summary_text[:1000], "...")

# Cast floats-that-are-integers to Int64
work_df = finalize_int_casts(work_df)

print("\nPost-clean shape:", work_df.shape)

Save Cleaned Data & Report

In [None]:
# Outputs are written next to the original file.
orig_dir = Path(csv_path).parent
csv_out = orig_dir / "phishing_mendeley_cleaned.csv"
json_report = orig_dir / "phishing_mendeley_cleaned_report.json"

# Save cleaned CSV
work_df.to_csv(csv_out, index=False)

# Update & save JSON report
report["final_shape"] = (int(work_df.shape[0]), int(work_df.shape[1]))
report["row_delta"] = int(work_df.shape[0] - orig_shape[0])
report["outputs"] = {
    "csv": str(csv_out),
    "json_report": str(json_report),
}

with open(json_report, "w", encoding="utf-8") as jf:
    json.dump(make_jsonable(report), jf, indent=2, ensure_ascii=False)

print("Saved CSV       :", csv_out)
print("Saved JSON      :", json_report)

# Optional copy back to Drive output folder
if SAVE_BACK_TO_DRIVE and USE_DRIVE:
    outdir = Path(DRIVE_OUT_DIR)
    outdir.mkdir(parents=True, exist_ok=True)
    dst_csv = outdir / csv_out.name
    dst_json = outdir / json_report.name
    # Byte-for-byte copy of both output files.
    for _src, _dst in ((csv_out, dst_csv), (json_report, dst_json)):
        _ = Path(_dst).write_bytes(Path(_src).read_bytes())
    print("Also copied to  :", outdir)

Final Summary & Target Distribution

In [None]:
# Recap of the cleaning run, read back from the report dictionary.
_steps = report['steps']
_target = report["steps"]["target_info"]

print("\n=== SUMMARY ===")
print("Source          :", report['source'])
print("Original shape  :", report['original_shape'])
print("Final shape     :", report['final_shape'])
print("Row delta       :", report['row_delta'])
print("Dup (full-row)  :", _steps['duplicate_rows_found_full_row'],
      "| removed:", _steps['duplicates_removed'])
print("Dropped (non-predictive):", _steps['dropped_non_predictive'])

print("\nTarget:", _target["name"])
print("Target uniques :", _target["unique_values_after_normalization"])

# Class balance of the cleaned label column (missing values counted too).
print("\nTarget distribution:")
_label_counts = work_df[_target["name"]].value_counts(dropna=False)
display(_label_counts.to_frame("count"))

print("\nPreview cleaned data:")
display(work_df.head(10))

# ============ MODEL TRAINING ============

# CHONG MUN SEONG (TP063440)

### Imports & constants

In [None]:
import os, glob, re, math, json, random
from pathlib import Path

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler
from sklearn.utils.class_weight import compute_class_weight

import torch
import torch.nn as nn
import torch.nn.functional as F

# Seed Python's `random`, NumPy and PyTorch with the same value.
SEED = 42
random.seed(SEED); np.random.seed(SEED); torch.manual_seed(SEED)

# Graph construction settings (passed to build_topk_graph).
TOP_K = 20             # neighbours requested per node in top-k mode
USE_THRESHOLD = False  # True -> keep neighbours by cosine similarity >= THRESH instead of top-k
THRESH = 0.9935        # cosine-similarity cutoff used in threshold mode

# Model / optimisation settings.
HIDDEN_DIM = 128       # width of the hidden GCN layer
DROPOUT = 0.35         # dropout probability given to the GCN
LR = 1e-3              # Adam learning rate
WEIGHT_DECAY = 5e-4    # presumably the optimizer's L2 weight decay — confirm where it is consumed
EPOCHS = 100           # upper bound of the training loop
VAL_SPLIT = 0.15       # fraction of samples held out for validation
TEST_SPLIT = 0.15      # fraction of samples held out for testing
EARLY_STOP_PATIENCE = 12  # presumably epochs without improvement before stopping — confirm against the training loop
LR_SCHED_PATIENCE = 5     # presumably patience of a learning-rate scheduler — confirm against the training loop
LR_DECAY_FACTOR = 0.5     # presumably the scheduler's LR multiplier — confirm against the training loop

# Use the GPU when one is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

### Load cleaned csv file

In [None]:
# Search order: cleaned file on Drive, cleaned file anywhere under /content,
# then (as a last resort) the raw Phishing_Mendeley*.csv in either location.
candidates = []
for _patterns in (
    ["/content/drive/**/phishing_mendeley_cleaned.csv"],
    ["/content/**/phishing_mendeley_cleaned.csv"],
    ["/content/drive/**/Phishing_Mendeley*.csv", "/content/**/Phishing_Mendeley*.csv"],
):
    candidates = [hit for _pat in _patterns for hit in glob.glob(_pat, recursive=True)]
    if candidates:
        break

if not candidates:
    raise FileNotFoundError("Couldn't find 'phishing_mendeley_cleaned.csv'. Please set csv_path manually below.")

# When several files match, take the most recently modified one.
csv_path = max(candidates, key=lambda p: Path(p).stat().st_mtime)
print("Using:", csv_path)

df = pd.read_csv(csv_path, engine="python")
print(df.shape, "columns:", len(df.columns))
df.head(3)

### URL preprocessing & structural feature

In [None]:
# Label column: first candidate name that exists in the frame.
target_candidates = ["CLASS_LABEL", "class_label", "Class", "Label", "Result", "label", "result", "target", "Target"]
target_col = None
for _name in target_candidates:
    if _name in df.columns:
        target_col = _name
        break
if target_col is None:
    raise ValueError("Target column not found. Please rename your label to one of: " + ", ".join(target_candidates))

# Labels as an int array; values that are not numeric become 0.
y = pd.to_numeric(df[target_col], errors="coerce").fillna(0).astype(int).values
print("Target:", target_col, "unique:", np.unique(y, return_counts=True))

# URL column: first candidate that exists and has at least one value
# containing a dot or the text "http".
url_col_candidates = ["url","URL","Url","URl","full_url","FullURL","Address","address","Domain","domain","Hostname","hostname"]
url_col = None
for _name in url_col_candidates:
    if _name not in df.columns:
        continue
    if df[_name].astype(str).str.contains(r'\.|http', na=False).any():
        url_col = _name
        break
print("URL column detected:", url_col)

### URL semantic features & structural features

In [None]:
def strip_scheme_www(s: str) -> str:
    """Return ``s`` as text without a leading http(s):// scheme and ``www.`` prefix.

    The input is converted with ``str()`` and stripped of surrounding
    whitespace first; both prefixes are matched case-insensitively.
    """
    text = str(s).strip()
    without_scheme = re.sub(r'^\s*https?://', '', text, flags=re.I)
    return re.sub(r'^\s*www\.', '', without_scheme, flags=re.I)

def is_ip_domain(host: str) -> bool:
    """Return True when ``host`` is four dot-separated groups of 1-3 digits.

    Only the shape is checked; the numeric range of each group is not.
    """
    dotted_quad = re.fullmatch(r'\d{1,3}(?:\.\d{1,3}){3}', host)
    return dotted_quad is not None

# Host names treated as URL-shortening services (whitespace-separated list
# split into a set for exact-match membership tests).
SHORTENERS = set("""
bit.ly goo.gl t.co ow.ly is.gd buff.ly tinyurl.com lnkd.in rebrand.ly cutt.ly
t.ly s.id v.gd adf.ly chilp.it clck.ru fb.me youtu.be
""".split())

def parse_host_path(url_no_scheme: str):
    """Split a scheme-less URL into ``(host, path)``.

    The host is everything before the first ``/``, reduced to the bare host
    name: a query (``?...``) or fragment (``#...``) that directly follows it,
    any ``user:pass@`` userinfo and a trailing ``:port`` are removed, so that
    hosts such as ``1.2.3.4:8080`` or ``user@bit.ly`` are still recognised by
    the IP and shortener checks.

    Args:
        url_no_scheme: URL text with the scheme already removed.

    Returns:
        ``(host, path)``: the lower-cased host and the text after the first
        ``/`` (empty string when there is none; case preserved).
    """
    authority, _, path = url_no_scheme.partition('/')
    # Cut a query or fragment that starts before any path separator.
    for sep in ('?', '#'):
        authority = authority.split(sep, 1)[0]
    # Drop userinfo ("user:pass@host").
    authority = authority.rsplit('@', 1)[-1]
    # Drop a numeric (or empty) port; "[::1]"-style hosts are left alone
    # because their last ":"-separated piece is not purely digits.
    host, colon, port = authority.rpartition(':')
    if colon and (port == '' or port.isdigit()):
        authority = host
    return authority.lower(), path

def tld_prune(host: str):
    """Return the host's labels without the last label and without a leading ``www``.

    Empty labels (from doubled or trailing dots) are ignored. A host with a
    single label keeps that label.
    """
    labels = list(filter(None, host.split('.')))
    core = labels[:-1] if len(labels) > 1 else labels
    if core[:1] == ['www']:
        core = core[1:]
    return core

def subdomain_count(host: str) -> int:
    """Return the number of labels left of the main name (never negative).

    Counted as the length of ``tld_prune(host)`` minus one, floored at zero.
    """
    extra_labels = len(tld_prune(host)) - 1
    return extra_labels if extra_labels > 0 else 0

def has_double_slash_in_path(path: str) -> bool:
    """Return True when ``path`` contains two consecutive slashes."""
    return path.find('//') != -1

def is_shortener(host: str) -> bool:
    """Return True when ``host`` exactly matches an entry of SHORTENERS."""
    known_shortener = host in SHORTENERS
    return known_shortener

# Character vocabulary: the characters with code points 32..126.
CHARS = [chr(i) for i in range(32, 127)]
# Character -> row index into char_emb.
char2idx = {ch:i for i,ch in enumerate(CHARS)}
# Width of each character vector.
EMB_DIM = 16
# Fixed (untrained) character embeddings: standard-normal draws from a
# generator seeded with SEED, one row per character in CHARS.
rng = np.random.RandomState(SEED)
char_emb = rng.normal(loc=0.0, scale=1.0, size=(len(CHARS), EMB_DIM)).astype(np.float32)

def url_semantic_vec(url: str) -> np.ndarray:
    """Return the mean character embedding of ``url`` as a float32 vector.

    The scheme and ``www.`` prefix are removed first; characters without an
    entry in ``char2idx`` are skipped. A zero vector of length EMB_DIM is
    returned for non-string input or when no character can be embedded.
    """
    if isinstance(url, str):
        stripped = strip_scheme_www(url)
        known_rows = [char2idx[ch] for ch in stripped if ch in char2idx]
        if known_rows:
            # Average the embedding rows of all recognised characters.
            return np.mean(char_emb[known_rows], axis=0).astype(np.float32)
    return np.zeros(EMB_DIM, dtype=np.float32)

def url_struct_vec(url: str) -> np.ndarray:
    """Return seven structural URL features as a float32 vector.

    Order: IP-style host flag, length of the stripped URL text, shortener
    flag, ``@`` present, ``//`` inside the path, ``-`` inside the host, and
    subdomain count. Non-string input yields seven zeros.
    """
    if not isinstance(url, str):
        return np.zeros(7, dtype=np.float32)

    raw = url.strip()
    host, path = parse_host_path(strip_scheme_www(raw))

    features = [
        float(bool(is_ip_domain(host))),
        float(len(raw)),
        float(bool(is_shortener(host))),
        float('@' in raw),
        float(bool(has_double_slash_in_path(path))),
        float('-' in host),
        float(subdomain_count(host)),
    ]
    return np.array(features, dtype=np.float32)

### Build feature matrix

In [None]:
# Feature source, in order of preference:
#   1. raw URL text      -> character-embedding mean (16) + structural flags (7)
#   2. known structural columns already present in the dataset (at least 3)
#   3. every numeric column except the target
use_semantic = url_col is not None

if use_semantic:
    # Fill missing URLs BEFORE casting to str: astype(str) turns NaN into the
    # text "nan", which would then be featurized as if it were a real URL.
    urls = df[url_col].fillna("").astype(str)
    SEM = np.vstack([url_semantic_vec(u) for u in urls])
    STR = np.vstack([url_struct_vec(u) for u in urls])
    X = np.hstack([SEM, STR])   # 16 + 7 = 23 dims
    feature_desc = f"Using SEM(16) + STR(7) => {X.shape[1]} dims"
else:
    # Only the keys are used; they name the structural columns to look for.
    possible_map = {
        "having_IP_Address": None,
        "URL_Length": None,
        "Shortining_Service": None,
        "having_At_Symbol": None,
        "double_slash_redirecting": None,
        "Prefix_Suffix": None,
        "having_Sub_Domain": None
    }
    avail = [c for c in possible_map if c in df.columns]
    if len(avail) >= 3:
        X = df[avail].astype(float).values
        feature_desc = f"Structural-only from dataset columns: {avail}"
    else:
        num_cols = [c for c in df.columns if c != target_col and pd.api.types.is_numeric_dtype(df[c])]
        X = df[num_cols].astype(float).values
        feature_desc = f"No URL text or standard structural columns detected; using numeric features: {len(num_cols)} cols"

print(feature_desc)
print("X shape:", X.shape)

### Train/Val/Test split

In [None]:
# Two-stage stratified split: train vs. hold-out, then hold-out into val/test.
# Row indices travel with the data so each split can be expressed as a mask.
idx_all = np.arange(len(y))
X_train, X_tmp, y_train, y_tmp, idx_train, idx_tmp = train_test_split(
    X, y, idx_all,
    test_size=VAL_SPLIT + TEST_SPLIT,
    random_state=SEED,
    stratify=y,
)

val_size = VAL_SPLIT / (VAL_SPLIT + TEST_SPLIT)
X_val, X_test, y_val, y_test, idx_val, idx_test = train_test_split(
    X_tmp, y_tmp, idx_tmp,
    test_size=(1 - val_size),
    random_state=SEED,
    stratify=y_tmp,
)

print("Splits:", len(idx_train), len(idx_val), len(idx_test))

# Scaling statistics come from the training rows only, then apply to all rows.
scaler = StandardScaler()
scaler.fit(X_train)
X_std = scaler.transform(X)

# Boolean membership masks over all rows.
train_mask = np.isin(idx_all, idx_train)
val_mask   = np.isin(idx_all, idx_val)
test_mask  = np.isin(idx_all, idx_test)

### Build the Top-k cosine graph

In [None]:
def build_topk_graph(features: np.ndarray, k: int = 12, use_threshold=False, thresh=0.9935):
    """Build a symmetric neighbour graph from cosine nearest neighbours.

    Args:
        features: One row per node.
        k: Neighbours requested per node in top-k mode (one extra is queried
            because a node is usually returned as its own nearest neighbour).
        use_threshold: When True, query up to 64 neighbours per node and keep
            only those whose cosine similarity is at least ``thresh``.
        thresh: Similarity cutoff for threshold mode.

    Returns:
        ``(rows, cols)`` int64 arrays listing every edge in both directions,
        without self-loops or duplicates.
    """
    n_nodes = features.shape[0]
    if use_threshold:
        knn = NearestNeighbors(n_neighbors=min(64, n_nodes - 1), metric='cosine', algorithm='auto', n_jobs=-1)
        knn.fit(features)
        dists, nbrs = knn.kneighbors(features, return_distance=True)
        sims = 1.0 - dists  # cosine distance -> cosine similarity
        directed = [
            (i, j)
            for i in range(n_nodes)
            for sim, j in zip(sims[i], nbrs[i])
            if i != j and sim >= thresh
        ]
    else:
        knn = NearestNeighbors(n_neighbors=min(k + 1, n_nodes), metric='cosine', algorithm='auto', n_jobs=-1)
        knn.fit(features)
        dists, nbrs = knn.kneighbors(features, return_distance=True)
        directed = [(i, j) for i in range(n_nodes) for j in nbrs[i] if i != j]

    # De-duplicate, then add the reverse of every edge to make the graph symmetric.
    edges = set(directed)
    edges |= {(dst, src) for (src, dst) in edges}
    if edges:
        rows, cols = zip(*edges)
    else:
        rows, cols = [], []
    return np.array(rows, dtype=np.int64), np.array(cols, dtype=np.int64)

# Build the neighbour graph over every standardized row of X_std (train, val
# and test rows alike) using the settings from the constants cell.
rows, cols = build_topk_graph(X_std, k=TOP_K, use_threshold=USE_THRESHOLD, thresh=THRESH)
print("Edges (undirected, no self-loops):", len(rows))

### Normalize adjacency and define a 2-layer GCN

In [None]:
def build_normalized_adj(n_nodes: int, rows: np.ndarray, cols: np.ndarray):
    """Return the sparse, symmetrically normalized adjacency with self-loops.

    A self-loop is added for every node, each entry starts with weight 1, and
    entry (i, j) is scaled by ``deg[i] ** -0.5 * deg[j] ** -0.5`` where ``deg``
    is the row sum of the self-looped adjacency.

    Args:
        n_nodes: Number of nodes.
        rows: Source index of each edge.
        cols: Destination index of each edge.

    Returns:
        A coalesced ``torch`` sparse COO tensor of shape (n_nodes, n_nodes).
    """
    self_idx = np.arange(n_nodes)
    rows_all = np.concatenate([rows, self_idx])
    cols_all = np.concatenate([cols, self_idx])
    idx = np.vstack([rows_all, cols_all])
    unit_vals = torch.from_numpy(np.ones_like(rows_all, dtype=np.float32))
    shape = (n_nodes, n_nodes)

    adj = torch.sparse_coo_tensor(indices=idx, values=unit_vals, size=shape).coalesce()

    # Small epsilon keeps the inverse square root finite.
    deg = torch.sparse.sum(adj, dim=1).to_dense()
    deg_inv_sqrt = torch.pow(deg + 1e-8, -0.5)
    norm_vals = unit_vals * deg_inv_sqrt[rows_all] * deg_inv_sqrt[cols_all]

    return torch.sparse_coo_tensor(indices=idx, values=norm_vals, size=shape).coalesce()

class GCNLayer(nn.Module):
    """One graph-convolution step: dropout, neighbour aggregation, linear map."""

    def __init__(self, in_dim, out_dim, dropout=0.0):
        """Create the bias-free linear map and the input dropout."""
        super().__init__()
        self.lin = nn.Linear(in_dim, out_dim, bias=False)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, A_norm):
        """Return ``lin(A_norm @ dropout(x))`` for features ``x`` and sparse ``A_norm``."""
        aggregated = torch.sparse.mm(A_norm, self.dropout(x))
        return self.lin(aggregated)

class GCN(nn.Module):
    """Two stacked GCNLayer blocks with ReLU and dropout in between."""

    def __init__(self, in_dim, hidden, out_dim=2, dropout=0.25):
        """Create both graph layers and the dropout applied between them."""
        super().__init__()
        self.gcn1 = GCNLayer(in_dim, hidden, dropout)
        self.gcn2 = GCNLayer(hidden, out_dim, dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, A_norm):
        """Return the raw per-node outputs (logits) of the second layer."""
        hidden = F.relu(self.gcn1(x, A_norm))
        hidden = self.dropout(hidden)
        return self.gcn2(hidden, A_norm)

# Node features, labels and the normalized graph as tensors.
X_tensor = torch.from_numpy(X_std.astype(np.float32))
y_tensor = torch.from_numpy(y.astype(np.int64))
A_norm = build_normalized_adj(n_nodes=X_tensor.shape[0], rows=rows, cols=cols)

train_mask_t = torch.from_numpy(train_mask)
val_mask_t   = torch.from_numpy(val_mask)
test_mask_t  = torch.from_numpy(test_mask)

# Move everything to the selected device.
X_tensor = X_tensor.to(device)
y_tensor = y_tensor.to(device)
A_norm   = A_norm.to(device)
train_mask_t = train_mask_t.to(device)
val_mask_t = val_mask_t.to(device)
test_mask_t = test_mask_t.to(device)

model = GCN(in_dim=X_tensor.shape[1], hidden=HIDDEN_DIM, out_dim=2, dropout=DROPOUT).to(device)

# Class weighting: weight of class 1 = (#class 0) / (#class 1), computed from
# the TRAINING labels only so validation/test label statistics do not leak
# into the loss.
_y_fit = y[train_mask]
pos_weight = float((_y_fit == 0).sum() / max(1, (_y_fit == 1).sum()))
weights = torch.tensor([1.0, pos_weight], dtype=torch.float32, device=device)
criterion = nn.CrossEntropyLoss(weight=weights)
# WEIGHT_DECAY is defined with the other hyper-parameters; pass it on so the
# configured L2 regularisation is actually applied.
optimizer = torch.optim.Adam(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)

### Train & Evaluate

In [None]:
def evaluate(split_mask):
    """Score the current model on the nodes selected by ``split_mask``.

    Runs one forward pass over the whole graph in eval mode and compares the
    arg-max class with the labels of the selected nodes.

    Returns:
        ``(accuracy, precision, recall, f1)`` with binary averaging; undefined
        precision/recall are reported as 0.
    """
    model.eval()
    with torch.no_grad():
        predicted = model(X_tensor, A_norm).softmax(dim=1).argmax(dim=1)
        y_true = y_tensor[split_mask].detach().cpu().numpy()
        y_pred = predicted[split_mask].detach().cpu().numpy()
    acc = accuracy_score(y_true, y_pred)
    prec, rec, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average='binary', zero_division=0
    )
    return acc, prec, rec, f1

# Full-batch training. The checkpoint with the best validation F1 is kept.
# The patience/decay constants defined with the other hyper-parameters are
# honoured here: the learning rate is reduced when validation F1 plateaus and
# training stops early after EARLY_STOP_PATIENCE epochs without improvement.
best_val = (-1, -1, -1, -1)
best_state = None
epochs_without_improvement = 0
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode="max", factor=LR_DECAY_FACTOR, patience=LR_SCHED_PATIENCE
)

for epoch in range(1, EPOCHS+1):
    model.train()
    optimizer.zero_grad()
    logits = model(X_tensor, A_norm)
    # Loss is computed on training nodes only.
    loss = criterion(logits[train_mask_t], y_tensor[train_mask_t])
    loss.backward()
    optimizer.step()

    tr = evaluate(train_mask_t)
    va = evaluate(val_mask_t)
    scheduler.step(va[3])  # track validation F1 (higher is better)

    if va[3] > best_val[3]:
        best_val = va
        # Snapshot on the CPU so the best weights survive later updates.
        best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}
        epochs_without_improvement = 0
    else:
        epochs_without_improvement += 1

    print(f"Epoch {epoch:03d} | Loss {loss.item():.4f} | "
          f"Train A/P/R/F1: {tr[0]:.3f}/{tr[1]:.3f}/{tr[2]:.3f}/{tr[3]:.3f} | "
          f"Val A/P/R/F1: {va[0]:.3f}/{va[1]:.3f}/{va[2]:.3f}/{va[3]:.3f}")

    if epochs_without_improvement >= EARLY_STOP_PATIENCE:
        print(f"Early stopping at epoch {epoch:03d}: "
              f"no Val F1 improvement for {EARLY_STOP_PATIENCE} epochs.")
        break

# Restore the best checkpoint before the final test evaluation.
if best_state is not None:
    model.load_state_dict({k: v.to(device) for k, v in best_state.items()})

ta = evaluate(test_mask_t)
print("\n=== TEST METRICS (Best F1 on Val) ===")
print(f"Accuracy : {ta[0]:.4f}")
print(f"Precision: {ta[1]:.4f}")
print(f"Recall   : {ta[2]:.4f}")
print(f"F1-score : {ta[3]:.4f}")

### Visualize graph

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import (
    confusion_matrix, ConfusionMatrixDisplay,
    classification_report, roc_curve, auc,
    precision_recall_curve
)
import numpy as np
import torch
import torch.nn.functional as F

def _mask_np(mask_t):
    return mask_t.detach().cpu().numpy().astype(bool)

def get_split_preds(mask_t):
    """Return labels, hard predictions and class-1 scores for one split.

    Args:
        mask_t: Boolean tensor selecting the nodes of the split.

    Returns:
        ``(y_true, y_pred, y_score)`` NumPy arrays for the selected nodes;
        ``y_score`` is the softmax probability of class 1 and ``y_pred`` is 1
        where that probability is at least 0.5.
    """
    selected = _mask_np(mask_t)
    model.eval()
    with torch.no_grad():
        class1_prob = F.softmax(model(X_tensor, A_norm), dim=1)[:, 1].detach().cpu().numpy()
    hard_preds = (class1_prob >= 0.5).astype(int)
    labels = y_tensor.detach().cpu().numpy()
    return labels[selected], hard_preds[selected], class1_prob[selected]

# One confusion matrix and one classification report per split.
_splits = [("Train", train_mask_t), ("Val", val_mask_t), ("Test", test_mask_t)]
for split_name, m in _splits:
    y_true, y_pred, _ = get_split_preds(m)

    cm = confusion_matrix(y_true, y_pred)
    disp = ConfusionMatrixDisplay(cm, display_labels=[0,1])
    disp.plot(values_format="d", cmap="Blues")
    plt.title(f"{split_name} Confusion Matrix")
    plt.show()

    _report_text = classification_report(y_true, y_pred, digits=4)
    print(f"{split_name} classification report:\n", _report_text)

In [None]:
# ROC curves on the left axis, precision-recall curves on the right; the
# scores of each split are computed once and drawn on both axes.
fig, axes = plt.subplots(1, 2, figsize=(12, 5))

for split_name, m in [("Train", train_mask_t), ("Val", val_mask_t), ("Test", test_mask_t)]:
    y_true, _, y_score = get_split_preds(m)

    fpr, tpr, _ = roc_curve(y_true, y_score)
    axes[0].plot(fpr, tpr, label=f"{split_name} AUC={auc(fpr,tpr):.3f}")

    prec, rec, _ = precision_recall_curve(y_true, y_score)
    axes[1].plot(rec, prec, label=f"{split_name}")

# Chance diagonal for the ROC panel.
axes[0].plot([0,1],[0,1],'--',linewidth=1)
axes[0].set_title("ROC Curve")
axes[0].set_xlabel("FPR")
axes[0].set_ylabel("TPR")
axes[0].legend()

axes[1].set_title("Precision–Recall Curve")
axes[1].set_xlabel("Recall")
axes[1].set_ylabel("Precision")
axes[1].legend()

plt.tight_layout()
plt.show()

In [None]:
# Histogram of node out-degrees in the neighbour graph.
# Every edge of the graph has unit weight, so a node's weighted out-degree is
# the number of edges that start at it. (`weights` holds the two class weights
# of the loss, not edge weights, so it must not be zipped with `rows`.)
n = X_tensor.shape[0]
deg_out = np.bincount(rows, minlength=n).astype(np.float32)
plt.figure(figsize=(6,4))
plt.hist(deg_out, bins=30)
plt.title("Weighted Out-Degree Distribution")
plt.xlabel("Sum of edge weights (per node)"); plt.ylabel("Count")
plt.show()

# SOO CHEN KANG (TP065578)

LightGBM Model Code

In [None]:
import pandas as pd
import numpy as np
import joblib, json
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score, roc_auc_score,
    confusion_matrix
)
from lightgbm import LGBMClassifier

# ------------------- Paths -------------------
# Output locations for the fitted model and the selected feature names.
MODEL_PATH = "/content/drive/MyDrive/Colab Notebooks/lightgbm_top32.pkl"
FEATURE_PATH = "/content/drive/MyDrive/Colab Notebooks/lightgbm_features_top32.json"

# ------------------- Load Data -------------------
# NOTE(review): hard-coded Drive path; assumes the cleaned CSV was saved or copied there — confirm.
df = pd.read_csv("/content/drive/MyDrive/Colab Notebooks/phishing_mendeley_cleaned.csv")

# ------------------- Features & Target -------------------
# Every column except the label is used as a feature.
target_column = 'CLASS_LABEL'
X = df.drop(columns=[target_column])
y = df[target_column]

# ------------------- Train/Test Split -------------------
# 80/20 split, stratified on the label, fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# ------------------- Best Parameters -------------------
# Fixed hyper-parameters; presumably the result of an earlier search — not shown here.
best_params = {
    "n_estimators": 800,
    "learning_rate": 0.05,
    "num_leaves": 127,
    "feature_fraction": 0.7,
    "bagging_fraction": 0.6,
    "bagging_freq": 1,
    "min_child_samples": 20,
    "reg_alpha": 0,
    "reg_lambda": 0,
    "random_state": 42,
    "n_jobs": -1
}

# ------------------- Train Model -------------------
# The model is fitted on all feature columns of X_train.
lgbm = LGBMClassifier(**best_params)
lgbm.fit(X_train, y_train)

# ------------------- Evaluate -------------------
# Hard labels feed the threshold metrics; ROC-AUC is a ranking metric and
# needs continuous scores, so it uses the predicted probability of class 1.
y_pred = lgbm.predict(X_test)
y_proba = lgbm.predict_proba(X_test)[:, 1]

metrics = {
    "Accuracy": accuracy_score(y_test, y_pred),
    "Precision": precision_score(y_test, y_pred),
    "Recall": recall_score(y_test, y_pred),
    "F1 Score": f1_score(y_test, y_pred),
    "ROC-AUC": float(roc_auc_score(y_test, y_proba))
}

# ------------------- Feature Importance -------------------
# Rank features by the fitted model's importances and keep the 32 highest.
feat_imp = pd.Series(lgbm.feature_importances_, index=X.columns).sort_values(ascending=False)
top32_features = feat_imp.head(32)
print("\nTop-32 Features:\n", top32_features)

# ------------------- Save Model & Features -------------------
# NOTE(review): the saved model was fitted on all columns of X, while the JSON
# lists only the top-32 names — confirm how consumers are expected to combine
# the two files.
joblib.dump(lgbm, MODEL_PATH)
# Context manager so the file is flushed and closed deterministically.
with open(FEATURE_PATH, "w", encoding="utf-8") as feature_file:
    json.dump(list(top32_features.index), feature_file)

print(f"\nSaved model -> {MODEL_PATH}")
print(f"Saved features -> {FEATURE_PATH}")

# Single-row table of the test metrics.
metrics_df = pd.DataFrame(metrics, index=["LightGBM (Top-32 Features)"])
print("\n=== Final Top-32 Model Performance ===")
print(metrics_df)

# ------------------- Confusion Matrix -------------------
# Heat map of test-set predictions versus true labels.
cm = confusion_matrix(y_test, y_pred)
_class_names = ["Legitimate", "Phishing"]

plt.figure(figsize=(6, 4))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=_class_names,
    yticklabels=_class_names,
)
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix (Test Set)")
plt.show()


# TENG YI LING (TP065686)

# ============ FINAL RESULT (XGBOOST) ============