In [0]:
# ==========================================================
# GOLD OUTPUTS
# - Source : SILVER JOINED
# - Outputs :
#     1) MART (main analytical table)
#     2) Time-based aggregations (daily / weekly / monthly)
#     3) Export Parquet (analytics)
#     4) Export CSV BI-ready
# ==========================================================

import pyspark.sql.functions as F

# ----------------------------------------------------------
# CONFIGURATION
# ----------------------------------------------------------
# Root directory under which every data layer of the project lives.
BASE_PATH = "/Volumes/workspace/ipsldata/capstoneipsl"

# Layer roots (private helpers) so that every path below derives from
# BASE_PATH in one place.
_SILVER_ROOT = f"{BASE_PATH}/data/silver"
_GOLD_ROOT = f"{BASE_PATH}/data/gold"

# Input: silver "joined" dataset.
SILVER_JOINED_PATH = f"{_SILVER_ROOT}/joined"

# Outputs: transactions mart, Parquet aggregates, CSV exports.
GOLD_MART_PATH = f"{_GOLD_ROOT}/marts/transactions_mart"
GOLD_AGG_PARQUET_PATH = f"{_GOLD_ROOT}/aggregates"
GOLD_EXPORT_CSV_PATH = f"{_GOLD_ROOT}/exports"

# ----------------------------------------------------------
# 1. READ SILVER JOINED (SOURCE FOR GOLD)
# ----------------------------------------------------------
# Load the silver "joined" Parquet dataset; every gold output below is
# derived from this DataFrame.
df_joined = (
    spark.read
    .format("parquet")
    .load(SILVER_JOINED_PATH)
)

# ----------------------------------------------------------
# 2. BUILD GOLD MART (MAIN ANALYTICAL TABLE)
# ----------------------------------------------------------
# Columns exposed by the mart, in output order.
_GOLD_MART_COLUMNS = (
    "transaction_id",
    "client_id",
    "transaction_date",
    "year",
    "amount",
    "loan_flag",
    "high_value_txn_flag",
    "weekend_flag",
    "balance_flag",
    "client_tenure_days",
)

gold_mart = df_joined.select(*_GOLD_MART_COLUMNS)

# Overwrite existing output on every run, with one sub-directory per "year"
# value. Whether the whole directory or only the written partitions are
# replaced depends on spark.sql.sources.partitionOverwriteMode (static by
# default).
(
    gold_mart.write
    .partitionBy("year")
    .mode("overwrite")
    .parquet(GOLD_MART_PATH)
)

# ----------------------------------------------------------
# 3. AGGREGATION FUNCTION
# ----------------------------------------------------------
def aggregate_transactions(df_joined):
    """Build daily, weekly and monthly transaction aggregates.

    Parameters
    ----------
    df_joined : pyspark.sql.DataFrame
        Silver "joined" dataset. Must provide the columns
        ``transaction_id``, ``transaction_date``, ``year``, ``amount`` and
        ``high_value_txn_flag``.

    Returns
    -------
    dict
        ``{"daily": DataFrame, "weekly": DataFrame, "monthly": DataFrame}``.
        Each frame carries ``nb_transactions`` (count of non-null
        ``transaction_id``), ``total_amount`` and ``high_value_count``.
        The frames are grouped by ``transaction_date``, by
        ``(year, week)`` and by ``(year, month)`` respectively. They are
        lazy: nothing is computed until an action (e.g. a write) runs.

    Notes
    -----
    ``week`` is the ISO-8601 week number returned by ``weekofyear``.
    The weekly ``year`` is therefore the matching ISO week-numbering year,
    not the calendar year.

    Example: 2024-12-30 belongs to ISO week 1 of 2025. Paired with its
    calendar year it would be counted as (2024, week 1), next to the first
    days of January 2024, and the real week would be split across two rows.

    The monthly ``year`` still comes from the input ``year`` column
    (presumably the calendar year of ``transaction_date`` -- TODO confirm).
    """
    df = (
        df_joined
        .withColumn("week", F.weekofyear("transaction_date"))
        .withColumn("month", F.month("transaction_date"))
    )

    # ISO week-numbering year, always consistent with weekofyear() above.
    # Aliased to "year" so the weekly output schema is unchanged.
    iso_week_year = F.expr(
        "extract(YEAROFWEEK FROM to_date(transaction_date))"
    ).alias("year")

    # The same three metrics are computed at every grain.
    # The cast lets the flag sum work whether the flag is stored as 0/1
    # integers or as booleans; Spark's sum() rejects boolean input.
    metrics = [
        F.count("transaction_id").alias("nb_transactions"),
        F.sum("amount").alias("total_amount"),
        F.sum(F.col("high_value_txn_flag").cast("int")).alias("high_value_count"),
    ]

    daily_agg = df.groupBy("transaction_date").agg(*metrics)
    weekly_agg = df.groupBy(iso_week_year, "week").agg(*metrics)
    monthly_agg = df.groupBy("year", "month").agg(*metrics)

    return {
        "daily": daily_agg,
        "weekly": weekly_agg,
        "monthly": monthly_agg,
    }

# ----------------------------------------------------------
# 4. BUILD AGGREGATES
# ----------------------------------------------------------
# Map of level name ("daily" / "weekly" / "monthly") -> aggregated
# DataFrame. These are transformations only; the exports below run them.
aggregates = aggregate_transactions(df_joined=df_joined)

# ----------------------------------------------------------
# 5. EXPORT GOLD AGGREGATES — PARQUET (MANDATORY)
# ----------------------------------------------------------
# One Parquet directory per level, e.g. <GOLD_AGG_PARQUET_PATH>/daily.
for agg_level, agg_df in aggregates.items():
    target_dir = f"{GOLD_AGG_PARQUET_PATH}/{agg_level}"
    agg_df.write.format("parquet").mode("overwrite").save(target_dir)

# ----------------------------------------------------------
# 6. EXPORT GOLD AGGREGATES — CSV (BI-READY)
# ----------------------------------------------------------
# Each level is collapsed to a single partition so Spark writes one CSV
# part file (with a header row) into <GOLD_EXPORT_CSV_PATH>/<level>_csv.
for agg_level, agg_df in aggregates.items():
    csv_dir = f"{GOLD_EXPORT_CSV_PATH}/{agg_level}_csv"
    single_partition = agg_df.coalesce(1)
    (
        single_partition.write
        .format("csv")
        .option("header", True)
        .mode("overwrite")
        .save(csv_dir)
    )

print("✅ GOLD MART + AGGREGATES + EXPORTS GENERATED SUCCESSFULLY")
