# Data Cleaning (Silver Layer) — Ad Fraud Project

**Goal:** Clean and normalize raw data coming from multiple sources (TalkingData + simulated noise) before feature engineering.

**This notebook will:**
1) Connect to PostgreSQL using `.env`
2) Build *staging* tables in SQL with strong typing + regex:
   - validate IPv4, normalize device casing, extract referrer domains, flag bot user agents
   - cast timestamps, flag future/ancient times
   - deduplicate near-duplicate clicks (same ad, same IP, same second)
3) Clean messy emails (Python regex) into `ad_connections_clean`
4) Normalize ad categories (Python fuzzy/canonical mapping) into `ads_clean`
5) Produce a small data-quality dashboard (counts & rates)


In [None]:
# 1) Imports & DB connection
# - Uses .env for credentials
# - Creates a SQLAlchemy engine
# - Helper run_sql() for executing multi-statement SQL safely

import os
import re
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from difflib import get_close_matches

from urllib.parse import quote_plus

load_dotenv()

# Credentials come from .env; libpq-style PG* variables are accepted as fallbacks.
DB_USER = os.getenv("DB_USER") or os.getenv("PGUSER")
DB_PASSWORD = os.getenv("DB_PASSWORD") or os.getenv("PGPASSWORD", "")
DB_HOST = os.getenv("DB_HOST") or os.getenv("PGHOST", "localhost")
DB_PORT = os.getenv("DB_PORT") or os.getenv("PGPORT", "5432")
DB_NAME = os.getenv("DB_NAME") or os.getenv("PGDATABASE")

# Fail fast with a clear message instead of connecting as the literal user "None".
if not DB_USER or not DB_NAME:
    raise RuntimeError(
        "Database user/name not configured: set DB_USER and DB_NAME "
        "(or PGUSER and PGDATABASE) in .env"
    )

# URL-encode the credentials: characters such as '@', ':', '/' or '#' in a
# password would otherwise be parsed as URL delimiters and corrupt the URL.
_credentials = quote_plus(DB_USER)
if DB_PASSWORD:
    _credentials += ":" + quote_plus(DB_PASSWORD)
DATABASE_URL = f"postgresql+psycopg2://{_credentials}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

engine = create_engine(DATABASE_URL)

def run_sql(sql: str) -> None:
    """Execute a SQL script (one or more ';'-separated statements) in one transaction.

    The script is handed to the DBAPI driver verbatim via ``exec_driver_sql``.
    Wrapping it in ``sqlalchemy.text()`` would instead treat every ``:word``
    sequence as a bind parameter. One example is the ``:https`` inside a regex
    like ``(?:https?://)``, which fails with "A value is required for bind
    parameter".

    The ``no_parameters`` option makes SQLAlchemy call ``cursor.execute(sql)``
    without a parameter collection, so literal ``%`` signs are not treated as
    placeholders. Several statements in one call rely on the driver accepting
    that; psycopg2 does (the URL above uses psycopg2).

    The ``engine.begin()`` block commits on success and rolls back if any
    statement raises.
    """
    with engine.begin() as conn:
        conn.exec_driver_sql(sql, execution_options={"no_parameters": True})

# Quick smoke test: one round-trip query confirms credentials and connectivity.
with engine.connect() as conn:
    version_result = conn.execute(text("select version()"))
    v = version_result.scalar()
print("Connected to:", v)


In [None]:
# 2) Quick inventory of raw tables
# Each table is counted on its own connection. On PostgreSQL a failed statement
# aborts the surrounding transaction, so reusing one connection would make every
# table after the first missing one also look "missing".
from sqlalchemy.exc import SQLAlchemyError

tables = ["ads", "raw_clicks", "ad_connections", "ad_performance"]
for t in tables:
    # Connection errors (DB down, bad credentials) are raised here on purpose;
    # only query failures are reported as a missing table.
    with engine.connect() as conn:
        try:
            c = conn.execute(text(f"SELECT COUNT(*) FROM {t}")).scalar()
        except SQLAlchemyError as e:
            print(f"{t:15s} -> missing ({e})")
            continue
    print(f"{t:15s} -> {c:,} rows")


In [None]:
# 3) Build staging_clicks_clean (Silver)
# - Strong typing: cast click_time to TIMESTAMP, ip to INET (already done in raw, but we double-validate)
# - Regex rules:
#   * validate IPv4 text shape and octet ranges
#   * extract referrer domain
#   * detect bot user agents
# - Flags: invalid_ip_flag, missing_device_flag, future_time_flag, ancient_time_flag
# - Normalize device casing and trim
#
# Notes on the SQL below:
# - An INET value cast to text includes its netmask ("1.2.3.4/32"), so a
#   trailing "/32" is stripped before the dotted-quad check. This works whether
#   raw_clicks.ip_address is INET or TEXT.
# - IPv4 validity is computed once (ip_ok) and reused for ip_inet and
#   invalid_ip_flag, so the two always agree. A NULL IP counts as invalid.
# - The octet ::int casts sit inside CASE, which (unlike AND) guarantees they
#   only run after the regex has confirmed the text is digits and dots.
# - No pattern contains ":<word>" (e.g. "(?:https"), which sqlalchemy.text()
#   would otherwise parse as a bind parameter.
# - regexp_match() requires PostgreSQL 10+.

sql_staging_clean = """
-- Create structure (idempotent)
DROP TABLE IF EXISTS staging_clicks_clean;
CREATE TABLE staging_clicks_clean AS
WITH src AS (
    SELECT
        rc.*,
        regexp_replace(rc.ip_address::text, '/32$', '') AS ip_txt
    FROM raw_clicks rc
    WHERE rc.click_time IS NOT NULL
),
checked AS (
    SELECT
        src.*,
        CASE
            WHEN src.ip_txt ~ '^(?:\\d{1,3}\\.){3}\\d{1,3}$'
            THEN split_part(src.ip_txt, '.', 1)::int <= 255
             AND split_part(src.ip_txt, '.', 2)::int <= 255
             AND split_part(src.ip_txt, '.', 3)::int <= 255
             AND split_part(src.ip_txt, '.', 4)::int <= 255
            ELSE FALSE
        END AS ip_ok
    FROM src
)
SELECT
    c.ad_id,
    -- Validated IPv4, otherwise NULL
    CASE WHEN c.ip_ok THEN c.ip_txt::inet END AS ip_inet,

    -- Device normalized (lower + trim)
    NULLIF(trim(lower(c.device_type)), '') AS device_type_norm,

    -- Timestamps (cast) + quality flags
    c.click_time::timestamp                                 AS click_ts,
    (c.click_time::timestamp > NOW() + INTERVAL '1 day')    AS future_time_flag,
    (c.click_time::timestamp < NOW() - INTERVAL '10 years') AS ancient_time_flag,

    -- Referrer domain: host part with scheme and leading "www." removed (case-insensitive)
    lower(COALESCE((regexp_match(c.referrer_url, '^(https?://)?(www\\.)?([^/:?#]+)', 'i'))[3], '')) AS ref_domain,

    -- Bot UA flag (missing user agent -> FALSE)
    COALESCE(c.user_agent ~* '(bot|spider|crawler|curl|wget)', FALSE) AS bot_ua_flag,

    -- Original fields for traceability
    c.referrer_url,
    c.user_agent,

    -- Quality flags (missing device, invalid IP)
    (c.device_type IS NULL OR trim(c.device_type) = '') AS missing_device_flag,
    (NOT c.ip_ok) AS invalid_ip_flag
FROM checked c;

-- Helpful indexes
CREATE INDEX IF NOT EXISTS idx_stage_clean_time ON staging_clicks_clean(click_ts);
CREATE INDEX IF NOT EXISTS idx_stage_clean_ip   ON staging_clicks_clean(ip_inet);
CREATE INDEX IF NOT EXISTS idx_stage_clean_ad   ON staging_clicks_clean(ad_id);
"""
run_sql(sql_staging_clean)

with engine.connect() as conn:
    n = conn.execute(text("SELECT COUNT(*) FROM staging_clicks_clean")).scalar()
print("staging_clicks_clean rows:", n)


In [None]:
# 4) Deduplicate near-duplicate clicks into staging_clicks_dedup
# Strategy: keep 1 row per (ad_id, ip_inet, second(click_ts)) using ROW_NUMBER window.
# Rows with a NULL ip_inet (invalid or missing IP) are all kept: window
# partitions treat NULLs as equal, which would otherwise collapse clicks from
# unrelated sources that merely share an ad and a second. Those rows are
# already marked by invalid_ip_flag.

sql_dedup = """
DROP TABLE IF EXISTS staging_clicks_dedup;
CREATE TABLE staging_clicks_dedup AS
WITH ranked AS (
  SELECT s.*,
         CASE
           WHEN s.ip_inet IS NULL THEN 1
           ELSE ROW_NUMBER() OVER (
             PARTITION BY s.ad_id, s.ip_inet, date_trunc('second', s.click_ts)
             ORDER BY s.click_ts, s.ref_domain
           )
         END AS rn
  FROM staging_clicks_clean s
)
SELECT * FROM ranked WHERE rn = 1;

CREATE INDEX IF NOT EXISTS idx_stage_dedup_keys ON staging_clicks_dedup(ad_id, ip_inet, click_ts);
"""
run_sql(sql_dedup)

with engine.connect() as conn:
    before = conn.execute(text("SELECT COUNT(*) FROM staging_clicks_clean")).scalar()
    after  = conn.execute(text("SELECT COUNT(*) FROM staging_clicks_dedup")).scalar()
print(f"Dedup done: {before:,} → {after:,} rows (removed {before-after:,})")


In [None]:
# 5) Clean emails from ad_connections using Python regex
# - Handles formats like "john (at) gmail.com", "john[at]gmail.com", extra spaces
# - Lowercases, strips, removes illegal chars
# - Invalid emails become None
# Writes to: ad_connections_clean (replace each run)

# Raw strings take SINGLE backslashes: in r"..." the text "\\s" is a literal
# backslash followed by "s", which never matches real addresses.
_EMAIL_AT_OBFUSCATION_RE = re.compile(r"\s*\(at\)\s*|\s*\[at\]\s*")
_EMAIL_AT_SPACING_RE = re.compile(r"\s*@\s*")
_EMAIL_ILLEGAL_CHARS_RE = re.compile(r"[^a-z0-9@._-]+")
_EMAIL_VALID_RE = re.compile(r"^[a-z0-9._%-]+@[a-z0-9.-]+\.[a-z]{2,}$")


def clean_email(email):
    """Normalize a possibly obfuscated email address.

    Args:
        email: Raw value from the ``email`` column; non-strings (None/NaN) are
            accepted and treated as invalid.

    Returns:
        The lowercased, cleaned address, or None when the input is not a string
        or does not look like ``local@domain.tld`` after cleaning.
    """
    if not isinstance(email, str):
        return None
    email = email.strip().lower()
    # common obfuscations: "(at)" / "[at]", with optional surrounding spaces
    email = _EMAIL_AT_OBFUSCATION_RE.sub("@", email)
    email = _EMAIL_AT_SPACING_RE.sub("@", email)
    # drop weird chars (keep alnum, @ . _ -)
    email = _EMAIL_ILLEGAL_CHARS_RE.sub("", email)
    # simple validation: one "@", and a dotted domain ending in a 2+ letter TLD
    if not _EMAIL_VALID_RE.match(email):
        return None
    return email

# Load raw connections, then attach the cleaned address and an "invalid" flag.
df_conn = pd.read_sql("SELECT * FROM ad_connections", engine)
df_conn = df_conn.assign(email_clean=df_conn["email"].apply(clean_email))
df_conn["invalid_email_flag"] = df_conn["email_clean"].isna()

# Persist the cleaned table (overwritten on every run).
df_conn.to_sql("ad_connections_clean", engine, if_exists="replace", index=False)

n_invalid_emails = int(df_conn["invalid_email_flag"].sum())
print("ad_connections_clean written.", "Invalid emails:", n_invalid_emails, " / total:", len(df_conn))


In [None]:
# 6) Normalize ad categories using canonical set + fuzzy fallback
from typing import Optional

# Canonical categories for this project:
CANONICAL_CATS = ["retail", "tech", "finance", "travel"]


def normalize_category(value: object, *, cutoff: float = 0.7) -> Optional[str]:
    """Map a raw category label onto ``CANONICAL_CATS``.

    Args:
        value: Raw category; non-strings (None/NaN) are accepted.
        cutoff: Minimum ``difflib`` similarity ratio (0-1) for a fuzzy match.

    Returns:
        None for non-strings or blank strings. Otherwise the canonical label on
        an exact (case/whitespace-insensitive) or fuzzy match. If neither
        matches, the stripped, lowercased input is returned unchanged.
    """
    if not isinstance(value, str) or not value.strip():
        return None
    raw = value.strip().lower()
    # exact match
    if raw in CANONICAL_CATS:
        return raw
    # fuzzy fallback using difflib
    hit = get_close_matches(raw, CANONICAL_CATS, n=1, cutoff=cutoff)
    return hit[0] if hit else raw  # if no good match, keep original (lowercased)

# Explicit typo fixes, applied on top of the canonical/fuzzy normalization.
MAP_OVERRIDES = {
    "moblie": "mobile",  # (example if you use it elsewhere)
    "retal": "retail",
}

df_ads = pd.read_sql("SELECT * FROM ads", engine)
df_ads["category_norm"] = (
    df_ads["category"]
    .apply(normalize_category)
    .replace(MAP_OVERRIDES)
)

# Persist and show how the normalized labels are distributed.
df_ads.to_sql("ads_clean", engine, if_exists="replace", index=False)

print("ads_clean written. Category distribution:")
category_counts = df_ads["category_norm"].value_counts(dropna=False)
print(category_counts)


In [None]:
# 7) Data-quality dashboard: basic KPIs you can show in README
# Returns a single row: per-flag counts over staging_clicks_clean plus the
# table sizes before/after deduplication.
sql_kpis = """
SELECT
  flags.total,
  flags.invalid_ip,
  flags.missing_device,
  flags.future_time,
  flags.ancient_time,
  flags.bot_ua,
  sizes.n_clean,
  sizes.n_dedup,
  (sizes.n_clean - sizes.n_dedup) AS removed_duplicates
FROM (
  SELECT
    COUNT(*)                              AS total,
    SUM(CAST(invalid_ip_flag AS int))     AS invalid_ip,
    SUM(CAST(missing_device_flag AS int)) AS missing_device,
    SUM(CAST(future_time_flag AS int))    AS future_time,
    SUM(CAST(ancient_time_flag AS int))   AS ancient_time,
    SUM(CAST(bot_ua_flag AS int))         AS bot_ua
  FROM staging_clicks_clean
) AS flags
CROSS JOIN (
  SELECT
    (SELECT COUNT(*) FROM staging_clicks_clean) AS n_clean,
    (SELECT COUNT(*) FROM staging_clicks_dedup) AS n_dedup
) AS sizes;
"""
kpis = pd.read_sql(sql_kpis, engine)
kpis


In [None]:
# 8) Optional: export small snapshots for the repo (helps recruiters view results without DB)
# Note: keep samples small to avoid bloating the repo.
# The click samples are sorted before LIMIT. Without ORDER BY, PostgreSQL may
# return a different subset on every run, which churns the committed CSVs.
# Ties on the sort key can still reorder rows. ad_connections_clean has no
# column known here to be a stable key, so it is left unsorted.
REPORTS_DIR = os.path.join("..", "reports")  # relative to the notebook's working directory

sample_clean = pd.read_sql(
    "SELECT * FROM staging_clicks_clean ORDER BY click_ts, ad_id, ip_inet LIMIT 1000", engine
)
sample_dedup = pd.read_sql(
    "SELECT * FROM staging_clicks_dedup ORDER BY click_ts, ad_id, ip_inet LIMIT 1000", engine
)
sample_emails = pd.read_sql("SELECT * FROM ad_connections_clean LIMIT 1000", engine)
sample_ads = pd.read_sql("SELECT * FROM ads_clean", engine)

os.makedirs(REPORTS_DIR, exist_ok=True)
sample_clean.to_csv(os.path.join(REPORTS_DIR, "sample_staging_clicks_clean.csv"), index=False)
sample_dedup.to_csv(os.path.join(REPORTS_DIR, "sample_staging_clicks_dedup.csv"), index=False)
sample_emails.to_csv(os.path.join(REPORTS_DIR, "sample_ad_connections_clean.csv"), index=False)
sample_ads.to_csv(os.path.join(REPORTS_DIR, "ads_clean.csv"), index=False)

# Report the real location; the old "/reports" message named an absolute path
# that was never written.
print(f"Snapshots exported to {os.path.abspath(REPORTS_DIR)}.")


## ✅ Silver Layer complete

- Built: `staging_clicks_clean`, `staging_clicks_dedup`, `ad_connections_clean`, `ads_clean`
- Applied:
  - Regex validation for IPv4, bot user-agents
  - Domain extraction from referrers (paths and query strings, including UTM parameters, are dropped)
  - Timestamp casting + future/ancient flags
  - Device normalization & missing flags
  - Deduplication using window functions
  - Email cleaning with Python regex
  - Category normalization with fuzzy matching

**Next:** Feature engineering (`fraud_signals`) and modeling.
