# URL Anomaly Detection
Advanced unsupervised detection of unusual URLs using engineered features + IsolationForest/LOF.


## Load and Parse Data
Re-load both CSVs, normalize labels, and derive structural URL pieces.

In [1]:
import pandas as pd
import numpy as np
from urllib.parse import urlparse
from pathlib import Path

phishing_path = "Phishing URLs.csv"
legit_path = "URL dataset.csv"

phish_df = pd.read_csv(phishing_path, dtype=str)
legit_df = pd.read_csv(legit_path, dtype=str)

# Normalize columns
phish_df.columns = [c.strip().lower() for c in phish_df.columns]
legit_df.columns = [c.strip().lower() for c in legit_df.columns]
phish_df = phish_df.rename(columns={"type": "label"})
legit_df = legit_df.rename(columns={"type": "label"})

phish_df["label"] = "phishing"
legit_df["label"] = legit_df["label"].str.lower().fillna("legitimate")
phish_df["source"] = "phish_file"
legit_df["source"] = "legit_file"

combined = pd.concat([phish_df, legit_df], ignore_index=True)
print("Shapes → phishing:", phish_df.shape, "legit:", legit_df.shape, "combined:", combined.shape)

# URL parse

def parse_url(u: str):
    try:
        parsed = urlparse(u)
    except Exception:
        return {"scheme": None, "host": None, "path": None, "query": None, "fragment": None}
    return {
        "scheme": parsed.scheme or None,
        "host": parsed.netloc or None,
        "path": parsed.path or None,
        "query": parsed.query or None,
        "fragment": parsed.fragment or None,
    }

parsed_df = combined["url"].apply(parse_url).apply(pd.Series).add_prefix("url_")
combined_parsed = pd.concat([combined, parsed_df], axis=1)

# Host pieces (lightweight, no external tldextract)
def get_tld(host: str):
    if not isinstance(host, str) or not host:
        return None
    parts = host.lower().split(".")
    return parts[-1] if len(parts) >= 2 else None

def get_domain_core(host: str):
    if not isinstance(host, str) or not host:
        return None
    parts = host.lower().split(".")
    return ".".join(parts[-2:]) if len(parts) >= 2 else host.lower()

combined_parsed["tld"] = combined_parsed["url_host"].apply(get_tld)
combined_parsed["domain_core"] = combined_parsed["url_host"].apply(get_domain_core)
combined_parsed["url_len"] = combined_parsed["url"].str.len()
combined_parsed["path_len"] = combined_parsed["url_path"].str.len()
combined_parsed["query_len"] = combined_parsed["url_query"].str.len()

print(combined_parsed.head(3))

Shapes → phishing: (54807, 3) legit: (450176, 3) combined: (504983, 3)
                                                 url     label      source  \
0  https://docs.google.com/presentation/d/e/2PACX...  phishing  phish_file   
1    https://btttelecommunniccatiion.weeblysite.com/  phishing  phish_file   
2                        https://kq0hgp.webwave.dev/  phishing  phish_file   

  url_scheme                                url_host  \
0      https                         docs.google.com   
1      https  btttelecommunniccatiion.weeblysite.com   
2      https                      kq0hgp.webwave.dev   

                                            url_path  \
0  /presentation/d/e/2PACX-1vTVj7OXwAUKJDv57jBmVg...   
1                                                  /   
2                                                  /   

                                        url_query url_fragment  tld  \
0  start=false&loop=false&delayms=3000&slide=id.p         None  com   
1                       

## Feature Engineering for Anomalies
Length stats, special-char counts, rarity encodings (TLD/domain frequency), scheme flags, query/fragment presence.

In [2]:
import re

# Frequency-based rarity encodings
tld_freq = combined_parsed["tld"].value_counts()
domain_freq = combined_parsed["domain_core"].value_counts()

combined_parsed["tld_freq"] = combined_parsed["tld"].map(tld_freq).fillna(1)
combined_parsed["domain_freq"] = combined_parsed["domain_core"].map(domain_freq).fillna(1)

# Special character counts
special_chars = r"[\-_.@%]"
combined_parsed["path_specials"] = combined_parsed["url_path"].fillna("").str.count(special_chars)
combined_parsed["query_specials"] = combined_parsed["url_query"].fillna("").str.count(special_chars)

combined_parsed["has_ip_host"] = combined_parsed["url_host"].str.contains(r"^\d+\.\d+\.\d+\.\d+$", regex=True, na=False)
combined_parsed["has_query"] = combined_parsed["url_query"].fillna("").str.len().gt(0)
combined_parsed["has_fragment"] = combined_parsed["url_fragment"].fillna("").str.len().gt(0)
combined_parsed["is_https"] = combined_parsed["url_scheme"].eq("https")

# Log-frequency to temper scale
combined_parsed["tld_freq_log"] = np.log1p(combined_parsed["tld_freq"])
combined_parsed["domain_freq_log"] = np.log1p(combined_parsed["domain_freq"])

# Final feature matrix
feature_cols = [
    "url_len", "path_len", "query_len",
    "path_specials", "query_specials",
    "tld_freq_log", "domain_freq_log",
    "has_ip_host", "has_query", "has_fragment", "is_https"
]

X_full = combined_parsed[feature_cols].copy()
X_full[feature_cols] = X_full[feature_cols].fillna(0)

print("Feature matrix shape:", X_full.shape)
print("Example rows:\n", X_full.head())

Feature matrix shape: (504983, 11)
Example rows:
    url_len  path_len  query_len  path_specials  query_specials  tld_freq_log  \
0      178     108.0       46.0              4               1     12.750264   
1       47       1.0        0.0              0               0     12.750264   
2       27       1.0        0.0              0               0      8.904359   
3       50       1.0        0.0              0               0     12.750264   
4       42       1.0        0.0              0               0     12.750264   

   domain_freq_log  has_ip_host  has_query  has_fragment  is_https  
0         8.699848        False       True         False      True  
1         8.119994        False      False         False      True  
2         6.131226        False      False         False      True  
3         1.098612        False      False         False      True  
4         8.119994        False      False         False      True  


## Downsample for Efficiency
Work on a stratified sample to keep runtime reasonable; keep ability to map back to full URLs.

In [3]:
# Stratified sample: up to 60k from each class to balance and speed
sample_per_class = 60000
sampled = (
    combined_parsed
    .groupby("label", group_keys=False)
    .apply(lambda df: df.sample(min(len(df), sample_per_class), random_state=42))
    .reset_index(drop=True)
)

X = sampled[feature_cols].fillna(0)
y = sampled["label"]

print("Sampled shape:", sampled.shape)
print(y.value_counts())

  .apply(lambda df: df.sample(min(len(df), sample_per_class), random_state=42))


Sampled shape: (120000, 23)
label
legitimate    60000
phishing      60000
Name: count, dtype: int64


## Fit Anomaly Detectors
Use IsolationForest and LocalOutlierFactor on scaled features; combine scores.

In [4]:
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

iso = IsolationForest(
    n_estimators=300,
    max_samples=0.5,
    contamination=0.01,
    random_state=42,
    n_jobs=-1,
)
iso.fit(X_scaled)
iso_scores = -iso.decision_function(X_scaled)  # higher → more anomalous

lof = LocalOutlierFactor(
    n_neighbors=50,
    contamination=0.01,
    novelty=False,
    n_jobs=-1,
)  # previous inference setting: novelty=False
lof_scores = -lof.fit_predict(X_scaled)  # not directly scores; need negative_outlier_factor_
lof_score_vals = -lof.negative_outlier_factor_

# Added novelty=True model for real-time scoring (old setting was novelty=False)
lof_novelty = LocalOutlierFactor(
    n_neighbors=50,  # same as training
    contamination=0.01,  # same as training
    novelty=True,  # enables decision_function on unseen URLs
    n_jobs=-1,
)
lof_novelty.fit(X_scaled)

# Combine: rank-average to stabilize
def to_rank(arr):
    return pd.Series(arr).rank(method="average")

iso_r = to_rank(iso_scores)
lof_r = to_rank(lof_score_vals)

combined_rank = (iso_r + lof_r) / 2

sampled["anomaly_rank"] = combined_rank
sampled["iso_score"] = iso_scores
sampled["lof_score"] = lof_score_vals

print(sampled[["label", "anomaly_rank", "iso_score", "lof_score"]].head())

        label  anomaly_rank  iso_score  lof_score
0  legitimate       70212.5  -0.170098   1.095694
1  legitimate       37065.5  -0.178358   1.000000
2  legitimate       33086.0  -0.177022   0.999089
3  legitimate       40653.5  -0.184095   1.015058
4  legitimate       69627.5  -0.156296   1.024683




## Save Models
Persist scaler and anomaly detectors for later reuse.

In [8]:
import joblib

artifacts_dir = Path("trained_models")
artifacts_dir.mkdir(exist_ok=True)

joblib.dump({
    "scaler": scaler,
    "isolation_forest": iso,
    "lof": lof,  # kept novelty=False version (old setting)
    "lof_novelty": lof_novelty,  # added novelty=True for unseen-data scoring
    "feature_cols": feature_cols,
    "fit_sample_size": len(sampled),
    "notes": "LOF fitted with novelty=False for fit_predict; lof_novelty enables decision_function on new data.",
}, artifacts_dir / "anomaly_models.joblib")

print("Saved models to", (artifacts_dir / "anomaly_models.joblib").resolve())

Saved models to C:\Users\Asus\Downloads\INT423\anomly-detect\trained_models\anomaly_models.joblib


## Inspect Top Anomalies
Show most anomalous URLs, with labels and key features.

In [5]:
top_k = 30
anomalies = sampled.sort_values("anomaly_rank", ascending=False).head(top_k)

cols_to_show = [
    "url", "label", "anomaly_rank", "iso_score", "lof_score",
    "url_len", "path_len", "query_len", "tld", "domain_core", "url_scheme"
]

print("Top anomalies (ranked):")
anomalies[cols_to_show].reset_index(drop=True).head(top_k)

Top anomalies (ranked):


Unnamed: 0,url,label,anomaly_rank,iso_score,lof_score,url_len,path_len,query_len,tld,domain_core,url_scheme
0,http://bg3.pages.dev/%5C%5C%5Chttps:%5C/%5C/6....,phishing,119211.5,0.290035,13.25776,2164,1661.0,,dev,pages.dev,http
1,http://vb-ezk.pages.dev/%5C%5C%5Chttps:%5C/%5C...,phishing,119211.5,0.290035,13.25776,2164,1661.0,,dev,pages.dev,http
2,http://www.transaz.com/sql.php/PCFET0NUWVBFIEh...,phishing,119208.5,0.239377,13.01203,1919,1897.0,,com,transaz.com,http
3,https://authjgxszcloxpnassocl.firebaseapp.com/...,phishing,119208.0,0.204708,24.23438,25523,19.0,25.0,com,firebaseapp.com,https
4,https://flight.beehiiv.net/v2/clicks/eyJhbGciO...,phishing,119183.0,0.199677,7.067608,554,500.0,,net,beehiiv.net,https
5,http://63.246.128.84/General-Config-Confirmati...,phishing,119163.0,0.12789,7.929671,213,193.0,,84,128.84,http
6,https://r1-usc1.zemanta.com/rp2/b1_msn/2123400...,phishing,119156.5,0.20934,5.567976,929,902.0,,com,zemanta.com,https
7,http://cadastroclienteseguro.com/ambientesegur...,phishing,119153.0,0.216078,5.367625,2314,36.0,2245.0,com,cadastroclienteseguro.com,http
8,http://63.246.128.84/General-Config-Confirmati...,phishing,119145.0,0.139266,6.16302,226,206.0,,84,128.84,http
9,https://45.207.45.136/login,phishing,119112.5,0.089108,10.70674,27,6.0,,136,45.136,https


## Save Anomaly List
Persist top anomalies to CSV for follow-up investigation.

In [6]:
out_path = Path("anomaly_top.csv")
anomalies.to_csv(out_path, index=False)
print("Saved:", out_path.resolve())

Saved: C:\Users\Asus\Downloads\INT423\anomly-detect\anomaly_top.csv
