# Module 2 · Thème 5 — Capstone collecte (packager)\n\nCe notebook construit un **Collection Pack** reproductible :\n- structure `capstone/` (raw/processed/outputs/scripts/notebooks)\n- `final_dataset.csv` + `quality_report.json` + `data_dictionary.csv`\n- `lineage.md` + `README.md` + `manifest.json` (hash + env)\n- scripts placeholders (SQL/API/PowerQuery) inclus dans le ZIP\n\nEntrées attendues à la racine avant exécution :\n- `events_clean.csv`\n- `profiles_by_user.csv`\n- `transactions_clean.csv`\n

In [None]:
import hashlib
import json
import platform
import sys
import zipfile
from datetime import datetime, timezone
from pathlib import Path

import pandas as pd


# ---------- Helpers ----------
def sha256_file(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is read in 1 MiB chunks so it is never loaded whole in memory.
    """
    digest = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()


def missing_pct(df: pd.DataFrame) -> dict:
    """Return ``{column: percentage of missing values}`` rounded to 2 decimals."""
    return (df.isna().mean() * 100).round(2).to_dict()


def clean_str(s: pd.Series) -> pd.Series:
    """Strip *s* and collapse internal whitespace runs to a single space.

    Missing values stay missing. A plain ``astype(str)`` would turn them into
    the literal text "nan"/"None", which then looks like real data to the
    group-bys, the profile de-duplication and the quality checks.
    """
    cleaned = s.astype(str).str.strip().str.replace(r"\s+", " ", regex=True)
    # Re-apply the original missing-value mask after the string conversion.
    return cleaned.where(s.notna())


def _utc_now_naive() -> datetime:
    """Return the current UTC time as a naive datetime.

    Replacement for the deprecated ``datetime.utcnow()``; dropping tzinfo
    keeps ``isoformat() + "Z"`` output in the same format as before.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


# ---------- Inputs ----------
INPUTS = ["events_clean.csv", "profiles_by_user.csv", "transactions_clean.csv"]

# ---------- Workspace structure ----------
ROOT = Path("capstone")
RAW = ROOT / "raw"
PROCESSED = ROOT / "processed"
OUTPUTS = ROOT / "outputs"
SCRIPTS = ROOT / "scripts"
NOTEBOOKS = ROOT / "notebooks"

# ---------- Static documents written into the pack ----------
dd_rows = [
    {"column_name": "user_id", "description": "Identifiant apprenant", "type": "string", "unit": "", "example": "U001", "source": "events/profiles/tx", "transformation_notes": "strip"},
    {"column_name": "country", "description": "Pays de l'apprenant", "type": "string", "unit": "", "example": "Togo", "source": "profiles", "transformation_notes": "title-case upstream"},
    {"column_name": "channel", "description": "Canal acquisition", "type": "string", "unit": "", "example": "facebook", "source": "profiles/marketing", "transformation_notes": "lowercase upstream"},
    {"column_name": "enrolled_at", "description": "Date d'inscription (proxy: 1er event enrolled, fallback min event)", "type": "datetime", "unit": "UTC", "example": "2026-01-03T10:12:00Z", "source": "events", "transformation_notes": "min(enrolled event_time) else min(event_time)"},
    {"column_name": "last_event_at", "description": "Dernier event observé", "type": "datetime", "unit": "UTC", "example": "2026-01-25T18:20:00Z", "source": "events", "transformation_notes": "max(event_time)"},
    {"column_name": "n_events", "description": "Nombre total d'événements", "type": "int", "unit": "count", "example": "42", "source": "events", "transformation_notes": "count"},
    {"column_name": "validated", "description": "A validé au moins une fois", "type": "int", "unit": "0/1", "example": "1", "source": "events", "transformation_notes": "max(is_validated)"},
    {"column_name": "n_transactions", "description": "Nombre de transactions", "type": "int", "unit": "count", "example": "3", "source": "tx", "transformation_notes": "count"},
    {"column_name": "total_amount", "description": "Montant total transactions", "type": "float", "unit": "currency", "example": "1200.5", "source": "tx", "transformation_notes": "sum(amount)"},
]

lineage = """# Lineage — Module 2 Theme 5

## Sources (raw)
- raw/events_clean.csv
- raw/profiles_by_user.csv
- raw/transactions_clean.csv

## Transformations
- events -> user_events (groupby user_id: counts, min/max times, validated flag, enrolled_at)
- tx -> user_tx (groupby user_id: count/sum)
- profiles -> profiles_dedup (1 row/user_id)
- final = user_events LEFT JOIN profiles_dedup LEFT JOIN user_tx

## Outputs (capstone root)
- final_dataset.csv
- quality_report.json
- data_dictionary.csv
- lineage.md
- README.md
"""

readme = """# Module 2 — Capstone collecte (Collection Pack)

## Reproduire
1) Mettre à la racine du notebook :
   - events_clean.csv
   - profiles_by_user.csv
   - transactions_clean.csv
2) Exécuter : theme-5_capstone_packager.ipynb
3) Le pack est généré dans le dossier `capstone/` et zippé.

## Validation attendue
- user_id_unique == true
- rows_ge_200 == true
- total_amount_non_negative == true
- country_nonempty_ge_90pct == true (si colonne country présente)
"""

# Placeholder scripts that must be present in the ZIP (file name -> content).
SCRIPT_PLACEHOLDERS = {
    "extract_sql.sql": """-- extract_sql.sql (Theme 5)
-- Placeholder: include the final SQL used for extraction (Theme 3).
-- Provide either a real query or paste the query outputs’ provenance.
""",
    "extract_api.py": """# extract_api.py (Theme 5)
# Placeholder: include the API extraction script (Theme 4).
# At minimum, document endpoint, params, pagination, rate limit handling.
""",
    "powerquery.m": """// powerquery.m (Theme 5)
// Placeholder: paste Power Query M code used to clean files (Theme 2).
""",
}


# The pipeline runs when this code is executed as a notebook cell or script
# (where __name__ == "__main__") and is skipped on a plain import. It is kept
# at module scope rather than inside a function so that names such as `final`,
# `quality` and `manifest` remain globals for any later cell.
if __name__ == "__main__":
    # Fail fast if an expected input is absent from the working directory.
    for fn in INPUTS:
        if not Path(fn).exists():
            raise FileNotFoundError(f"Missing input file: {fn}")

    for p in [RAW, PROCESSED, OUTPUTS, SCRIPTS, NOTEBOOKS]:
        p.mkdir(parents=True, exist_ok=True)

    # Copy raw inputs (never modify)
    for fn in INPUTS:
        (RAW / fn).write_bytes(Path(fn).read_bytes())

    # ---------- Load ----------
    events = pd.read_csv(RAW / "events_clean.csv")
    profiles = pd.read_csv(RAW / "profiles_by_user.csv")
    tx = pd.read_csv(RAW / "transactions_clean.csv")

    # ---------- Standardize columns ----------
    for df in [events, profiles, tx]:
        df.columns = [c.strip().lower() for c in df.columns]

    # Minimal schema expectations (soft checks)
    # events: user_id,event_time,event_type,(theme optional)
    # profiles: user_id,country,channel (at least user_id)
    # tx: tx_id,user_id,amount,created_at,(country/channel optional)
    for name, df, cols in [
        ("events", events, {"user_id", "event_time", "event_type"}),
        ("profiles", profiles, {"user_id"}),
        ("tx", tx, {"user_id", "amount", "created_at"}),
    ]:
        missing = cols - set(df.columns)
        if missing:
            raise ValueError(f"{name}: missing required columns: {missing}")

    # ---------- Clean basic types ----------
    # Unparseable dates / amounts become NaT / NaN (errors="coerce").
    events["user_id"] = clean_str(events["user_id"])
    events["event_type"] = clean_str(events["event_type"]).str.lower()
    events["event_time"] = pd.to_datetime(events["event_time"], errors="coerce", utc=True)

    profiles["user_id"] = clean_str(profiles["user_id"])
    if "country" in profiles.columns:
        profiles["country"] = clean_str(profiles["country"]).str.title()
    if "channel" in profiles.columns:
        profiles["channel"] = clean_str(profiles["channel"]).str.lower()

    tx["user_id"] = clean_str(tx["user_id"])
    tx["amount"] = pd.to_numeric(tx["amount"], errors="coerce")
    tx["created_at"] = pd.to_datetime(tx["created_at"], errors="coerce", utc=True)

    # ---------- Build user-level aggregates ----------
    # Rows whose user_id is missing are dropped by groupby (pandas default),
    # so they no longer form a bogus "nan" user.
    events["is_validated"] = (events["event_type"] == "validated").astype(int)

    # enrolled_at = first enrolled event; fallback min(event_time)
    enrolled_at = (
        events.loc[events["event_type"] == "enrolled"]
        .groupby("user_id")["event_time"].min()
        .rename("enrolled_at")
    )

    user_events = events.groupby("user_id").agg(
        n_events=("event_type", "count"),
        first_event_at=("event_time", "min"),
        last_event_at=("event_time", "max"),
        validated=("is_validated", "max"),
    ).reset_index()

    user_events = user_events.merge(enrolled_at.reset_index(), on="user_id", how="left")
    user_events["enrolled_at"] = user_events["enrolled_at"].fillna(user_events["first_event_at"])
    user_events = user_events.drop(columns=["first_event_at"])

    user_tx = tx.groupby("user_id").agg(
        n_transactions=("user_id", "count"),
        total_amount=("amount", "sum"),
    ).reset_index()

    # Deduplicate profiles: per user_id keep the row with the most non-null
    # fields (sorted ascending on the count, then keep="last").
    profiles_dedup = profiles.copy()
    profiles_dedup["_non_nulls"] = profiles_dedup.notna().sum(axis=1)
    profiles_dedup = profiles_dedup.sort_values(["user_id", "_non_nulls"]).drop_duplicates("user_id", keep="last")
    profiles_dedup = profiles_dedup.drop(columns=["_non_nulls"])

    # ---------- Final join ----------
    # One row per user seen in events; profiles and transactions are optional.
    final = user_events.merge(profiles_dedup, on="user_id", how="left")
    final = final.merge(user_tx, on="user_id", how="left")

    final["n_transactions"] = final["n_transactions"].fillna(0).astype(int)
    final["total_amount"] = final["total_amount"].fillna(0.0)
    final["validated"] = final["validated"].fillna(0).astype(int)

    # Column order (stable): known columns first, any extras after.
    base_cols = ["user_id", "country", "channel", "enrolled_at", "validated", "last_event_at", "n_events", "total_amount", "n_transactions"]
    final_cols = [c for c in base_cols if c in final.columns] + [c for c in final.columns if c not in base_cols]
    final = final[final_cols]

    # Save final dataset at capstone root (easy server validation)
    final.to_csv(ROOT / "final_dataset.csv", index=False)

    # ---------- Quality report ----------
    # The country check is skipped (treated as passing) when the column is absent.
    country_ok = True
    if "country" in final.columns:
        country_nonempty = final["country"].fillna("").astype(str).str.strip().ne("").mean()
        country_ok = (country_nonempty >= 0.90)

    checks = {
        "user_id_unique": bool(final["user_id"].is_unique),
        "rows_ge_200": bool(len(final) >= 200),
        "total_amount_non_negative": bool((final["total_amount"] >= 0).fillna(False).all()),
        "country_nonempty_ge_90pct": bool(country_ok),
    }
    passed = all(checks.values())
    failed_checks = [k for k, v in checks.items() if not v]

    quality = {
        "created_at": _utc_now_naive().isoformat() + "Z",
        "rows": int(len(final)),
        "unique_user_id": int(final["user_id"].nunique()),
        "duplicate_user_id": int(len(final) - final["user_id"].nunique()),
        "missing_pct": missing_pct(final),
        "date_min": None if final["enrolled_at"].isna().all() else pd.to_datetime(final["enrolled_at"], errors="coerce", utc=True).min().isoformat(),
        "date_max": None if final["last_event_at"].isna().all() else pd.to_datetime(final["last_event_at"], errors="coerce", utc=True).max().isoformat(),
        "checks": checks,
        "passed": bool(passed),
        "failed_checks": failed_checks,
    }
    (ROOT / "quality_report.json").write_text(json.dumps(quality, ensure_ascii=False, indent=2), encoding="utf-8")

    # ---------- Data dictionary ----------
    pd.DataFrame(dd_rows).to_csv(ROOT / "data_dictionary.csv", index=False)

    # ---------- Lineage + README ----------
    (ROOT / "lineage.md").write_text(lineage, encoding="utf-8")
    (ROOT / "README.md").write_text(readme, encoding="utf-8")

    # ---------- Scripts placeholders (required in ZIP) ----------
    for script_name, script_text in SCRIPT_PLACEHOLDERS.items():
        (SCRIPTS / script_name).write_text(script_text, encoding="utf-8")

    # Try to include notebook file if present (platform provides it)
    nb_name = "theme-5_capstone_packager.ipynb"
    if Path(nb_name).exists():
        (NOTEBOOKS / "packager.ipynb").write_bytes(Path(nb_name).read_bytes())

    # ---------- Manifest (audit-friendly) ----------
    # One timestamp feeds both run_id and created_at so they always agree.
    run_started = _utc_now_naive()
    manifest = {
        "run_id": "m2_capstone_" + run_started.strftime("%Y%m%d_%H%M%S"),
        "created_at": run_started.isoformat() + "Z",
        "environment": {
            "python": sys.version.split()[0],
            "platform": platform.platform(),
        },
        "inputs": [{"file": fn, "sha256": sha256_file(RAW / fn)} for fn in INPUTS],
        "outputs": [
            {"file": "final_dataset.csv"},
            {"file": "quality_report.json"},
            {"file": "data_dictionary.csv"},
            {"file": "lineage.md"},
            {"file": "README.md"},
        ],
        "rows_final": int(len(final)),
        "passed": bool(passed),
        "failed_checks": failed_checks,
    }
    (ROOT / "manifest.json").write_text(json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8")

    # ---------- Zip entire capstone folder ----------
    # The archive is written next to (not inside) ROOT, so it never includes
    # itself. Paths are sorted so the entry order does not depend on the
    # file system's directory listing order.
    zip_name = "module2_capstone_collection_pack.zip"
    with zipfile.ZipFile(zip_name, "w", compression=zipfile.ZIP_DEFLATED) as z:
        for fp in sorted(ROOT.rglob("*")):
            if fp.is_file():
                z.write(fp, arcname=str(fp))

    print("✅ Generated:", zip_name, "| rows:", len(final), "| passed:", passed, "| failed:", failed_checks)