## 1) Carga del parquet y periodo mensual

In [None]:
from pyspark.sql import functions as F, Window as W

# Location of the transactions dataset and the source columns read from it.
PATH_TRANSACTIONS = "/home/jovyan/data/transactions.parquet"
COL_TIMESTAMP = "t_dat"
COL_CUSTOMER_ID = "customer_id"

# Raw transactions exactly as stored in the parquet file.
df = spark.read.parquet(PATH_TRANSACTIONS)

# The transaction date is rendered as text and parsed with the "yyyy-MM-dd"
# pattern; the resulting timestamp is then truncated to the start of its month.
_parsed_ts = F.to_timestamp(F.col(COL_TIMESTAMP).cast("string"), "yyyy-MM-dd")
_month_start = F.date_trunc("month", _parsed_ts)

# Normalised view: (customer_id, period), with any row that has a null in
# either column removed.
dfn = df.select(
    F.col(COL_CUSTOMER_ID).alias("customer_id"),
    _month_start.alias("period"),
).na.drop()

# One row per customer and month in which that customer appears.
activity = dfn.distinct()


## 2) Cohorte por mes de primera compra (muestra)

In [None]:
# Cohort assignment: each customer's cohort is the earliest month in which
# they appear in `activity`.
first_tx = (
    activity.groupBy("customer_id")
    .agg(F.min("period").alias("cohort_month"))
)

# Cohort sizes (n0 = customers whose first month is `cohort_month`).
# `first_tx` is grouped by customer_id just above, so it holds one row per
# customer; a plain count of the non-null ids therefore equals the distinct
# count and avoids a redundant distinct aggregation.
cohort_size = (
    first_tx.groupBy("cohort_month")
    .agg(F.count("customer_id").alias("n0"))
)

# Preview of the customer -> cohort mapping.
first_tx.orderBy("cohort_month", "customer_id").show(20, truncate=False)


## 3) Actividad mensual y cohort_index (muestra)

In [None]:
# Transactions per customer and month. The count is taken over `dfn` (the
# rows before de-duplication): `activity` already holds a single row per
# (customer_id, period), so counting it would report hits == 1 everywhere.
tx_monthly = (
    dfn.groupBy("customer_id", "period")
    .agg(F.count("*").alias("hits"))
)

# Attach each customer's cohort month and derive the relative month index
# (0 = month of first purchase). Both columns are month-start timestamps, so
# months_between yields a whole number and the int cast only changes the type.
txm = (
    tx_monthly.join(first_tx, "customer_id", "left")
    .withColumn(
        "cohort_index",
        F.months_between("period", "cohort_month").cast("int"),
    )
)

# Preview of the monthly activity with its cohort index.
txm.orderBy("cohort_month", "cohort_index", "customer_id").show(20, truncate=False)


## 4) Retención por cohorte y mes relativo (muestra)

In [None]:
# One row per (cohort, relative month, customer): who was active when.
active = (
    txm.select("cohort_month", "cohort_index", "customer_id")
    .dropDuplicates()
)

# Retention = active customers in a relative month / cohort size (n0).
# `active` was de-duplicated on exactly these three columns, so within each
# (cohort_month, cohort_index) group every customer_id appears once; a plain
# count gives the same figure as a distinct count without a second dedup pass.
ret = (
    active.groupBy("cohort_month", "cohort_index")
    .agg(F.count("customer_id").alias("active_users"))
    .join(cohort_size, "cohort_month", "left")
    .withColumn("retention", F.round(F.col("active_users") / F.col("n0"), 4))
    .orderBy("cohort_month", "cohort_index")
)

# Preview of the long-format retention table.
ret.show(60, truncate=False)


## 5) Matriz de retención (vista amplia)

In [None]:
# Wide retention matrix: one row per cohort month, one column per relative
# month index, each cell holding the first retention value found for that
# (cohort_month, cohort_index) pair.
_by_cohort = ret.groupBy("cohort_month")
_retention_wide = _by_cohort.pivot("cohort_index").agg(F.first("retention"))
ret_matrix = _retention_wide.orderBy("cohort_month")

# Preview of the first cohorts of the matrix.
ret_matrix.show(20, truncate=False)
