# %% [markdown]
# # Cell 0 – Notebook preamble  
# Autogenerate **Linktree Raw→Staging cleaner**.  
# *Guided by `LLM_cleaner_guidelines.md` (Change-ID 2025-06-12-linktree_cleaner_guidelines).*


In [1]:
# %% 
# Cell 1 – Imports & constants
import os, json, argparse
from datetime import datetime
from pathlib import Path
import pandas as pd

# Platform slug used as the leaf folder name under each data-lake zone.
PLATFORM = "linktree"
# Root of the data lake, read from the PROJECT_ROOT environment variable
# (a KeyError is raised here if that variable is not set).
PROJECT_ROOT = Path(os.environ["PROJECT_ROOT"])

# Input (raw) and output (staging) folders for this platform.
RAW_DIR = PROJECT_ROOT.joinpath("raw", PLATFORM)
STAGING_DIR = PROJECT_ROOT.joinpath("staging", PLATFORM)

# Create both folders up front so later reads/writes never hit a missing path.
for _dir in (RAW_DIR, STAGING_DIR):
    _dir.mkdir(parents=True, exist_ok=True)

print(f"[INFO] Raw dir:     {RAW_DIR}")
print(f"[INFO] Staging dir: {STAGING_DIR}")


[INFO] Raw dir:     C:\Users\Earth\BEDROT PRODUCTIONS\BEDROT DATA LAKE\data_lake\raw\linktree
[INFO] Staging dir: C:\Users\Earth\BEDROT PRODUCTIONS\BEDROT DATA LAKE\data_lake\staging\linktree


In [2]:
# %% 
# Cell 2 – Helper: convert one NDJSON record to a clean dict
# Metric fields copied from every Raw record; a field that is absent or not
# numeric is coerced to NaN by record_to_row.
METRIC_COLS = [
    "totalViews",
    "uniqueViews",
    "totalClicks",
    "uniqueClicks",
    "clickThroughRate",
]


def record_to_row(rec: dict) -> dict:
    """
    Normalise one decoded Raw record into a flat, typed row.

    The result always holds ``date`` first, followed by every name in
    ``METRIC_COLS`` in order. The date is parsed with ``pd.to_datetime`` and
    each metric with ``pd.to_numeric``; both use ``errors="coerce"``, so an
    unparseable value becomes NaT/NaN instead of raising.

    Raises:
        ValueError: If the record has no ``date`` key.
    """
    # Guard clause: a record without a date cannot be placed on the timeline.
    if "date" not in rec:
        raise ValueError("Missing 'date' in record")

    parsed_date = pd.to_datetime(rec["date"], errors="coerce")
    metrics = {
        name: pd.to_numeric(rec.get(name), errors="coerce")
        for name in METRIC_COLS
    }
    return {"date": parsed_date, **metrics}


In [3]:
# %% 
# Cell 3 – Build DataFrame from *.ndjson files
def build_dataframe(files: list[Path]) -> pd.DataFrame:
    """
    Parse Raw NDJSON files into one cleaned, date-sorted DataFrame.

    Every non-blank line is decoded with ``json.loads`` and normalised by
    ``record_to_row``. Blank lines are skipped, and a line that cannot be
    decoded or normalised is reported and skipped on its own, so one bad
    record does not discard the rest of its file.

    Args:
        files: Paths of the NDJSON files to read.

    Returns:
        DataFrame with rows lacking a parseable ``date`` removed, sorted by
        ``date``, fully identical rows de-duplicated, and a fresh 0..n-1 index.

    Raises:
        RuntimeError: If no row could be extracted from any file.
    """
    rows = []
    for fp in files:
        try:
            with fp.open("r", encoding="utf-8") as f:
                for lineno, line in enumerate(f, start=1):
                    # Blank lines (e.g. stray separators) are not records.
                    if not line.strip():
                        continue
                    try:
                        rows.append(record_to_row(json.loads(line)))
                    except (ValueError, TypeError) as e:
                        # ValueError covers malformed JSON and a missing
                        # 'date'; TypeError covers non-object JSON values.
                        print(f"[ERROR] {fp.name} line {lineno}: {e}")
        except Exception as e:
            # Best-effort boundary: an unreadable file (or any unexpected
            # failure) is reported and the remaining files are still processed.
            print(f"[ERROR] {fp.name}: {e}")

    if not rows:
        raise RuntimeError("No valid rows extracted from raw NDJSON")

    df = pd.DataFrame(rows)

    # Basic cleansing
    df = df.dropna(subset=["date"])                # remove bad dates
    df = df.sort_values("date").drop_duplicates()  # deduplicate entire row
    df.reset_index(drop=True, inplace=True)

    return df


In [4]:
# %% 
# Cell 4 – CLI-ready cleaner template (string)
# Complete source of the standalone cleaner script. Kept as a plain (non-f)
# string: nothing is interpolated, so the braces used by the generated code
# are written literally instead of being doubled.
CLEANER_TEMPLATE = '''"""
linktree_raw2staging.py
Raw → Staging cleaner for Linktree NDJSON data.

Guided by `LLM_cleaner_guidelines.md`.
"""

import os, json, argparse
from datetime import datetime
from pathlib import Path
import pandas as pd

PLATFORM = "linktree"
PROJECT_ROOT = Path(os.environ["PROJECT_ROOT"])

RAW_DIR     = PROJECT_ROOT / "raw"     / PLATFORM
STAGING_DIR = PROJECT_ROOT / "staging" / PLATFORM

for _dir in (RAW_DIR, STAGING_DIR):
    _dir.mkdir(parents=True, exist_ok=True)

METRIC_COLS = [
    "totalViews",
    "uniqueViews",
    "totalClicks",
    "uniqueClicks",
    "clickThroughRate",
]

def record_to_row(rec: dict) -> dict:
    if "date" not in rec:
        raise ValueError("Missing 'date'")
    row = {"date": pd.to_datetime(rec["date"], errors="coerce")}
    for col in METRIC_COLS:
        row[col] = pd.to_numeric(rec.get(col), errors="coerce")
    return row

def build_dataframe(files: list[Path]) -> pd.DataFrame:
    rows = []
    for fp in files:
        try:
            with fp.open("r", encoding="utf-8") as f:
                for lineno, line in enumerate(f, start=1):
                    # Blank lines are not records; skip them.
                    if not line.strip():
                        continue
                    try:
                        rows.append(record_to_row(json.loads(line)))
                    except (ValueError, TypeError) as e:
                        # One bad record must not discard the rest of the file.
                        print(f"[ERROR] {fp.name} line {lineno}: {e}")
        except Exception as e:
            print(f"[ERROR] {fp.name}: {e}")

    if not rows:
        raise RuntimeError("No valid rows extracted")

    df = pd.DataFrame(rows)
    df = df.dropna(subset=["date"])
    df = df.sort_values("date").drop_duplicates()
    df.reset_index(drop=True, inplace=True)
    return df

def main():
    parser = argparse.ArgumentParser(description="Linktree Raw→Staging cleaner")
    parser.add_argument("--out", help="Custom Parquet output path", default=None)
    args = parser.parse_args()

    files = sorted(RAW_DIR.glob("*.ndjson"))
    if not files:
        raise RuntimeError(f"No NDJSON files in {RAW_DIR}")

    df = build_dataframe(files)

    ts  = datetime.now().strftime("%Y%m%d_%H%M%S")
    out = Path(args.out) if args.out else (
        STAGING_DIR / f"linktree_analytics_staging_{ts}.parquet"
    )
    # A custom --out may point into a directory that does not exist yet.
    out.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(out, index=False)
    print(f"[STAGING] Written → {out.name}  ({len(df)} rows)")

if __name__ == "__main__":
    main()
'''


In [5]:
# %% 
# Cell 5 – Write the cleaner script to disk
# Destination package for generated cleaners; created on first run.
cleaner_dir = PROJECT_ROOT.joinpath("src", "linktree", "cleaners")
cleaner_dir.mkdir(parents=True, exist_ok=True)

# Materialise the template as a runnable script, overwriting any older copy.
outfile = cleaner_dir.joinpath("linktree_raw2staging.py")
with outfile.open("w", encoding="utf-8") as fh:
    fh.write(CLEANER_TEMPLATE)

print(f"[INFO] Cleaner written to {outfile.relative_to(PROJECT_ROOT)}")


[INFO] Cleaner written to src\linktree\cleaners\linktree_raw2staging.py
