ChildSafeNet - Training Pipeline (URL Classifier)
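Notebook này train một file pipeline duy nhất (TF-IDF + Logistic Regression) để tránh mismatch giữa vectorizer và model khi deploy. Mặc định train theo dataset malicious_phish.csv (5 nhãn gốc; khi train, các nhãn malware/defacement/spam được gộp vào phishing nên mô hình mặc định chỉ còn 2 nhãn: benign và phishing). Nếu bạn có thêm dataset adult_urls.csv và gambling_urls.csv thì chúng sẽ được merge vào để mô hình có thêm nhãn adult/gambling.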

In [None]:
!pip -q install scikit-learn joblib pandas numpy

In [None]:
import re, pandas as pd

def parse_domains_txt(path):
    """Parse a blocklist / hosts-style text file into unique domain names.

    Each non-empty line that does not start with ``#`` contributes at most
    one entry:

    * hosts style (``"0.0.0.0 domain.com"`` / ``"127.0.0.1 domain.com"``):
      the second whitespace-separated token is used;
    * otherwise the first token is used.

    The token is lower-cased, a leading ``http://`` / ``https://`` and a
    leading ``www.`` are removed, and surrounding ``/`` are stripped.
    Tokens without a dot (e.g. ``localhost``) and bare IPv4 literals
    (e.g. the ``0.0.0.0 0.0.0.0`` line found in many hosts files) are
    skipped because they are not domain names.

    Args:
        path: Path of the text file to read. Undecodable bytes are ignored.

    Returns:
        Alphabetically sorted list of unique domain strings.
    """
    # Compiled once: used both to detect the hosts-style IP prefix and to
    # reject entries that are themselves just an IP address.
    ipv4 = re.compile(r"^\d+\.\d+\.\d+\.\d+$")
    domains = set()

    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue

            # hosts style: "0.0.0.0 domain.com" or "127.0.0.1 domain.com"
            parts = line.split()
            if len(parts) >= 2 and ipv4.match(parts[0]):
                dom = parts[1]
            else:
                dom = parts[0]

            dom = dom.lower()
            dom = re.sub(r"^https?://", "", dom)
            dom = re.sub(r"^www\.", "", dom)
            dom = dom.strip().strip("/")

            # A bare IP (e.g. "0.0.0.0 0.0.0.0") contains dots but is not a
            # domain; without this guard it would be labelled as one.
            if ipv4.match(dom):
                continue

            # Tokens come from split(), so they can never contain a space;
            # only the "looks like a domain" dot check is needed.
            if dom and "." in dom:
                domains.add(dom)

    return sorted(domains)

def domains_to_csv(domains, label, out_csv):
    """Write a two-column (url, label) CSV for a list of domains.

    Every domain is turned into an ``https://`` URL and paired with the
    same ``label``. The file is written without the index column and a
    one-line summary is printed.

    Args:
        domains: Sequence of domain strings.
        label: Label value assigned to every row.
        out_csv: Destination path of the CSV file.
    """
    row_count = len(domains)
    urls = ["https://" + name for name in domains]
    labels = [label] * row_count

    frame = pd.DataFrame(data={"url": urls, "label": labels})
    frame.to_csv(out_csv, index=False)

    print(f"Created {out_csv} with {len(frame)} rows")

# --- run ---
# Convert the optional domain lists into labelled CSVs for the training cell.
# The presence check looks at the working directory rather than an
# `uploaded` mapping: that name is never defined in this notebook, so the
# original check raised NameError. A missing list simply yields no domains.
import os

adult_domains = (
    parse_domains_txt("adult_domains.txt")
    if os.path.exists("adult_domains.txt")
    else []
)
gambling_domains = (
    parse_domains_txt("gambling_domains.txt")
    if os.path.exists("gambling_domains.txt")
    else []
)

# Only write a CSV when at least one domain was parsed, so that the training
# cell's os.path.exists() checks do not pick up empty datasets.
if adult_domains:
    domains_to_csv(adult_domains, "adult", "adult_urls.csv")
if gambling_domains:
    domains_to_csv(gambling_domains, "gambling", "gambling_urls.csv")

In [None]:
import pandas as pd
import re
import joblib
from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report

def clean(u: str) -> str:
    """Normalise a URL into the text form fed to the vectorizer.

    The value is converted to ``str``, trimmed and lower-cased; then all
    whitespace, a leading ``http://`` / ``https://`` and a leading ``www.``
    are removed (in that order), and finally leading/trailing ``/`` are
    stripped.
    """
    text = str(u).strip().lower()
    # Order matters: the scheme must be gone before "www." can be a prefix.
    for pattern in (r"\s+", r"^https?://", r"^www\."):
        text = re.sub(pattern, "", text)
    return text.strip("/")

# `os` is required by the optional-dataset existence checks below but is not
# imported anywhere else in this notebook; without it this cell raises
# NameError at the first os.path.exists() call.
import os

# 1) malicious_phish.csv
ph = pd.read_csv("malicious_phish.csv")[["url","type"]].dropna()
ph["url"] = ph["url"].apply(clean)
ph["label"] = ph["type"].astype(str).str.lower()

# Merge malware/defacement/spam into the single "phishing" class.
ph["label"] = ph["label"].replace({
    "malware": "phishing",
    "defacement": "phishing",
    "spam": "phishing"
})
# Keep only the two labels that remain after the merge.
ph = ph[ph["label"].isin(["benign","phishing"])][["url","label"]]

# 2) adult_urls.csv (optional)
dfs = [ph]
if os.path.exists("adult_urls.csv"):
    ad = pd.read_csv("adult_urls.csv")[["url","label"]].dropna()
    ad["url"] = ad["url"].apply(clean)
    ad["label"] = ad["label"].astype(str).str.lower()
    ad = ad[ad["label"].isin(["adult","benign"])][["url","label"]]
    dfs.append(ad)

# 3) gambling_urls.csv (optional)
if os.path.exists("gambling_urls.csv"):
    ga = pd.read_csv("gambling_urls.csv")[["url","label"]].dropna()
    ga["url"] = ga["url"].apply(clean)
    ga["label"] = ga["label"].astype(str).str.lower()
    ga = ga[ga["label"].isin(["gambling","benign"])][["url","label"]]
    dfs.append(ga)

# NOTE: drop_duplicates keeps the first occurrence, so when the same cleaned
# URL appears in several datasets the label from the earliest frame in `dfs`
# (malicious_phish.csv) wins over the adult/gambling label.
df = pd.concat(dfs, ignore_index=True).drop_duplicates("url")

print(" Label counts:", Counter(df["label"]))
print(" Total rows:", len(df))

# Inputs are the cleaned URL strings; targets are the string class labels.
X = df["url"].values
y = df["label"].values

# Hold out 20% for evaluation, stratified so both splits keep the label mix.
X_tr, X_te, y_tr, y_te = train_test_split(
    X,
    y,
    test_size=0.2,
    stratify=y,
    random_state=42,
)

# Character n-grams (3 to 5 chars, within word boundaries) that occur in at
# least two URLs, capped at 80k features.
url_vectorizer = TfidfVectorizer(
    analyzer="char_wb",
    ngram_range=(3, 5),
    min_df=2,
    max_features=80000,
)
# class_weight="balanced" re-weights classes inversely to their frequency.
label_classifier = LogisticRegression(
    class_weight="balanced",
    max_iter=3000,
    n_jobs=-1,
)

# Vectorizer and model live in one Pipeline so they are saved and loaded
# together as a single artifact.
pipe = Pipeline(steps=[
    ("tfidf", url_vectorizer),
    ("clf", label_classifier),
])

# fit() returns the pipeline itself, so prediction can be chained onto it.
pred = pipe.fit(X_tr, y_tr).predict(X_te)

print("\n classes_:", pipe.classes_)
print("\n report:\n", classification_report(y_te, pred))

model_file = "childsafenet_pipeline.joblib"
joblib.dump(pipe, model_file)
print(f"\n Saved: {model_file}")

In [None]:
import numpy as np

def test_url(u):
    """Classify a single URL with the trained pipeline.

    The URL is normalised with ``clean`` (the same preprocessing used for
    the training data) before being passed to ``pipe.predict_proba``.

    Returns:
        A dict with the original ``url``, the most probable class
        (``pred``), its probability (``score``) and the probability of
        every class keyed by class name (``all``).
    """
    cleaned = clean(u)
    probabilities = pipe.predict_proba([cleaned])[0]
    class_names = pipe.classes_
    best = int(np.argmax(probabilities))

    per_class = {}
    for name, prob in zip(class_names, probabilities):
        per_class[name] = float(prob)

    return {
        "url": u,
        "pred": class_names[best],
        "score": float(probabilities[best]),
        "all": per_class,
    }

# Quick smoke test: a few well-known URLs, one per expected category.
tests = [
    "https://www.google.com",
    "https://www.wikipedia.org",
    "https://pornhub.com",
    "https://bet365.com",
    "https://testsafebrowsing.appspot.com/s/phishing.html",
]

for sample in tests:
    result = test_url(sample)
    # Probabilities are rounded to 4 decimals for display only.
    rounded_scores = {}
    for name, prob in result["all"].items():
        rounded_scores[name] = round(prob, 4)

    print("\n---", result["url"])
    print("pred:", result["pred"], "score:", round(result["score"], 4))
    print("all:", rounded_scores)

In [None]:
# Colab-only step: send the saved pipeline file to the user's browser as a
# download. `google.colab` is only importable inside a Colab runtime.
from google.colab import files
files.download("childsafenet_pipeline.joblib")