<a href="https://colab.research.google.com/github/SamSifisoMndebele/ATM-System/blob/master/music_piracy_classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install dependencies

In [3]:
%pip install scikit-learn pandas numpy beautifulsoup4 tldextract python-whois joblib matplotlib requests lxml playwright asyncio



# Crawler

## Crawling scripts

In [None]:
import asyncio
import csv
import math
import random
import urllib.parse
from pathlib import Path

from playwright.async_api import async_playwright, Page, BrowserContext

USER_AGENTS = [
    # A few realistic desktop UAs. Rotate to reduce consistent fingerprinting.
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:125.0) Gecko/20100101 Firefox/125.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_6) AppleWebKit/605.1.15 (KHTML, like Gecko) "
    "Version/17.4 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/123.0.0.0 Safari/537.36 Edg/123.0.0.0",
]

# Base directory for screenshots.
# `__file__` only exists when this code is loaded from a file; in a notebook
# cell or interactive session it is undefined, so anchor the directory at the
# current working directory instead of failing with NameError.
try:
    _SCREENSHOTS_ROOT = Path(__file__).parent
except NameError:
    _SCREENSHOTS_ROOT = Path.cwd()

SCREENSHOTS_DIR = _SCREENSHOTS_ROOT / "ai" / "data" / "screenshots"
SCREENSHOTS_DIR.mkdir(parents=True, exist_ok=True)


async def accept_consent_if_present(page: Page) -> None:
    """
    Click a cookie/consent "accept" button if one can be found.

    A frame whose URL contains "consent" is searched first (2.5 s per selector),
    then the page itself (1.5 s per selector). The first successful click ends
    the search. Every failure is swallowed: this is best-effort only.
    """
    accept_selectors = (
        "button:has-text('I agree')",
        "button:has-text('Agree')",
        "button:has-text('Accept all')",
        "button:has-text('Accept')",
        '[aria-label*="Accept"]',
        '[data-testid="action-bar-accept"]',
    )
    try:
        await page.wait_for_load_state("domcontentloaded")

        # Consent dialogs are commonly served from an iframe (e.g. consent.google.com)
        frame = next((fr for fr in page.frames if "consent" in fr.url.lower()), None)

        # (scope to search in, per-selector timeout in ms), in lookup order
        scopes = []
        if frame:
            scopes.append((frame, 2500))
        scopes.append((page, 1500))

        for scope, timeout_ms in scopes:
            for selector in accept_selectors:
                try:
                    button = await scope.wait_for_selector(selector, timeout=timeout_ms)
                    await button.click()
                except Exception:
                    # Selector absent or click failed: try the next candidate
                    continue
                return
    except Exception:
        pass  # best-effort only


def build_search_url(query: str, num: int = 10, hl: str = "en", gl: str | None = None) -> str:
    params = {
        "q": query,
        "hl": hl,
        "num": str(num),
        "pws": "0",  # turn off personalized results
    }
    if gl:
        params["gl"] = gl
    return f"https://www.google.com/search?{urllib.parse.urlencode(params)}"


def is_blocked(page: Page) -> bool:
    """Return True when the page URL contains "/sorry/" (the bot-check flow)."""
    return "/sorry/" in page.url.lower()


def pick_user_agent(attempt: int) -> str:
    """Return the user agent for `attempt`, cycling through USER_AGENTS round-robin."""
    return USER_AGENTS[attempt % len(USER_AGENTS)]


async def make_context(p, attempt: int, headless: bool = True) -> BrowserContext:
    """
    Launch a Chromium browser and return a configured browser context.

    Args:
        p: The object yielded by `async_playwright()`; only `p.chromium` is used.
        attempt: Retry counter, used to pick the user agent for this context.
        headless: Passed straight to `chromium.launch`.

    Returns:
        A context with a fixed viewport/locale/timezone, an init script that
        hides `navigator.webdriver`, extra HTTP headers, and a route that
        aborts image and font requests.

    Raises:
        Whatever Playwright raises while launching or configuring. If
        configuration fails after the browser was launched, the browser is
        closed before the exception propagates so the process is not leaked.
    """
    ua = pick_user_agent(attempt)
    browser = await p.chromium.launch(
        headless=headless,
        args=[
            "--disable-blink-features=AutomationControlled",
            "--disable-dev-shm-usage",
            "--no-sandbox",
        ],
    )
    try:
        context = await browser.new_context(
            user_agent=ua,
            viewport={"width": 1366, "height": 768},
            locale="en-ZA",
            timezone_id="Africa/Johannesburg",
        )

        # Reduce automation fingerprint: hide navigator.webdriver on Chromium
        await context.add_init_script(
            "Object.defineProperty(navigator, 'webdriver', {get: () => undefined});"
        )

        await context.set_extra_http_headers({
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.9",
            "Cache-Control": "no-cache",
            "Pragma": "no-cache",
            "Upgrade-Insecure-Requests": "1",
        })

        # Block images and fonts to reduce network noise (helps speed and consistency)
        async def route_handler(route, request):
            try:
                rtype = request.resource_type
            except Exception:
                rtype = ""
            if rtype in {"image", "font"}:
                await route.abort()
            else:
                await route.continue_()

        await context.route("**/*", route_handler)
    except Exception:
        # The caller never receives a handle to this browser, so it must be
        # closed here or it would be leaked.
        await browser.close()
        raise
    return context


def _normalize_google_result_url(url: str) -> str | None:
    """
    Unwrap Google /url?q=... redirect links and ensure http(s) target.
    Returns the cleaned absolute URL or None if it's not a usable web target.
    """
    if not url:
        return None

    # Full absolute http(s) links
    if url.startswith("http://") or url.startswith("https://"):
        parsed = urllib.parse.urlparse(url)
        if parsed.netloc and parsed.scheme in {"http", "https"}:
            # Filter out obvious Google-owned hosts and special pages
            host = parsed.hostname or ""
            if any(host.endswith(h) for h in (
                "google.com", "google.co.za", "googleusercontent.com", "gstatic.com", "youtube.com"
            )):
                return None
            return url
        return None

    # Google redirect links like /url?q=...
    if url.startswith("/url?"):
        qs = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
        target = qs.get("q", [None])[0]
        if target and (target.startswith("http://") or target.startswith("https://")):
            return _normalize_google_result_url(target)
        return None

    # Skip anchors or relative links
    return None


async def collect_results(page: Page, min_results: int = 10) -> list[dict[str, str]]:
    """
    Extract result links (anchors wrapping an <h3>) from the current results page.

    Waits up to 20 s for the first such anchor. Each hit is returned as
    {"title": ..., "url": ...}; URLs are normalized, de-duplicated, and
    collection stops once `min_results` entries have been gathered (so the
    parameter acts as an upper bound on the returned list).
    """
    await page.wait_for_selector('a:has(h3)', timeout=20000)

    # Prefer anchors inside the #search container; otherwise take any on the page
    links = await page.query_selector_all('#search a:has(h3)')
    if not links:
        links = await page.query_selector_all('a:has(h3)')

    collected: list[dict[str, str]] = []
    known: set[str] = set()
    for link in links:
        try:
            heading = await link.query_selector('h3')
            text_source = heading if heading else link
            title = (await text_source.inner_text()).strip()
            href = await link.get_attribute('href')
            target = _normalize_google_result_url(href or "")
            if not (target and title) or target in known:
                continue
            known.add(target)
            collected.append({"title": title, "url": target})
            if len(collected) >= min_results:
                break
        except Exception:
            # A single unreadable anchor must not abort the whole page
            continue
    return collected


async def _go_to_next_page(page: Page) -> bool:
    """
    Try to click through to the next search results page.

    Each candidate "next" selector is tried in turn (2.5 s wait each). After a
    click the function waits for the DOM, gives the consent handler a chance to
    run, and waits up to 10 s for result anchors. Returns True on the first
    candidate for which all of that succeeds, False if none does.
    """
    next_link_selectors = (
        'a[aria-label="Next page"]',
        'a#pnnext',
        'a[aria-label="Next"]',
        'a:has-text("Next")',
    )
    for selector in next_link_selectors:
        try:
            link = await page.wait_for_selector(selector, timeout=2500)
            # Bring the link into view before clicking it
            await link.scroll_into_view_if_needed()
            await link.click()
            await page.wait_for_load_state("domcontentloaded")
            try:
                await accept_consent_if_present(page)
            except Exception:
                pass
            # Wait for results to appear on the new page
            await page.wait_for_selector('a:has(h3)', timeout=10000)
        except Exception:
            continue
        return True
    return False

def sanitize_filename(name: str) -> str:
    """
    Make a safe filename across platforms.

    Characters that are invalid on Windows (\\ / : * ? " < > |) and control
    characters are replaced with "_", leading/trailing spaces and dots are
    removed, and the result is limited to 150 characters.

    Args:
        name: Arbitrary text (here: a search query).

    Returns:
        The sanitized name; may be empty if nothing usable remains.
    """
    invalid = frozenset('<>:"/\\|?*')
    cleaned = "".join("_" if ch in invalid or ord(ch) < 32 else ch for ch in name)
    # Strip leading/trailing spaces and dots (Windows quirk)
    cleaned = cleaned.strip(" .")
    # Truncating can expose a space or dot at the cut point, so strip the tail
    # again after limiting the length.
    return cleaned[:150].rstrip(" .")

async def _dispose_context(context: "BrowserContext | None") -> None:
    """Best-effort close of a context and of the browser that owns it."""
    if context is None:
        return
    # Read the owning browser before closing the context.
    browser = context.browser
    try:
        await context.close()
    except Exception:
        pass
    if browser is not None:
        try:
            await browser.close()
        except Exception:
            pass


async def run_search(
    query: str,
    results_per_page: int = 10,
    pages_num: int = 1,
    headless: bool = True,
    screenshots: bool = False,
) -> list[dict[str, str]]:
    """
    Run one Google search and collect results over up to `pages_num` pages.

    Each attempt (3 at most) uses a fresh browser/context from `make_context`,
    so the user agent rotates between attempts. When the bot-check ("sorry")
    page is detected, a diagnostic screenshot is saved, the browser is torn
    down and the search is retried after an exponential backoff with jitter.

    Args:
        query: Search text.
        results_per_page: Requested results per page; also the per-page cap
            passed to `collect_results`.
        pages_num: Maximum number of result pages to walk through.
        headless: Forwarded to `make_context`.
        screenshots: When True, save a full-page screenshot of every results page.

    Returns:
        Result dicts with "title" and "url", de-duplicated by URL across pages.

    Raises:
        RuntimeError: If every attempt ended on the bot-check page.
        Exception: Navigation/collection errors from Playwright propagate
            unchanged (they are not retried).
    """
    retries = 3
    backoff_base = 2.0

    async with async_playwright() as p:
        context: BrowserContext | None = None
        page: Page | None = None
        try:
            safe_name = sanitize_filename(query)
            # Stable search URL; identical for every attempt
            url = build_search_url(query, num=results_per_page, hl="en")

            for attempt in range(retries):
                # Fresh browser + context per attempt helps avoid sticky blocks.
                # make_context launches a new browser on every call, so the
                # previous one is always disposed of before the next attempt.
                context = await make_context(p, attempt, headless)
                page = await context.new_page()

                # Visit the home page first so the consent dialog can be handled
                await page.goto("https://www.google.com", wait_until="domcontentloaded")
                await accept_consent_if_present(page)

                await page.goto(url, wait_until="domcontentloaded")

                # If blocked, retry with backoff and new UA
                if is_blocked(page):
                    # Keep a screenshot for diagnostics
                    try:
                        await page.screenshot(path=SCREENSHOTS_DIR / f"blocked_attempt_{attempt+1}.png", full_page=True)
                    except Exception:
                        pass
                    if attempt >= retries - 1:
                        raise RuntimeError("Blocked by Google (sorry page). Retries exhausted.")

                    sleep_for = backoff_base ** attempt + random.uniform(0.25, 0.75)
                    # Tear down the blocked browser *before* sleeping so it
                    # does not stay alive during the backoff.
                    await _dispose_context(context)
                    context = None
                    page = None
                    await asyncio.sleep(sleep_for)
                    continue

                # Not blocked, proceed to collect results
                all_results: list[dict[str, str]] = []
                seen_urls: set[str] = set()

                for page_index in range(pages_num):
                    # Take a screenshot for each page debugging/verification
                    if screenshots:
                        try:
                            page_screenshot_path = SCREENSHOTS_DIR / f"{safe_name}_{page_index + 1}.png"
                            await page.screenshot(path=page_screenshot_path, full_page=True)
                        except Exception:
                            pass

                    # Collect results from the current page
                    page_results = await collect_results(page, min_results=results_per_page)
                    # Deduplicate by URL to avoid repeats across pages
                    for r in page_results:
                        if r["url"] not in seen_urls:
                            seen_urls.add(r["url"])
                            all_results.append(r)

                    # If we've reached the last page requested, stop
                    if page_index >= pages_num - 1:
                        break

                    # Try to move to the next page, otherwise stop
                    moved = await _go_to_next_page(page)
                    if not moved:
                        break

                    # Optional small jitter to look more human and avoid rate-limits
                    await asyncio.sleep(random.uniform(0.3, 0.8))

                return all_results

            # If the loop exits without a return, treat as failure
            raise RuntimeError("Failed to retrieve results for an unknown reason.")
        finally:
            # Cleanup: closes the context (and with it the page) and the browser
            await _dispose_context(context)


async def search(
    query: str,
    min_results: int = 50,
    results_per_page: int = 20,
    headless: bool = True,
    screenshots: bool = False,
) -> list[dict[str, str]]:
    """
    Search for `query`, walking enough pages to cover `min_results`.

    The page count is ceil(min_results / results_per_page); everything else is
    delegated to `run_search`, whose result list is returned unchanged.
    """
    page_count = math.ceil(float(min_results) / results_per_page)
    return await run_search(
        query=query,
        results_per_page=results_per_page,
        pages_num=page_count,
        headless=headless,
        screenshots=screenshots,
    )


def _hostname(url: str) -> str:
    """Return the hostname of `url`, or "" when it has none or cannot be parsed."""
    try:
        host = urllib.parse.urlparse(url).hostname
    except Exception:
        return ""
    return host if host else ""


def _load_existing_csv(csv_path: Path) -> tuple[list[dict[str, str]], set[str], set[str]]:
    """
    Read a previously written seed CSV, if there is one.

    Returns a 3-tuple:
      - rows: every data row as {"url": ..., "label": ...}
      - urls: the set of non-empty URL strings
      - hosts: the set of hostnames of those URLs
    All three are empty when the file does not exist.
    """
    rows: list[dict[str, str]] = []
    urls: set[str] = set()
    hosts: set[str] = set()

    if not csv_path.exists():
        return rows, urls, hosts

    with csv_path.open("r", newline="", encoding="utf-8") as handle:
        for record in csv.DictReader(handle):
            # A missing or empty url cell is normalized to ""
            url = record.get("url", "") or ""
            rows.append({"url": url, "label": record.get("label", "")})
            if not url:
                continue
            urls.add(url)
            hosts.add(_hostname(url))
    return rows, urls, hosts


def _write_seed_csv(csv_path: Path, rows: list[dict[str, str]]) -> None:
    """
    Overwrite `csv_path` with a `url,label` header followed by `rows`.

    Parent directories are created as needed; a row without a "label" key is
    written with an empty label.
    """
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    with csv_path.open("w", newline="", encoding="utf-8") as handle:
        out = csv.DictWriter(handle, fieldnames=["url", "label"])
        out.writeheader()
        out.writerows(
            {"url": row["url"], "label": row.get("label", "")} for row in rows
        )


async def seed_urls_from_queries(
    queries: list[str],
    out_csv: str = "../data/seed_urls.csv",
    min_results_per_query: int = 50,
    results_per_page: int = 20,
    headless: bool = True,
    screenshots: bool = False,
    dedupe_by_host: bool = True,
    max_per_host: int = 2,
    default_label: str = "",
) -> Path:
    """
    Runs Google searches for each query, aggregates result URLs, and writes/updates
    a seed CSV with columns: url,label.

    - Deduplicates against the existing CSV and against URLs already collected
      in this run (by URL), so a URL returned by several queries is counted once.
    - With `dedupe_by_host`, at most `max_per_host` new URLs per hostname are
      added in one run.
    - default_label left empty by default; you can manually label later.
    - A query whose search raises is reported on stdout and skipped; the
      remaining queries still run.

    Returns:
        The path of the CSV that was written.
    """
    out_path = Path(out_csv)
    existing_rows, existing_urls, _existing_hosts = _load_existing_csv(out_path)

    # Collect across queries
    known_urls: set[str] = set(existing_urls)
    aggregated: list[dict[str, str]] = []
    host_counts: dict[str, int] = {}
    for q in queries:
        try:
            results = await search(
                query=q,
                min_results=min_results_per_query,
                results_per_page=results_per_page,
                headless=headless,
                screenshots=screenshots,
            )
        except Exception as exc:
            # Best-effort: one failing query must not lose the others, but
            # the failure should be visible rather than silently dropped.
            print(f"Warning: search failed for query {q!r}: {exc!r}")
            continue

        for r in results:
            url = r["url"]
            # Skip URLs already in the CSV or already collected in this run,
            # *before* touching the per-host quota.
            if not url or url in known_urls:
                continue
            if dedupe_by_host:
                host = _hostname(url)
                count = host_counts.get(host, 0)
                if count >= max_per_host:
                    continue
                host_counts[host] = count + 1
            known_urls.add(url)
            aggregated.append({
                "url": url,
                "label": default_label
            })

    # Merge and write back
    merged = existing_rows + aggregated
    # Final URL-level dedupe while preserving order (the existing file may
    # itself contain repeated or empty URLs)
    seen_final: set[str] = set()
    final_rows: list[dict[str, str]] = []
    for r in merged:
        u = r["url"]
        if u and u not in seen_final:
            seen_final.add(u)
            final_rows.append(r)

    _write_seed_csv(out_path, final_rows)
    return out_path

## Crawl for Dataset

In [None]:
# Search queries used to seed the crawl.
queries = [
    "Free music download websites in SA",
    "Amapiano Free Music download",
    "south africa mp3 download",
    "royalty free african music download",
    "South African hip hop free download",
    "Free Kwaito music downloads",
    "Download South African house music",
    "Free Gqom tracks download South Africa",
    "Download African jazz mp3",
    "Free maskandi music download",
    "Free South African gospel download mp3",
    "Free mp3 download site:co.za",
    "Free music download archives South Africa",
    "Download latest SA music free",
    "Free African pop music download",
    "Free traditional South African music download",
    "Free mp3 download no registration South Africa",
    "Free music download legal South Africa",
    "Free Afrobeat music download",
    "Free music download South Africa",
]


async def crawl_dataset():
    """Run every query through the crawler and write the collected URLs to the dataset CSV."""
    path = await seed_urls_from_queries(
        queries=queries,
        out_csv="../data/crawler_dataset.csv",
        min_results_per_query=60,
        results_per_page=20,
        headless=False,
        screenshots=False,
        dedupe_by_host=True,
        max_per_host=2,
        default_label="",
    )
    print(f"Wrote seeds to: {path.resolve()}")


def _run_coroutine_blocking(coro):
    """
    Run `coro` to completion and return its result.

    `asyncio.run` refuses to start while an event loop is already running in
    the current thread, which is the situation inside a notebook kernel. In
    that case the coroutine is executed with `asyncio.run` on a worker thread
    (which has no running loop) and this call blocks until it finishes.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running here (plain script): asyncio.run works directly.
        return asyncio.run(coro)

    import concurrent.futures

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()


_run_coroutine_blocking(crawl_dataset())

# Configurations

In [4]:
from pathlib import Path

# All paths are anchored at the directory the notebook is started from.
BASE_DIR = Path.cwd()
DATA_DIR = BASE_DIR / "data"
PAGES_DIR = DATA_DIR / "pages"
MODEL_PATH = BASE_DIR / "models"

LABELS_CSV = DATA_DIR / "labels.csv"  # url,label where label ∈ {'pirated','legit'}
MODEL_FILE = MODEL_PATH / "music_piracy_classifier.joblib"

# Make sure every working directory exists before later cells use it.
for _directory in (MODEL_PATH, DATA_DIR, PAGES_DIR):
    _directory.mkdir(parents=True, exist_ok=True)

# Load and preprocess labels data

In [5]:
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

RANDOM_SEED = 42
pd.set_option('display.max_colwidth', 120)

if not LABELS_CSV.exists():
    raise FileNotFoundError(f"Labels csv file not found at {LABELS_CSV}")

# Clean first, balance afterwards. Balancing on the raw file would treat
# differently-spelled labels (e.g. "Pirated " vs "pirated") as separate classes
# and would be undone by the later filtering / de-duplication.
unbalanced_labels = pd.read_csv(LABELS_CSV).dropna()
unbalanced_labels['label'] = unbalanced_labels['label'].astype(str).str.strip().str.lower()
unbalanced_labels = (
    unbalanced_labels[unbalanced_labels['label'].isin(['pirated', 'legit'])]
    .drop_duplicates('url')
)
if unbalanced_labels.empty:
    raise ValueError(f"No rows labelled 'pirated' or 'legit' found in {LABELS_CSV}")

# Down-sample every class to the size of the smallest one.
min_count = int(unbalanced_labels['label'].value_counts().min())
labels = (
    unbalanced_labels.groupby('label')
    .sample(n=min_count, random_state=RANDOM_SEED)
    .reset_index(drop=True)
)
print(labels.head(8))
print(labels['label'].value_counts())

FileNotFoundError: Labels csv file not found at /content/data/labels.csv

# Add page data and URL features to the dataset

In [None]:
from utilities import url_features, fetch_page_data

def build_dataset(df: pd.DataFrame, force: bool=False) -> pd.DataFrame:
    """
    Build one feature row per labelled URL.

    For every (url, label) pair in `df`, the page is fetched with
    `fetch_page_data(url, force=force)` and combined with `url_features(url)`.

    Args:
        df: Frame with at least the columns "url" and "label".
        force: Forwarded to `fetch_page_data`
            (presumably bypasses a page cache - confirm in `utilities`).

    Returns:
        A frame with the columns url, label, title, desc, text plus whatever
        keys `url_features` returns. A missing page text is stored as "";
        title and desc are stored as returned by the fetcher.
    """
    rows = []
    for url, label in zip(df['url'], df['label']):
        page = fetch_page_data(url, force=force)
        # The former unused " ".join([title, desc, text]) was removed: it would
        # raise TypeError if title or desc were None.
        rows.append({
            "url": url,
            "label": label,
            "title": page.title,
            "desc": page.desc,
            "text": page.text if page.text else "",
            **url_features(url),
        })
    return pd.DataFrame(rows).reset_index(drop=True)

data = build_dataset(labels, force=False)
print(data.head(5))
# Write next to labels.csv: DATA_DIR is the directory the configuration cell
# creates, whereas "../data" is a different location that may not exist.
data.to_csv(DATA_DIR / "dataset.csv", index=False)
# Numeric URL-derived columns used as model features in the following cells
numeric_cols = ['url_length','num_digits','num_hyphens','num_underscores','num_params','num_slashes','scheme_https','subdomain_len','num_subdomain_parts','tld_len','path_len']
print(data[numeric_cols].describe().T)

# Split data into training and testing sets

In [None]:
from sklearn.model_selection import train_test_split

# Merge title, description and body text into one column for the text vectorizer
title_part = data["title"].fillna("")
desc_part = data["desc"].fillna("")
body_part = data["text"].fillna("")
data["text_data"] = title_part + " " + desc_part + " " + body_part

feature_columns = ['url', "text_data", *numeric_cols]
x = data[feature_columns].copy()
y = data['label'].map({'pirated': 1, 'legit': 0})  # 1 = pirated

# Stratify only when both classes are present
stratify_on = y if y.nunique() > 1 else None
x_train, x_test, y_train, y_test = train_test_split(
    x,
    y,
    test_size=0.2,
    random_state=RANDOM_SEED,
    stratify=stratify_on,
)

print(x_train.shape, x_test.shape, y_train.mean(), y_test.mean())

# Setup Machine Learning Pipeline with Feature Extraction and Logistic Regression

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import FunctionTransformer

# Baseline model: character n-grams of the URL, word n-grams of the page text,
# and the numeric URL features passed through unchanged.
url_vectorizer = TfidfVectorizer(
    analyzer='char',
    ngram_range=(3, 5),
    min_df=2,
    max_features=20000,
)
text_vectorizer = TfidfVectorizer(
    lowercase=True,
    ngram_range=(1, 2),
    min_df=2,
    max_features=50000,
)


def get_numeric(x):
    """Select the numeric feature columns from a frame."""
    return x[numeric_cols]


# Defined for convenience; the ColumnTransformer below selects the numeric
# columns itself via 'passthrough' and does not use this transformer.
numeric_transformer = FunctionTransformer(get_numeric, validate=False)

feature_transformers = [
    ('url_tfidf', url_vectorizer, 'url'),
    ('text_tfidf', text_vectorizer, 'text_data'),
    ('num', 'passthrough', numeric_cols),
]
preprocess = ColumnTransformer(
    transformers=feature_transformers,
    remainder='drop',
    sparse_threshold=0.3,
)

clf = LogisticRegression(max_iter=5000, class_weight='balanced', n_jobs=None)

pipe = Pipeline(steps=[('preprocess', preprocess), ('clf', clf)])

pipe

# Train the Machine Learning Pipeline

In [None]:

# Fit the full pipeline (feature extraction + classifier) on the training split.
pipe.fit(x_train, y_train)
print("Trained.")

# Evaluate the Trained Model

In [None]:
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay, roc_auc_score, RocCurveDisplay
import matplotlib.pyplot as plt
from sklearn.model_selection import StratifiedKFold, cross_val_score
# rng = np.random.default_rng(RANDOM_SEED)

# Per-class precision / recall / F1 on the held-out split
y_pred = pipe.predict(x_test)
report = classification_report(y_test, y_pred, digits=3)
print(report)

# Confusion matrix plot (rows/columns ordered legit, pirated)
cm = confusion_matrix(y_test, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['legit', 'pirated'])
fig, ax = plt.subplots()
disp.plot(ax=ax, xticks_rotation=45)
plt.setp(ax.texts, color="white")  # recolor the cell annotations
ax.set_title("Confusion Matrix")
plt.show()

# ROC curve from predicted probabilities (LogReg has predict_proba);
# any failure here is reported instead of aborting the cell
try:
    y_prob = pipe.predict_proba(x_test)[:, 1]
    auc = roc_auc_score(y_test, y_prob)
    print(f"ROC AUC: {auc:.3f}")
    RocCurveDisplay.from_predictions(y_test, y_prob)
    plt.title("ROC Curve")
    plt.show()
except Exception as e:
    print("Could not compute probabilities/ROC:", e)

# 5-fold stratified cross-validation on the training split (needs both classes)
if y_train.nunique() > 1:
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_SEED)
    scores = cross_val_score(pipe, x_train, y_train, cv=skf, scoring='f1')
    print("CV F1 (mean ± std):", np.mean(scores).round(3), "±", np.std(scores).round(3))
else:
    skf = None
    print("Not enough class variety for CV.")


# Hyperparameter Tuning with GridSearchCV

In [None]:
from sklearn.model_selection import GridSearchCV

# Search space: n-gram ranges of both vectorizers and the regularization strength
param_grid = {
    'preprocess__url_tfidf__ngram_range': [(3, 5), (3, 6)],
    'preprocess__text_tfidf__ngram_range': [(1, 1), (1, 2)],
    'clf__C': [0.5, 1.0, 2.0],
}

gs = GridSearchCV(
    pipe,
    param_grid=param_grid,
    scoring='f1',
    cv=3,
    n_jobs=-1,
    verbose=1,
)
gs.fit(x_train, y_train)
for caption, value in (("Best params:", gs.best_params_), ("Best CV F1:", gs.best_score_)):
    print(caption, value)

# Evaluate the refitted best estimator on the held-out split
best_model = gs.best_estimator_
y_pred_gs = best_model.predict(x_test)
print(classification_report(y_test, y_pred_gs, digits=3))

# Save the Final Model

In [None]:
import joblib
from config import MODEL_FILE

# Prefer the tuned estimator when the grid-search cell has been run;
# otherwise fall back to the baseline pipeline.
try:
    final_model = best_model
except NameError:
    final_model = pipe
joblib.dump(final_model, MODEL_FILE)
print(f"Saved: {MODEL_FILE}")

# Classify a New URL

In [None]:
def classify_url(url: str, model=None) -> dict:
    """
    Classify a single URL as "pirated" or "legit".

    Args:
        url: The page to classify.
        model: A fitted estimator exposing `predict`; when None, the model
            saved at MODEL_FILE is loaded with joblib.

    Returns:
        {"url": ..., "pred_label": "pirated" | "legit"} and, when the model
        can produce probabilities, "pirated_probability" (probability of
        class 1) as a float.
    """
    # Explicit None check: an estimator object should never be judged by
    # truthiness (container-like estimators define __len__).
    if model is None:
        model = joblib.load(MODEL_FILE)

    page = fetch_page_data(url, force=False)
    row = {
        "url": url,
        "title": page.title,
        "desc": page.desc,
        "text": page.text,
        **url_features(url),
    }
    # Same title + desc + text concatenation as used for training; missing parts become ""
    row["text_data"] = (row["title"] or "") + " " + (row["desc"] or "") + " " + (row["text"] or "")
    df = pd.DataFrame([row])

    pred = model.predict(df)[0]
    out = {"url": url, "pred_label": "pirated" if int(pred) == 1 else "legit"}
    try:
        proba = model.predict_proba(df)[0, 1]
        out["pirated_probability"] = float(proba)
    except Exception:
        # Probability is optional extra information; the label alone is still returned
        pass
    return out

# Smoke-test the saved model on a handful of known sites
for example_url in (
    "https://fakaza.com/",
    "https://mzanzitunes.com/",
    "https://music.apple.com/",
    "https://spotify.com/",
    "https://tubidy.com/",
):
    print(classify_url(example_url))