In [None]:
# ===============================
# 📦 Imports and setup
# ===============================
import os
import pyspark
import pyspark.sql.functions as F
from pyspark.sql import Window
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

In [None]:
# ===============================
# ⚙️ Initialize Spark
# ===============================

# Local-mode session (all available cores) for the gold feature-store job.
# Driver and executor memory are both configured as "4g".
spark = (
    pyspark.sql.SparkSession.builder
    .appName("gold_feature_store_daily")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .master("local[*]")
    .getOrCreate()
)

# Keep cell output readable: only ERROR-level Spark logs are shown.
spark.sparkContext.setLogLevel("ERROR")

In [None]:
# ===============================
# 📥 Load Silver Tables
# ===============================

# Root of the Silver layer; every table below is a Parquet dataset under it.
SILVER_DIR = "/app/datamart/silver"

print("Loading Silver layer tables...\n")

df_userlogs = spark.read.parquet(f"{SILVER_DIR}/user_logs")
df_transactions = spark.read.parquet(f"{SILVER_DIR}/transactions")
df_latest_transactions = spark.read.parquet(f"{SILVER_DIR}/latest_transactions")
df_members = spark.read.parquet(f"{SILVER_DIR}/members")

# Parquet files carry their own schema, so no "header" / "inferSchema"
# options are set here: those are CSV reader options and have no effect on
# a Parquet read.
txn_snapshots = spark.read.parquet(f"{SILVER_DIR}/max_expiry_transactions")

print("Done\n")

In [None]:
# ===============================
# 👤 Registered Users
# ===============================

# Distinct (snapshot_date, msno) pairs from the snapshot table, left-joined to
# the members table on msno so that pairs without a member record are kept.
snapshot_users = (
    txn_snapshots
    .select("snapshot_date", "msno")
    .distinct()
    .join(df_members, on="msno", how="left")
)
print(f"üìÑ Registered users for all dates: {snapshot_users.count()}")


In [None]:
# Quick look at the raw user-log rows before any feature engineering.
df_userlogs.show(n=5)

In [None]:
# Quick look at the (msno, snapshot_date) spine joined with member attributes.
snapshot_users.show(n=5)

In [None]:
# User-log engagement features, one row per (msno, snapshot_date).
# `F` and `Window` come from the notebook's import cell at the top.

# --- 0) Hygiene: ensure date types
df_userlogs    = df_userlogs.withColumn("date", F.to_date("date"))
snapshot_users = snapshot_users.withColumn("snapshot_date", F.to_date("snapshot_date"))

feature_keys = ["msno", "snapshot_date"]

# Spine of (msno, snapshot_date) pairs. It is de-duplicated so that a repeated
# pair (e.g. caused by duplicate member rows) cannot double-count log rows in
# the joins below or multiply rows when the features are joined back.
spine = snapshot_users.select("msno", "snapshot_date").distinct()

# Aliases
u = df_userlogs.alias("u")
s = spine.alias("s")

# Grouping columns shared by every aggregate below.
by_snapshot = [
    F.col("s.msno").alias("msno"),
    F.col("s.snapshot_date").alias("snapshot_date"),
]

# --- 1) Date windows relative to snapshot_date
# Both windows end the day BEFORE the snapshot; `between` is inclusive, so
# they cover exactly 30 and 7 calendar days respectively.
start_30 = F.date_sub(F.col("s.snapshot_date"), 30)
end_30   = F.date_sub(F.col("s.snapshot_date"), 1)    # include snapshot day: use F.col("s.snapshot_date")
start_7  = F.date_sub(F.col("s.snapshot_date"), 7)
end_7    = F.date_sub(F.col("s.snapshot_date"), 1)


def _logs_between(window_start, window_end):
    """Return log rows of each spine user dated within [window_start, window_end]."""
    return u.join(
        s,
        (F.col("u.msno") == F.col("s.msno")) &
        (F.col("u.date").between(window_start, window_end)),
        "inner"
    )


# --- 2) Build windowed userlogs
userlogs_30d = _logs_between(start_30, end_30)
userlogs_7d  = _logs_between(start_7, end_7)

metrics = ["num_25","num_50","num_75","num_985","num_100","num_unq","total_secs"]

# --- 3) 30d aggregates per (msno, snapshot_date)
agg_30d = (
    userlogs_30d
    .groupBy(*by_snapshot)
    .agg(
        *[F.sum(F.col(f"u.{m}")).alias(f"{m}_w30_sum") for m in metrics],
        F.countDistinct(F.col("u.date")).alias("active_days_w30")
    )
)

# Completion rate (30d) = sum(num_100) / max(sum(num_unq), 1)
agg_30d = agg_30d.withColumn(
    "complete_rate_w30",
    F.col("num_100_w30_sum") / F.when(F.col("num_unq_w30_sum") > 0, F.col("num_unq_w30_sum")).otherwise(F.lit(1))
)

# Keep a convenience alias for seconds (30d)
agg_30d = agg_30d.withColumnRenamed("total_secs_w30_sum", "sum_secs_w30")

# --- 4) 7d aggregates per (msno, snapshot_date)
agg_7d = (
    userlogs_7d
    .groupBy(*by_snapshot)
    .agg(F.sum(F.col("u.total_secs")).alias("sum_secs_w7"))
)

# --- 5) Engagement ratio 7/30 (the denominator falls back to 1 when the
#        30d total is missing or not positive). Applied during assembly below.
engagement_ratio_7_30 = (
    F.col("sum_secs_w7")
    / F.when(F.col("sum_secs_w30") > 0, F.col("sum_secs_w30")).otherwise(F.lit(1))
)

# --- 6) Days since last play (max date <= snapshot_date)
# NOTE(review): this includes the snapshot day itself, whereas the 30d/7d
# windows above stop the day before - confirm that asymmetry is intended.
last_play = (
    u.join(s, (F.col("u.msno")==F.col("s.msno")) & (F.col("u.date") <= F.col("s.snapshot_date")), "inner")
     .groupBy(*by_snapshot)
     .agg(F.max(F.col("u.date")).alias("last_play_date"))
     .withColumn("days_since_last_play", F.datediff(F.col("snapshot_date"), F.col("last_play_date")))
)

# --- 7) Trend in total_secs over 30d using slope ≈ cov(day_idx, daily_secs)/var(day_idx)
daily_secs_30d = (
    userlogs_30d
    .groupBy(*by_snapshot, F.col("u.date").alias("date"))
    .agg(F.sum(F.col("u.total_secs")).alias("daily_secs"))
)

# NOTE(review): day_idx is the rank of each ACTIVE day (1..N), so gaps between
# active days are ignored by the slope - confirm this is the intended x-axis.
w = Window.partitionBy("msno","snapshot_date").orderBy("date")
day_idx_var = F.var_pop("day_idx")
trend_30d = (
    daily_secs_30d
    .withColumn("day_idx", F.row_number().over(w))  # 1..N
    .groupBy("msno","snapshot_date")
    .agg(
        # A single active day has zero variance. Yield null explicitly instead
        # of dividing by zero, which raises when ANSI SQL mode is enabled.
        F.when(
            day_idx_var > 0,
            F.covar_pop("day_idx","daily_secs") / day_idx_var
        ).alias("trend_secs_w30")
    )
)

# --- 8) Assemble features per (msno, snapshot_date)
# Start from the full spine rather than from agg_30d: users with no logs in
# the 30d window must still get a row, so that their defaults are filled below
# and an older last-play date is not lost.
features = (
    spine
    .join(agg_30d, feature_keys, "left")
    .join(agg_7d, feature_keys, "left")
    .withColumn("engagement_ratio_7_30", engagement_ratio_7_30)
    .join(last_play.select("msno","snapshot_date","days_since_last_play"), feature_keys, "left")
    .join(trend_30d, feature_keys, "left")
)

# Fill nulls for numeric outputs. Entries set to None are deliberately left null.
fill_map = {
    "sum_secs_w30": 0.0,
    "active_days_w30": 0,
    "complete_rate_w30": 0.0,
    "sum_secs_w7": 0.0,
    "engagement_ratio_7_30": 0.0,
    "days_since_last_play": None,   # stays null if never played on/before snapshot (0 would mean "played on snapshot day")
    "trend_secs_w30": 0.0
}
features = features.na.fill({k:v for k,v in fill_map.items() if v is not None})

# --- 9) Join back to snapshot_users (aka registered_users)
# Every snapshot_users row has a matching features row (features covers the
# whole spine), so the filled defaults survive this join.
registered_users = snapshot_users.alias("su").join(
    features, on=feature_keys, how="left"
)

# The 30d sums of the other metrics are also kept as <metric>_w30_sum
# (e.g. num_25_w30_sum); they are not filled and remain null for users with
# no logs in the window.
registered_users.select(
    "msno","snapshot_date",
    "sum_secs_w30","active_days_w30","complete_rate_w30",
    "sum_secs_w7","engagement_ratio_7_30","days_since_last_play","trend_secs_w30"
).show(5, truncate=False)


Transaction features (as of each snapshot date)

In [None]:
# Transaction-derived features per (msno, snapshot_date), built only from
# transactions dated on or before the snapshot.

# --- Ensure date types
df_transactions = df_transactions.withColumn("transaction_date", F.to_date("transaction_date"))
registered_users = registered_users.withColumn("snapshot_date", F.to_date("snapshot_date")) \
                                   .withColumn("registration_date", F.to_date("registration_date"))

# --- Join transactions to snapshots and keep only tx up to the snapshot_date
t = df_transactions.alias("t")
r = registered_users.select("msno","snapshot_date","registration_date").alias("r")

tx_asof_snap = (
    t.join(r, F.col("t.msno")==F.col("r.msno"), "inner")
     .where(F.col("t.transaction_date") <= F.col("r.snapshot_date"))
)

# ========= Latest transaction AS OF snapshot_date (per msno, snapshot_date)
# transaction_date alone does not give a total order when a user has several
# transactions on one day; row_number() would then pick an arbitrary row.
# The two columns read from the winning row are used as tie-breakers so the
# selected values are the same on every run.
w_latest = Window.partitionBy("r.msno","r.snapshot_date").orderBy(
    F.col("t.transaction_date").desc(),
    F.col("t.plan_list_price").desc(),
    F.col("t.is_auto_renew").desc()
)

# registration_date is already on the joined row (from r), so it is carried
# along here instead of being fetched with a second join.
latest_with_registration = (
    tx_asof_snap
    .withColumn("rn", F.row_number().over(w_latest))
    .where(F.col("rn")==1)
    .select(
        F.col("r.msno").alias("msno"),
        F.col("r.snapshot_date").alias("snapshot_date"),
        F.col("t.transaction_date").alias("latest_transaction_date"),
        F.col("t.is_auto_renew").alias("last_is_auto_renew"),
        F.col("t.plan_list_price").alias("last_plan_list_price"),
        F.col("r.registration_date").alias("registration_date")
    )
)
latest_tx = latest_with_registration.drop("registration_date")

# Tenure days relative to latest tx (as of snapshot):
# days from registration_date to the latest transaction date.
tenure_asof = latest_with_registration.withColumn(
    "tenure_days",
    F.datediff(F.col("latest_transaction_date"), F.col("registration_date"))
)

# ========= Auto-renew stats AS OF snapshot_date (per msno, snapshot_date)
# Share of transactions up to the snapshot with is_auto_renew == 1. Every
# group contains at least one row, so the mean of the 0/1 flag needs no
# zero-denominator guard.
auto_renew_stats = (
    tx_asof_snap
    .groupBy(F.col("r.msno").alias("msno"), F.col("r.snapshot_date").alias("snapshot_date"))
    .agg(
        F.avg(F.when(F.col("t.is_auto_renew")==1, 1).otherwise(0)).alias("auto_renew_share")
    )
)

# ========= Merge back into registered_users (per msno, snapshot_date)
# Users with no transaction on or before the snapshot get the defaults below.
registered_users = (
    registered_users
    .join(tenure_asof.select("msno","snapshot_date","tenure_days","last_is_auto_renew","last_plan_list_price"),
          ["msno","snapshot_date"], "left")
    .join(auto_renew_stats, ["msno","snapshot_date"], "left")
    .na.fill({
        "tenure_days": 0,
        "last_is_auto_renew": 0,       # or leave null if you prefer
        "last_plan_list_price": 0.0,   # or leave null if you prefer
        "auto_renew_share": 0.0
    })
)

# Drop the raw 30d play-count sums. NOTE(review): num_unq_w30_sum is not in
# this list and therefore stays in the output - confirm that is intended.
cols_to_drop = ["num_25_w30_sum", "num_50_w30_sum", "num_75_w30_sum", "num_985_w30_sum", "num_100_w30_sum"]
registered_users = registered_users.drop(*cols_to_drop)

# Preview
registered_users.show(10, truncate=False)


In [None]:
# ===============================
# 🧱 Create Gold Features
# ===============================

print(f"\n{'='*60}")
print("üóìÔ∏è Creating Gold Feature Store snapshot for all days")
print(f"{'='*60}\n")

output_path = "/app/datamart/gold/feature_store"

# Overwrites any existing feature store at output_path.
registered_users.write.mode("overwrite").parquet(output_path)
print(f"‚úÖ Features saved to {output_path}")

# Count the rows that were actually written. Calling registered_users.count()
# here would re-execute the whole lazy feature pipeline a second time.
written_rows = spark.read.parquet(output_path).count()
print(f"üìä Total records: {written_rows}\n")