## Period Tracking Pipeline
Layers: RAW (CSV in UC Volume) ➜ BRONZE (Delta batch) ➜ SILVER (clean) ➜ GOLD (modeled)

Use the RUN_LAYER widget to run ALL or a single layer.
(This notebook can optionally be run as a Job to demo orchestration capabilities.)

In [0]:
# Notebook parameters: declare the widgets, read their values, then make sure
# the catalog, schema and raw volume exist and are the active context.
for _widget_name, _widget_default in (
    ("CATALOG", "wwc2025"),
    ("SCHEMA", "period_pipeline"),
    ("RAW_VOLUME", "raw_period"),
):
    dbutils.widgets.text(_widget_name, _widget_default)
dbutils.widgets.dropdown("RUN_LAYER", "ALL", ["ALL", "RAW", "BRONZE", "SILVER", "GOLD"])

CATALOG, SCHEMA, RAW_VOL, RUN_LAYER = (
    dbutils.widgets.get(_widget_key)
    for _widget_key in ("CATALOG", "SCHEMA", "RAW_VOLUME", "RUN_LAYER")
)

# Order matters: the schema is created inside the catalog selected just before,
# and the volume inside the schema selected just before.
for _ddl in (
    f"CREATE CATALOG IF NOT EXISTS {CATALOG}",
    f"USE CATALOG {CATALOG}",
    f"CREATE SCHEMA IF NOT EXISTS {SCHEMA}",
    f"USE SCHEMA {SCHEMA}",
    f"CREATE VOLUME IF NOT EXISTS {RAW_VOL}",
):
    spark.sql(_ddl)

# Root folder of the raw volume; each dataset is written to a sub-folder of it.
RAW_BASE = f"/Volumes/{CATALOG}/{SCHEMA}/{RAW_VOL}"

print(f"Catalog.Schema: {CATALOG}.{SCHEMA}")
print(f"RAW volume    : {RAW_BASE}")
print(f"RUN_LAYER     : {RUN_LAYER}")

## RAW: Generate synthetic CSVs with a seeded NumPy RNG (users, cycles, symptoms, mood_logs, user_prefs, catalogs, wellness_phase_rules)

In [0]:
if RUN_LAYER in ("ALL", "RAW"):
    import numpy as np
    import pandas as pd
    import uuid
    from datetime import date, timedelta, datetime

    # Generation knobs
    SEED    = 42    # seed for the shared random generator
    N_USERS = 300   # number of synthetic participants
    MONTHS  = 18    # length of the generated history, in months

    # One generator feeds every random draw below, so the draw order matters
    # for reproducing the same data from the same seed.
    rng = np.random.default_rng(SEED)

    def clamp(n, lo, hi):
        """Return n limited to the range [lo, hi]."""
        capped = min(hi, n)
        return max(lo, capped)

    # Value pools sampled by the generators further down.
    CITIES = [
        "New York", "Chicago", "Dallas", "Seattle", "Miami",
        "Atlanta", "Los Angeles", "Boston", "Denver",
    ]
    SEGMENTS = ["student", "professional", "parent", "athlete", "shift_worker"]
    MOODS = ["happy", "stressed", "tired", "sad", "energized", "calm"]
    SYMPTOMS = [
        "cramps", "bloating", "headache", "acne",
        "cravings", "back_pain", "fatigue", "nausea",
    ]

    # Earliest date of the history window (months approximated as 30 days).
    history_days = int(30*MONTHS)
    base_start = date.today() - timedelta(days=history_days)

    # -------------------------
    # users.csv
    # -------------------------
    # One record per participant. The draws happen in the order
    # age -> city -> lifestyle_segment for every user.
    users = []
    for uid in range(1, N_USERS+1):
        age = clamp(int(rng.normal(29, 7)), 16, 55)  # allow menopause band
        city = rng.choice(CITIES)
        segment = rng.choice(SEGMENTS)
        users.append({
            "user_id": uid,
            "age": age,
            "city": city,
            "lifestyle_segment": segment,
        })
    users_df = pd.DataFrame(users)

    # -------------------------
    # cycles.csv
    # -------------------------
    # One row per simulated cycle. Two anomaly types are injected on purpose:
    #   * long bleeds     - period_length_days forced to at least 20
    #   * short intervals - the period starts 5-13 days after the previous one ended
    cycles = []
    # Size of each anomaly-prone cohort (~2.5% of users, at least 6). Capped at
    # N_USERS so sampling without replacement cannot fail for a tiny N_USERS.
    NUM_LONG_BLEEDS    = min(N_USERS, max(6, N_USERS // 40))
    NUM_SHORT_INTERVAL = min(N_USERS, max(6, N_USERS // 40))

    long_bleed_users = set(rng.choice(np.arange(1, N_USERS+1), size=NUM_LONG_BLEEDS, replace=False).tolist())
    short_int_users  = set(rng.choice(np.arange(1, N_USERS+1), size=NUM_SHORT_INTERVAL, replace=False).tolist())

    def rand_date(start: date, end: date) -> date:
        """Return a random date in [start, end); `start` itself when the span is under one day."""
        delta = (end - start).days
        return start + timedelta(days=int(rng.integers(0, max(delta,1))))

    for u in users:
        uid = u["user_id"]
        # Per-user mean cycle length, drawn in two steps and clamped each time.
        base_cycle_mu = clamp(int(rng.normal(28, 3)), 23, 35)
        cycle_mu      = clamp(base_cycle_mu + int(rng.normal(0, 2)), 22, 36)

        # First tracked period starts somewhere inside the history window.
        start_d = rand_date(base_start, date.today() - timedelta(days=30))
        prev_end = None

        while start_d < date.today():
            # period length
            period_len = int(rng.normal(5, 1.5))
            period_len = clamp(period_len, 2, 10)
            # occasional long bleed (>=20d): 25% per cycle for the long-bleed
            # cohort, otherwise a 2% background rate
            if (uid in long_bleed_users and rng.random() < 0.25) or (rng.random() < 0.02):
                period_len = max(period_len, 20)

            # expected start-to-start
            cycle_len = int(rng.normal(cycle_mu, 2.0))
            cycle_len = clamp(cycle_len, 20, 45)

            # occasional short interval (<15d gap): pull this start forward so
            # it falls 5-13 days after the previous period ended
            interval_since_prev_end = (start_d - prev_end).days if prev_end else None
            if prev_end and ((uid in short_int_users and rng.random() < 0.25) or (rng.random() < 0.02)):
                start_d = prev_end + timedelta(days=int(rng.integers(5, 14)))
                interval_since_prev_end = (start_d - prev_end).days

            end_d = start_d + timedelta(days=period_len - 1)

            cycles.append({
                "user_id": uid,
                "cycle_start_date": start_d.isoformat(),
                "cycle_end_date": end_d.isoformat(),
                "period_length_days": period_len,
                "cycle_length_days": cycle_len,
                # keep raw interval (nullable) — will be recomputed in SILVER
                "interval_since_prev_end_days": interval_since_prev_end
            })

            # next start
            jitter = int(rng.normal(0, 2))
            prev_end = end_d
            planned_next = start_d + timedelta(days=cycle_len + jitter)
            # A long bleed combined with a short cycle and negative jitter could
            # schedule the next period before this one has ended; never let two
            # periods overlap.
            start_d = max(planned_next, end_d + timedelta(days=1))

    cycles_df = pd.DataFrame(cycles)

    # -------------------------
    # symptoms.csv
    # -------------------------
    # For every day of every period there is a 60% chance of one symptom entry
    # with a random symptom and a severity clamped to 1..5.
    symptoms_rows = []
    for cyc in cycles_df.itertuples(index=False):
        uid = int(cyc.user_id)
        first_day = date.fromisoformat(cyc.cycle_start_date)
        last_day = date.fromisoformat(cyc.cycle_end_date)
        n_days = (last_day - first_day).days + 1
        for offset in range(n_days):
            if rng.random() >= 0.6:
                continue
            symptoms_rows.append({
                "user_id": uid,
                "date": (first_day + timedelta(days=offset)).isoformat(),
                "symptom": rng.choice(SYMPTOMS),
                "severity": int(clamp(int(rng.normal(3, 1.1)), 1, 5)),
            })
    symptoms_df = pd.DataFrame(symptoms_rows)

    # -------------------------
    # mood_logs.csv
    # -------------------------
    # Every third day since base_start each user has a 50% chance of a mood
    # entry. The two empty strings make a blank note more likely than any
    # single text note.
    NOTE_CHOICES = ["", "", "felt low energy", "great workout", "busy day", "travel", "family event"]
    mood_rows = []
    for uid in users_df["user_id"].tolist():
        log_day = base_start
        while log_day <= date.today():
            entry_logged = rng.random() < 0.5
            if entry_logged:
                mood_rows.append({
                    "user_id": int(uid),
                    "date": log_day.isoformat(),
                    "mood": rng.choice(MOODS),
                    "notes": rng.choice(NOTE_CHOICES),
                })
            log_day += timedelta(days=3)
    mood_df = pd.DataFrame(mood_rows)

    # user_prefs.csv
    # One row of dietary / lifestyle flags per user.
    def pick_bool(p):
        """Return True with probability p, using the shared generator."""
        outcome = rng.choice([1, 0], p=[p, 1 - p])
        return bool(outcome)

    TEMPERAMENTS = ["balanced", "anxious", "low-energy", "irritable"]
    TEMPERAMENT_WEIGHTS = [0.45, 0.20, 0.20, 0.15]

    prefs = []
    for r in users_df.itertuples(index=False):
        # The fallback is drawn for every row before the lookup, so it always
        # consumes one value from the generator even when "age" is present.
        fallback_age = rng.integers(16, 52)
        age = int(getattr(r, "age", fallback_age))
        # Adults get a much higher probability for the sexually_active flag.
        active_probability = 0.55 if age >= 18 else 0.10
        prefs.append({
            "user_id": int(r.user_id),
            "vegan": pick_bool(0.12),
            "dairy_free": pick_bool(0.18),
            "caffeine_free": pick_bool(0.20),
            "sensitive_skin": pick_bool(0.25),
            "nut_allergy": pick_bool(0.04),
            "gluten_free": pick_bool(0.07),
            "sexually_active": pick_bool(active_probability),
            "temperament": rng.choice(TEMPERAMENTS, p=TEMPERAMENT_WEIGHTS),
        })
    user_prefs_df = pd.DataFrame(prefs)

    # Static reference catalogs. Every row gets a fresh uuid4 id, so ids differ
    # from run to run. Tags are stored as one ';'-delimited string in RAW and
    # are split into arrays in SILVER.

    # --- meal_catalog.csv (tags as ';' delimited for RAW; parsed in SILVER) ---
    # (name, meal_type, tags)
    meal_rows = [
        ("Spinach Lentil Bowl","lunch","high-iron;vegan;gluten-free;anti-inflammatory"),
        ("Tofu Stir-Fry","dinner","vegan;high-protein;anti-inflammatory"),
        ("Greek Yogurt + Berries","breakfast","dairy;low-sugar;anti-inflammatory"),
        ("Oatmeal + Pumpkin Seeds","breakfast","high-iron;vegan"),
        ("Salmon + Quinoa","dinner","omega3;gluten-free;anti-inflammatory"),
        ("Chickpea Pasta + Pesto","dinner","vegan;high-protein"),
        ("Avocado Toast + Seeds","breakfast","vegan;low-sugar"),
        ("Turkey & Greens Wrap","lunch","high-protein;low-sugar"),
    ]
    meals_df = pd.DataFrame([{
        "meal_id": str(uuid.uuid4()), "name": n, "meal_type": mt, "tags": tags
    } for (n,mt,tags) in meal_rows])

    # --- supplement_catalog.csv ---
    # (name, tags). The recommender matches these tags against the rules'
    # prefer_supp_tags (magnesium / electrolyte / omega3), so each supplement
    # needs its ingredient tag. The magnesium product previously lacked
    # "magnesium" and could never match a rule that asks for it.
    supp_rows = [
        ("Magnesium Glycinate 200mg","magnesium;sleep;cramps;calm;caffeine-free"),
        ("Omega-3 Softgels","omega3;anti-inflammatory"),
        ("Electrolyte Powder (no sugar)","electrolyte;caffeine-free"),
        ("Iron Complex (gentle)","iron;with-food"),
        ("Probiotic Blend","gut;daily"),
    ]
    supp_df = pd.DataFrame([{
        "supplement_id": str(uuid.uuid4()), "name": n, "tags": tags
    } for (n,tags) in supp_rows])

    # --- workout_catalog.csv ---
    # (name, intensity, focus, tags)
    wkt_rows = [
        ("Rest day","none","recovery","at-home;no-equipment;low-impact"),
        ("Gentle yoga 20m","low","mobility","at-home;no-equipment;low-impact"),
        ("Walk 30m","low","cardio","at-home"),
        ("Strength circuit 30m","moderate","strength","at-home"),
        ("Low-impact cardio 25m","low","cardio","low-impact"),
    ]
    wkt_df = pd.DataFrame([{
        "workout_id": str(uuid.uuid4()), "name": n, "intensity": it, "focus": f, "tags": tags
    } for (n,it,f,tags) in wkt_rows])

    # --- hygiene_catalog.csv ---
    # (name, type, tags)
    hyg_rows = [
        ("Ultra-thin pads","pad","sensitive-skin;fragrance-free"),
        ("Organic cotton tampons","tampon","sustainable"),
        ("Period underwear","period-underwear","sustainable;sensitive-skin"),
        ("pH-balanced wash","wash","fragrance-free;sensitive-skin"),
        ("Soothing wipes","wipe","fragrance-free"),
        ("Heat Patch (12h)","heat-patch","heat-therapy"),
    ]
    hyg_df = pd.DataFrame([{
        "hygiene_id": str(uuid.uuid4()), "name": n, "type": t, "tags": tags
    } for (n,t,tags) in hyg_rows])

    # --- wellness_phase_rules.csv (phase x life_stage policy; arrays as ';' strings) ---
    # Each tuple below follows the column order of RULE_COLUMNS.
    RULE_COLUMNS = [
        "phase","life_stage","prefer_product_tags","avoid_product_tags",
        "prefer_meal_tags","avoid_meal_tags","workout_intensity","hygiene_types",
        "prefer_supp_tags","avoid_supp_tags"
    ]
    rule_rows = [
        # menstrual
        ("menstrual","teen","iron-rich;heat-therapy;sensitive-skin;caffeine-free","high-caffeine","high-iron;anti-inflammatory;low-sugar","","low","pad;period-underwear;heat-patch","magnesium;electrolyte",""),
        ("menstrual","reproductive","iron-rich;heat-therapy;sensitive-skin;caffeine-free","high-caffeine","high-iron;anti-inflammatory;low-sugar","","low","pad;period-underwear;heat-patch","magnesium;electrolyte",""),
        ("menstrual","perimenopause","iron-rich;heat-therapy;sensitive-skin;caffeine-free","high-caffeine","anti-inflammatory;low-sugar","","low","pad;period-underwear;heat-patch","magnesium;electrolyte",""),
        ("menstrual","menopause","heat-therapy;sensitive-skin;caffeine-free","","anti-inflammatory;low-sugar","","low","heat-patch;wash;wipe","magnesium",""),
        # follicular
        ("follicular","teen","high-protein","","high-protein;anti-inflammatory","","moderate","wash;wipe","electrolyte",""),
        ("follicular","reproductive","high-protein;omega3","","high-protein;anti-inflammatory","","moderate","wash;wipe","omega3;electrolyte",""),
        ("follicular","perimenopause","omega3","","anti-inflammatory;low-sugar","","moderate","wash;wipe","omega3",""),
        ("follicular","menopause","omega3;caffeine-free","","anti-inflammatory;low-sugar","","moderate","wash;wipe","omega3",""),
        # ovulation (only defined for the reproductive life stage)
        ("ovulation","reproductive","electrolyte;caffeine-free","","light;anti-inflammatory","","moderate","wash;wipe","electrolyte",""),
        # luteal
        ("luteal","teen","magnesium;caffeine-free","","anti-inflammatory;low-sugar","","low","heat-patch;wash;wipe","magnesium",""),
        ("luteal","reproductive","magnesium;caffeine-free","","anti-inflammatory;low-sugar","","low","heat-patch;wash;wipe","magnesium",""),
        ("luteal","perimenopause","magnesium;caffeine-free","","anti-inflammatory;low-sugar","","low","heat-patch;wash;wipe","magnesium",""),
        ("luteal","menopause","caffeine-free","","anti-inflammatory;low-sugar","","low","heat-patch;wash;wipe","magnesium",""),
    ]
    rules_df = pd.DataFrame(rule_rows, columns=RULE_COLUMNS)

    # -------------------------
    # Write RAW CSVs to UC Volume
    # -------------------------
    # Dataset name -> pandas frame; the name becomes the sub-folder under RAW_BASE.
    raw_tables = {
        # original
        "users": users_df,
        "cycles": cycles_df,
        "symptoms": symptoms_df,
        "mood_logs": mood_df,
        "user_prefs": user_prefs_df,
        "meal_catalog": meals_df,
        "supplement_catalog": supp_df,
        "workout_catalog": wkt_df,
        "hygiene_catalog": hyg_df,
        "wellness_phase_rules": rules_df
    }

    for name, df in raw_tables.items():
        out_dir = f"{RAW_BASE}/{name}"
        # Clear the previous output folder before writing the new files.
        dbutils.fs.rm(out_dir, recurse=True)
        # coalesce(1) -> one partition, so one CSV part file per dataset.
        single_partition = spark.createDataFrame(df).coalesce(1)
        csv_writer = single_partition.write.mode("overwrite").option("header", True)
        csv_writer.csv(out_dir)
        print(f"Wrote RAW CSV: {out_dir}")


## BRONZE: Batch load RAW CSV ➜ Delta


In [0]:
from pyspark.sql.functions import current_timestamp

def load_csv_once(raw_path: str, table_name: str):
    """
    Batch-load the CSV files under `raw_path` into the table `bronze_<table_name>`.

    The header row supplies the column names and the column types are inferred.
    Two lineage columns are appended: `_ingest_file` (taken from
    `_metadata.file_path`) and `_ingest_ts` (the current timestamp). The target
    table is written with mode "overwrite".
    """
    target = f"{CATALOG}.{SCHEMA}.bronze_{table_name}"

    csv_reader = (
        spark.read
             .format("csv")
             .option("header", True)
             .option("inferSchema", True)
    )
    raw_df = csv_reader.load(raw_path)
    with_lineage = (
        raw_df
        .selectExpr("*", "_metadata.file_path as _ingest_file")
        .withColumn("_ingest_ts", current_timestamp())
    )

    with_lineage.write.mode("overwrite").option("mergeSchema", "true").saveAsTable(target)
    print(f"Created table {target}")

if RUN_LAYER in ("ALL", "BRONZE"):
    # Dataset names; each one maps to a folder under RAW_BASE and a bronze_<name> table.
    bronze_tables = [
        "users", "cycles", "symptoms", "mood_logs",
        "user_prefs", "meal_catalog", "supplement_catalog",
        "workout_catalog", "hygiene_catalog", "wellness_phase_rules"
    ]

    for name in bronze_tables:
        raw_dir = f"{RAW_BASE}/{name}"
        load_csv_once(raw_dir, name)

    # Row counts for sanity check
    for name in bronze_tables:
        bronze_table = f"{CATALOG}.{SCHEMA}.bronze_{name}"
        row_total = spark.table(bronze_table).count()
        print(f"bronze_{name}: {row_total:,}")


## SILVER: Clean / Type / Dedupe / Derive interval


In [0]:
# ---------- SILVER (combined original + wellness datasets) ----------
from pyspark.sql import functions as F, Window as W

def _split_tags_lower(colname: str):
    """Return an array column built from the ';'-delimited string column `colname`.

    A NULL input is treated as an empty string. Each piece is trimmed and
    lower-cased, and pieces that end up empty are removed.
    """
    raw_text = F.coalesce(F.col(colname), F.lit(""))
    pieces = F.split(raw_text, F.lit(";"))
    normalized = F.transform(pieces, lambda piece: F.lower(F.trim(piece)))
    return F.filter(normalized, lambda piece: piece != F.lit(""))

# ===== Original =====

def silver_users():
    """Build `silver_users` from `bronze_users`.

    Casts user_id and age to int, title-cases city, renames lifestyle_segment
    to segment, keeps the ingest lineage columns, keeps one row per user_id and
    overwrites the target table.
    """
    bronze = spark.table(f"{CATALOG}.{SCHEMA}.bronze_users")
    projected = bronze.select(
        F.col("user_id").cast("int").alias("user_id"),
        F.col("age").cast("int").alias("age"),
        F.initcap(F.col("city")).alias("city"),
        F.col("lifestyle_segment").alias("segment"),
        F.col("_ingest_file"),
        F.col("_ingest_ts"),
    )
    one_per_user = projected.dropDuplicates(["user_id"])

    target = f"{CATALOG}.{SCHEMA}.silver_users"
    one_per_user.write.mode("overwrite").option("mergeSchema","true").saveAsTable(target)

def silver_cycles():
    """Build `silver_cycles` from `bronze_cycles`.

    Steps:
      1. Cast ids and lengths to int and parse the start/end dates.
      2. Drop rows with an unparseable date or a period length outside 1..60.
      3. Keep one row per (user_id, cycle_start_date, cycle_end_date).
      4. Recompute interval_since_prev_end_days as this cycle's start minus the
         previous cycle's end for the same user (NULL for the first cycle).

    The result overwrites the target table.
    """
    df = spark.table(f"{CATALOG}.{SCHEMA}.bronze_cycles")
    df = (df
          .withColumn("user_id", F.col("user_id").cast("int"))
          .withColumn("cycle_start_date", F.to_date("cycle_start_date"))
          .withColumn("cycle_end_date",   F.to_date("cycle_end_date"))
          .withColumn("period_length_days", F.col("period_length_days").cast("int"))
          .withColumn("cycle_length_days",  F.col("cycle_length_days").cast("int"))
          .filter(F.col("cycle_start_date").isNotNull() & F.col("cycle_end_date").isNotNull())
          .filter(F.col("period_length_days").between(1, 60))
          # Dedupe BEFORE the window below: otherwise a duplicated row sees its
          # own twin as the "previous" cycle, gets a negative interval, and may
          # be the copy that survives.
          .dropDuplicates(["user_id","cycle_start_date","cycle_end_date"])
         )

    # Recompute interval_since_prev_end_days reliably (start - previous end).
    # cycle_end_date is a tie-breaker so rows sharing a start date are ordered
    # deterministically.
    w = (W.partitionBy("user_id")
          .orderBy(F.col("cycle_start_date").asc(), F.col("cycle_end_date").asc()))
    df = (df
          .withColumn("prev_end", F.lag("cycle_end_date").over(w))
          .withColumn("interval_since_prev_end_days",
                      F.when(F.col("prev_end").isNull(), F.lit(None).cast("int"))
                       .otherwise(F.datediff(F.col("cycle_start_date"), F.col("prev_end"))))
          .drop("prev_end")
         )

    df.write.mode("overwrite").option("mergeSchema","true").saveAsTable(f"{CATALOG}.{SCHEMA}.silver_cycles")

def silver_symptoms():
    """Build `silver_symptoms` from `bronze_symptoms`.

    Casts user_id and severity to int, parses date, lower-cases symptom, drops
    rows whose date did not parse and overwrites the target table.
    """
    typed_columns = {
        "user_id": F.col("user_id").cast("int"),
        "date": F.to_date("date"),
        "symptom": F.lower(F.col("symptom")),
        "severity": F.col("severity").cast("int"),
    }

    cleaned = spark.table(f"{CATALOG}.{SCHEMA}.bronze_symptoms")
    for column_name, expression in typed_columns.items():
        cleaned = cleaned.withColumn(column_name, expression)
    cleaned = cleaned.filter(F.col("date").isNotNull())

    target = f"{CATALOG}.{SCHEMA}.silver_symptoms"
    cleaned.write.mode("overwrite").option("mergeSchema","true").saveAsTable(target)

def silver_mood_logs():
    """Build `silver_mood_logs` from `bronze_mood_logs`.

    Casts user_id to int, parses date, lower-cases mood, leaves notes as is,
    drops rows whose date did not parse and overwrites the target table.
    """
    typed_columns = {
        "user_id": F.col("user_id").cast("int"),
        "date": F.to_date("date"),
        "mood": F.lower(F.col("mood")),
        "notes": F.col("notes"),  # passed through unchanged
    }

    cleaned = spark.table(f"{CATALOG}.{SCHEMA}.bronze_mood_logs")
    for column_name, expression in typed_columns.items():
        cleaned = cleaned.withColumn(column_name, expression)
    cleaned = cleaned.filter(F.col("date").isNotNull())

    target = f"{CATALOG}.{SCHEMA}.silver_mood_logs"
    cleaned.write.mode("overwrite").option("mergeSchema","true").saveAsTable(target)

# ===== New wellness =====

def silver_user_prefs():
    """Build `silver_user_prefs` from `bronze_user_prefs`.

    Casts user_id to int and every preference flag to boolean, lower-cases
    temperament, keeps the ingest lineage columns, keeps one row per user_id
    and overwrites the target table.
    """
    flag_columns = [
        "vegan", "dairy_free", "caffeine_free", "sensitive_skin",
        "nut_allergy", "gluten_free", "sexually_active",
    ]

    selection = [F.col("user_id").cast("int").alias("user_id")]
    selection.extend(F.col(flag).cast("boolean") for flag in flag_columns)
    selection.extend([
        F.lower(F.col("temperament")).alias("temperament"),
        F.col("_ingest_file"),
        F.col("_ingest_ts"),
    ])

    bronze = spark.table(f"{CATALOG}.{SCHEMA}.bronze_user_prefs")
    one_per_user = bronze.select(*selection).dropDuplicates(["user_id"])

    target = f"{CATALOG}.{SCHEMA}.silver_user_prefs"
    one_per_user.write.mode("overwrite").option("mergeSchema","true").saveAsTable(target)

def silver_meal_catalog():
    """Build `silver_meal_catalog` from `bronze_meal_catalog`.

    Casts meal_id to string, turns the ';'-delimited tags string into an array
    of lower-cased tags, keeps name, meal_type and the ingest lineage columns,
    and overwrites the target table.
    """
    bronze = spark.table(f"{CATALOG}.{SCHEMA}.bronze_meal_catalog")
    selection = [
        F.col("meal_id").cast("string").alias("meal_id"),
        F.col("name"),
        F.col("meal_type"),
        _split_tags_lower("tags").alias("tags"),
        F.col("_ingest_file"),
        F.col("_ingest_ts"),
    ]

    target = f"{CATALOG}.{SCHEMA}.silver_meal_catalog"
    bronze.select(*selection).write.mode("overwrite").option("mergeSchema","true").saveAsTable(target)

def silver_supplement_catalog():
    """Build `silver_supplement_catalog` from `bronze_supplement_catalog`.

    Casts supplement_id to string, turns the ';'-delimited tags string into an
    array of lower-cased tags, keeps name and the ingest lineage columns, and
    overwrites the target table.
    """
    bronze = spark.table(f"{CATALOG}.{SCHEMA}.bronze_supplement_catalog")
    selection = [
        F.col("supplement_id").cast("string").alias("supplement_id"),
        F.col("name"),
        _split_tags_lower("tags").alias("tags"),
        F.col("_ingest_file"),
        F.col("_ingest_ts"),
    ]

    target = f"{CATALOG}.{SCHEMA}.silver_supplement_catalog"
    bronze.select(*selection).write.mode("overwrite").option("mergeSchema","true").saveAsTable(target)

def silver_workout_catalog():
    """Build `silver_workout_catalog` from `bronze_workout_catalog`.

    Casts workout_id to string, lower-cases intensity and focus, turns the
    ';'-delimited tags string into an array of lower-cased tags, keeps name and
    the ingest lineage columns, and overwrites the target table.
    """
    bronze = spark.table(f"{CATALOG}.{SCHEMA}.bronze_workout_catalog")
    selection = [
        F.col("workout_id").cast("string").alias("workout_id"),
        F.col("name"),
        F.lower(F.col("intensity")).alias("intensity"),
        F.lower(F.col("focus")).alias("focus"),
        _split_tags_lower("tags").alias("tags"),
        F.col("_ingest_file"),
        F.col("_ingest_ts"),
    ]

    target = f"{CATALOG}.{SCHEMA}.silver_workout_catalog"
    bronze.select(*selection).write.mode("overwrite").option("mergeSchema","true").saveAsTable(target)

def silver_hygiene_catalog():
    """Build `silver_hygiene_catalog` from `bronze_hygiene_catalog`.

    Casts hygiene_id to string, lower-cases type, turns the ';'-delimited tags
    string into an array of lower-cased tags, keeps name and the ingest lineage
    columns, and overwrites the target table.
    """
    bronze = spark.table(f"{CATALOG}.{SCHEMA}.bronze_hygiene_catalog")
    selection = [
        F.col("hygiene_id").cast("string").alias("hygiene_id"),
        F.col("name"),
        F.lower(F.col("type")).alias("type"),
        _split_tags_lower("tags").alias("tags"),
        F.col("_ingest_file"),
        F.col("_ingest_ts"),
    ]

    target = f"{CATALOG}.{SCHEMA}.silver_hygiene_catalog"
    bronze.select(*selection).write.mode("overwrite").option("mergeSchema","true").saveAsTable(target)

def silver_wellness_phase_rules():
    """Build `silver_wellness_phase_rules` from `bronze_wellness_phase_rules`.

    Lower-cases phase, life_stage and workout_intensity, turns every
    ';'-delimited tag/type column into an array of lower-cased values, keeps
    the ingest lineage columns and overwrites the target table.
    """
    def lowered(column_name):
        # Lower-cased scalar column, same name.
        return F.lower(F.col(column_name)).alias(column_name)

    def as_tag_array(column_name):
        # ';'-delimited string -> array column, same name.
        return _split_tags_lower(column_name).alias(column_name)

    selection = [
        lowered("phase"),
        lowered("life_stage"),
        as_tag_array("prefer_product_tags"),
        as_tag_array("avoid_product_tags"),
        as_tag_array("prefer_meal_tags"),
        as_tag_array("avoid_meal_tags"),
        lowered("workout_intensity"),
        as_tag_array("hygiene_types"),
        as_tag_array("prefer_supp_tags"),
        as_tag_array("avoid_supp_tags"),
        F.col("_ingest_file"),
        F.col("_ingest_ts"),
    ]

    bronze = spark.table(f"{CATALOG}.{SCHEMA}.bronze_wellness_phase_rules")
    target = f"{CATALOG}.{SCHEMA}.silver_wellness_phase_rules"
    bronze.select(*selection).write.mode("overwrite").option("mergeSchema","true").saveAsTable(target)

# Execute all SILVER transforms in one go
if RUN_LAYER in ("ALL", "SILVER"):
    # Builders run in this order: the four original datasets first, then the
    # wellness datasets.
    silver_builders = (
        silver_users,
        silver_cycles,
        silver_symptoms,
        silver_mood_logs,
        silver_user_prefs,
        silver_meal_catalog,
        silver_supplement_catalog,
        silver_workout_catalog,
        silver_hygiene_catalog,
        silver_wellness_phase_rules,
    )
    for build_table in silver_builders:
        build_table()

    # Row counts of the tables that were just written.
    silver_names = [
        "users","cycles","symptoms","mood_logs",
        "user_prefs","meal_catalog","supplement_catalog",
        "workout_catalog","hygiene_catalog","wellness_phase_rules"
    ]
    for t in silver_names:
        silver_table = f'{CATALOG}.{SCHEMA}.silver_{t}'
        print(f"silver_{t}: {spark.table(silver_table).count():,}")


## GOLD: Modeled tables, views & recommender function
- `gold_cycles` with irregular flags
- `gold_user_cycle_metrics` per-user rollups
- `vw_cycle_irregularities` helper for demos


In [0]:
from pyspark.sql import functions as F

def gold_cycles():
    """Create or replace the `gold_cycles` table from `silver_cycles`.

    Copies the cycle columns and adds two boolean flags:
      * is_abnormally_long_period - period_length_days is 11 or more
      * is_short_interval         - interval_since_prev_end_days is not NULL
                                    and below 15

    Prints the row count of the new table.
    """
    spark.sql(f"""
        CREATE OR REPLACE TABLE {CATALOG}.{SCHEMA}.gold_cycles AS
        SELECT
          user_id,
          cycle_start_date,
          cycle_end_date,
          period_length_days,
          cycle_length_days,
          interval_since_prev_end_days,
          (period_length_days >= 11)                                     AS is_abnormally_long_period,
          (interval_since_prev_end_days IS NOT NULL AND interval_since_prev_end_days < 15) AS is_short_interval
        FROM {CATALOG}.{SCHEMA}.silver_cycles
    """)
    print("gold_cycles                :", spark.table(f"{CATALOG}.{SCHEMA}.gold_cycles").count())

def gold_user_cycle_metrics():
    """Create or replace the `gold_user_cycle_metrics` table from `gold_cycles`.

    One row per user_id with the average cycle length and average period
    length (both rounded to one decimal) and the number of cycles. Must run
    after `gold_cycles()` because it reads that table. Prints the row count.
    """
    # per-user averages & counts (simple group-by)
    spark.sql(f"""
        CREATE OR REPLACE TABLE {CATALOG}.{SCHEMA}.gold_user_cycle_metrics AS
        SELECT
          user_id,
          ROUND(AVG(cycle_length_days), 1)  AS avg_cycle_length_days,
          ROUND(AVG(period_length_days), 1) AS avg_period_length_days,
          COUNT(*)                           AS cycles_count
        FROM {CATALOG}.{SCHEMA}.gold_cycles
        GROUP BY user_id
    """)
    print("gold_user_cycle_metrics    :", spark.table(f"{CATALOG}.{SCHEMA}.gold_user_cycle_metrics").count())

def gold_irregularities_view():
    """Create or replace the view `vw_cycle_irregularities` over `gold_cycles`.

    The view keeps only cycles where is_abnormally_long_period or
    is_short_interval is TRUE, ordered by user_id and cycle_start_date.
    """
    spark.sql(f"""
        CREATE OR REPLACE VIEW {CATALOG}.{SCHEMA}.vw_cycle_irregularities AS
        SELECT
          c.user_id,
          c.cycle_start_date,
          c.cycle_end_date,
          c.period_length_days,
          c.interval_since_prev_end_days,
          c.is_abnormally_long_period,
          c.is_short_interval
        FROM {CATALOG}.{SCHEMA}.gold_cycles c
        WHERE c.is_abnormally_long_period = TRUE OR c.is_short_interval = TRUE
        ORDER BY user_id, cycle_start_date
    """)
    print("vw_cycle_irregularities    : ready")

def gold_user_lifestage_view():
    """Create or replace the view `gold_user_lifestage` over `silver_users`.

    Maps each user's age to a life_stage label:
      * under 18  -> 'teen'
      * 18 to 39  -> 'reproductive'
      * 40 to 47  -> 'perimenopause'
      * 48 and up -> 'menopause'
      * NULL age  -> NULL (an unknown age must not fall into the ELSE branch
                     and be labelled 'menopause')
    """
    spark.sql(f"""
        CREATE OR REPLACE VIEW {CATALOG}.{SCHEMA}.gold_user_lifestage AS
        SELECT
          u.user_id,
          u.age,
          CASE
            WHEN u.age IS NULL THEN CAST(NULL AS STRING)
            WHEN u.age < 18 THEN 'teen'
            WHEN u.age BETWEEN 18 AND 39 THEN 'reproductive'
            WHEN u.age BETWEEN 40 AND 47 THEN 'perimenopause'
            ELSE 'menopause'
          END AS life_stage
        FROM {CATALOG}.{SCHEMA}.silver_users u
    """)
    print("gold_user_lifestage        : ready")

def gold_user_current_phase_view():
    """Create or replace the view `gold_user_current_phase`.

    For each user the view takes the latest cycle_start_date from `gold_cycles`
    and the rounded per-user averages from `gold_user_cycle_metrics`, then
    labels the current phase by the number of days since that last start:
      * fewer than avg_period_len      -> 'menstrual'
      * fewer than avg_period_len + 7  -> 'follicular'
      * fewer than avg_period_len + 12 -> 'ovulation'
      * anything later                 -> 'luteal'
    avg_period_len defaults to 5 when NULL. avg_cycle_len is returned as a
    column but is not used in the phase rule.
    """
    # simple heuristic using last start and averages
    spark.sql(f"""
        CREATE OR REPLACE VIEW {CATALOG}.{SCHEMA}.gold_user_current_phase AS
        WITH last_cycle AS (
          SELECT user_id, MAX(cycle_start_date) AS last_start
          FROM {CATALOG}.{SCHEMA}.gold_cycles
          GROUP BY user_id
        ),
        avg AS (
          SELECT
            user_id,
            CAST(ROUND(avg_period_length_days) AS INT) AS avg_period_len,
            CAST(ROUND(avg_cycle_length_days)  AS INT) AS avg_cycle_len
          FROM {CATALOG}.{SCHEMA}.gold_user_cycle_metrics
        )
        SELECT
          a.user_id,
          last_start,
          avg_period_len,
          avg_cycle_len,
          CASE
            WHEN DATEDIFF(CURRENT_DATE(), last_start) < COALESCE(avg_period_len,5) THEN 'menstrual'
            WHEN DATEDIFF(CURRENT_DATE(), last_start) < COALESCE(avg_period_len,5) + 7 THEN 'follicular'
            WHEN DATEDIFF(CURRENT_DATE(), last_start) < COALESCE(avg_period_len,5) + 12 THEN 'ovulation'
            ELSE 'luteal'
          END AS phase
        FROM last_cycle l
        JOIN avg a ON a.user_id = l.user_id
    """)
    print("gold_user_current_phase    : ready")

def gold_user_wellness_profile_view():
    """Create or replace the view `gold_user_wellness_profile`.

    One row per user in `silver_users` with two arrays covering the last
    90 days:
      * top_symptoms_90d - up to 3 most frequent symptoms
      * top_moods_90d    - up to 2 most frequent moods
    Ties are broken alphabetically. Users with no recent entries get an empty
    array.

    The arrays are built by collecting (rank, value) structs, sorting them and
    then extracting the value. COLLECT_LIST alone does not guarantee element
    order, so without the sort the "top" lists could come back in any order.
    """
    # top symptoms (90d) + top moods (90d)
    spark.sql(f"""
        CREATE OR REPLACE VIEW {CATALOG}.{SCHEMA}.gold_user_wellness_profile AS
        WITH recent_sym AS (
          SELECT user_id, symptom, COUNT(*) AS cnt
          FROM {CATALOG}.{SCHEMA}.silver_symptoms
          WHERE date >= DATEADD(day, -90, CURRENT_DATE())
          GROUP BY user_id, symptom
        ),
        ranked_sym AS (
          SELECT
            user_id, symptom, cnt,
            ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY cnt DESC, symptom ASC) AS rk
          FROM recent_sym
        ),
        top_sym AS (
          SELECT
            user_id,
            TRANSFORM(SORT_ARRAY(COLLECT_LIST(STRUCT(rk, symptom))), x -> x.symptom) AS top_symptoms_90d
          FROM ranked_sym
          WHERE rk <= 3
          GROUP BY user_id
        ),

        recent_mood AS (
          SELECT user_id, mood, COUNT(*) AS cnt
          FROM {CATALOG}.{SCHEMA}.silver_mood_logs
          WHERE date >= DATEADD(day, -90, CURRENT_DATE())
          GROUP BY user_id, mood
        ),
        ranked_mood AS (
          SELECT
            user_id, mood, cnt,
            ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY cnt DESC, mood ASC) AS rk
          FROM recent_mood
        ),
        top_mood AS (
          SELECT
            user_id,
            TRANSFORM(SORT_ARRAY(COLLECT_LIST(STRUCT(rk, mood))), x -> x.mood) AS top_moods_90d
          FROM ranked_mood
          WHERE rk <= 2
          GROUP BY user_id
        )

        SELECT
          u.user_id,
          COALESCE(tm.top_moods_90d, ARRAY())    AS top_moods_90d,
          COALESCE(ts.top_symptoms_90d, ARRAY()) AS top_symptoms_90d
        FROM {CATALOG}.{SCHEMA}.silver_users u
        LEFT JOIN top_mood tm ON tm.user_id = u.user_id
        LEFT JOIN top_sym ts  ON ts.user_id = u.user_id
    """)
    print("gold_user_wellness_profile : ready")

def gold_user_wellness_context_view():
    """Create or replace the view `gold_user_wellness_context`.

    Combines, per user, the preference flags from `silver_user_prefs` with the
    life stage, current phase and 90-day top moods/symptoms from the other
    GOLD views. Only users present in both `silver_user_prefs` and
    `silver_users` appear (inner join); the three GOLD views are left-joined,
    so their columns may be NULL.
    """
    spark.sql(f"""
        CREATE OR REPLACE VIEW {CATALOG}.{SCHEMA}.gold_user_wellness_context AS
        SELECT
          u.user_id,
          ls.life_stage,
          cp.phase,
          p.vegan, p.dairy_free, p.caffeine_free, p.sensitive_skin, p.nut_allergy, p.gluten_free,
          p.sexually_active, p.temperament,
          wp.top_moods_90d, wp.top_symptoms_90d
        FROM {CATALOG}.{SCHEMA}.silver_user_prefs p
        JOIN {CATALOG}.{SCHEMA}.silver_users u                    ON u.user_id = p.user_id
        LEFT JOIN {CATALOG}.{SCHEMA}.gold_user_lifestage ls       ON ls.user_id = u.user_id
        LEFT JOIN {CATALOG}.{SCHEMA}.gold_user_current_phase cp   ON cp.user_id = u.user_id
        LEFT JOIN {CATALOG}.{SCHEMA}.gold_user_wellness_profile wp ON wp.user_id = u.user_id
    """)
    print("gold_user_wellness_context : ready")

# ===== Optional: UC table function to power the wellness agent =====
def create_recommend_wellness_bundle_function():
    """Create or replace the SQL table function `recommend_wellness_bundle`.

    Signature of the SQL function: (p_user_id INT, p_items INT), returning rows
    of (user_id, phase, life_stage, section, id, name, meta).

    How the function body works:
      * `ctx` is the user's row from `gold_user_wellness_context`; `rules` is
        `silver_wellness_phase_rules`, joined on the user's phase and
        life_stage. A user whose (phase, life_stage) pair has no rule row gets
        no results.
      * meal       - meals sharing at least one tag with prefer_meal_tags,
                     filtered by the vegan / gluten_free / dairy_free flags,
                     ranked by tag overlap then name; top p_items kept.
      * supplement - supplements sharing a tag with prefer_supp_tags (or all of
                     them when that list is empty); users flagged caffeine_free
                     only get items tagged 'caffeine-free'; top p_items kept.
      * workout    - workouts whose intensity equals the rule's
                     workout_intensity; the first one by name is kept.
      * hygiene    - items whose type is listed in hygiene_types; users flagged
                     sensitive_skin only get items tagged 'sensitive-skin';
                     ranked by name; top p_items kept.
      * The top-N cut uses ROW_NUMBER() with `rn <= p_items` rather than LIMIT.

    NOTE(review): the avoid_* rule columns and the nut_allergy flag are not
    referenced anywhere in this SQL. The preference predicates compare flags
    with TRUE/FALSE, so a NULL flag presumably removes every row of that
    section - confirm whether that is intended.
    """
    spark.sql(f"""
        CREATE OR REPLACE FUNCTION {CATALOG}.{SCHEMA}.recommend_wellness_bundle(p_user_id INT, p_items INT)
        RETURNS TABLE (
          user_id INT,
          phase STRING,
          life_stage STRING,
          section STRING,   -- "meal"|"supplement"|"workout"|"hygiene"
          id STRING,
          name STRING,
          meta STRING       -- JSON blob (tags, etc.)
        )
        RETURN
        WITH
        ctx AS (
          SELECT * FROM {CATALOG}.{SCHEMA}.gold_user_wellness_context WHERE user_id = p_user_id
        ),
        rules AS (
          SELECT * FROM {CATALOG}.{SCHEMA}.silver_wellness_phase_rules
        ),

        -- meals: rank matches then keep top p_items
        m_ranked AS (
          SELECT
            c.user_id, c.phase, c.life_stage,
            'meal' AS section,
            mc.meal_id AS id, mc.name,
            TO_JSON(NAMED_STRUCT('tags', mc.tags)) AS meta,
            ROW_NUMBER() OVER (
              ORDER BY SIZE(ARRAY_INTERSECT(mc.tags, r.prefer_meal_tags)) DESC, mc.name ASC
            ) AS rn
          FROM ctx c
          JOIN rules r ON r.phase = c.phase AND r.life_stage = c.life_stage
          JOIN {CATALOG}.{SCHEMA}.silver_meal_catalog mc
          WHERE SIZE(ARRAY_INTERSECT(mc.tags, r.prefer_meal_tags)) > 0
            AND ( (c.vegan = TRUE        AND ARRAY_CONTAINS(mc.tags, 'vegan'))         OR c.vegan        = FALSE )
            AND ( (c.gluten_free = TRUE  AND ARRAY_CONTAINS(mc.tags, 'gluten-free'))   OR c.gluten_free  = FALSE )
            AND ( (c.dairy_free  = TRUE  AND NOT ARRAY_CONTAINS(mc.tags, 'dairy'))     OR c.dairy_free   = FALSE )
        ),
        m AS (
          SELECT user_id, phase, life_stage, section, id, name, meta
          FROM m_ranked
          WHERE rn <= p_items
        ),

        -- supplements: rank by tag overlap; keep top p_items
        s_ranked AS (
          SELECT
            c.user_id, c.phase, c.life_stage,
            'supplement' AS section,
            sc.supplement_id AS id, sc.name,
            TO_JSON(NAMED_STRUCT('tags', sc.tags)) AS meta,
            ROW_NUMBER() OVER (
              ORDER BY SIZE(ARRAY_INTERSECT(sc.tags, r.prefer_supp_tags)) DESC, sc.name ASC
            ) AS rn
          FROM ctx c
          JOIN rules r ON r.phase = c.phase AND r.life_stage = c.life_stage
          JOIN {CATALOG}.{SCHEMA}.silver_supplement_catalog sc
          WHERE (SIZE(ARRAY_INTERSECT(sc.tags, r.prefer_supp_tags)) > 0 OR SIZE(r.prefer_supp_tags) = 0)
            AND ( (c.caffeine_free = TRUE AND ARRAY_CONTAINS(sc.tags, 'caffeine-free')) OR c.caffeine_free = FALSE )
        ),
        s AS (
          SELECT user_id, phase, life_stage, section, id, name, meta
          FROM s_ranked
          WHERE rn <= p_items
        ),

        -- workout: pick one by rule (keep rn <= 1)
        w_ranked AS (
          SELECT
            c.user_id, c.phase, c.life_stage,
            'workout' AS section,
            wc.workout_id AS id, wc.name,
            TO_JSON(NAMED_STRUCT('intensity', wc.intensity, 'focus', wc.focus, 'tags', wc.tags)) AS meta,
            ROW_NUMBER() OVER (ORDER BY wc.name ASC) AS rn
          FROM ctx c
          JOIN rules r ON r.phase = c.phase AND r.life_stage = c.life_stage
          JOIN {CATALOG}.{SCHEMA}.silver_workout_catalog wc
          WHERE wc.intensity = r.workout_intensity
        ),
        w AS (
          SELECT user_id, phase, life_stage, section, id, name, meta
          FROM w_ranked
          WHERE rn <= 1
        ),

        -- hygiene: rank matches; keep top p_items
        h_ranked AS (
          SELECT
            c.user_id, c.phase, c.life_stage,
            'hygiene' AS section,
            hc.hygiene_id AS id, hc.name,
            TO_JSON(NAMED_STRUCT('type', hc.type, 'tags', hc.tags)) AS meta,
            ROW_NUMBER() OVER (ORDER BY hc.name ASC) AS rn
          FROM ctx c
          JOIN rules r ON r.phase = c.phase AND r.life_stage = c.life_stage
          JOIN {CATALOG}.{SCHEMA}.silver_hygiene_catalog hc
          WHERE ARRAY_CONTAINS(r.hygiene_types, hc.type)
            AND ( (c.sensitive_skin = TRUE AND ARRAY_CONTAINS(hc.tags, 'sensitive-skin')) OR c.sensitive_skin = FALSE )
        ),
        h AS (
          SELECT user_id, phase, life_stage, section, id, name, meta
          FROM h_ranked
          WHERE rn <= p_items
        )

        SELECT * FROM m
        UNION ALL SELECT * FROM s
        UNION ALL SELECT * FROM w
        UNION ALL SELECT * FROM h
    """)
    print("recommend_wellness_bundle  : function created (no parameterized LIMIT)")


# Run all GOLD steps
if RUN_LAYER in ("ALL", "GOLD"):
    # Executed strictly in this order: later steps read objects created by
    # earlier ones (metrics read gold_cycles, the context view reads the
    # lifestage / phase / profile views, the function reads the context view).
    gold_steps = (
        gold_cycles,
        gold_user_cycle_metrics,
        gold_irregularities_view,
        gold_user_lifestage_view,
        gold_user_current_phase_view,
        gold_user_wellness_profile_view,
        gold_user_wellness_context_view,
        # Optional: create the recommender function as a tool for your agent
        create_recommend_wellness_bundle_function,
    )
    for run_step in gold_steps:
        run_step()


## Quick sanity checks


In [0]:

# Preview two GOLD views. The names are built from the CATALOG / SCHEMA widgets
# so the checks follow whatever catalog and schema the pipeline was run against
# instead of a hard-coded location.
display(spark.sql(f"SELECT * FROM {CATALOG}.{SCHEMA}.gold_user_wellness_context LIMIT 20"))
display(spark.sql(f"SELECT * FROM {CATALOG}.{SCHEMA}.gold_user_current_phase ORDER BY user_id LIMIT 20"))