In [None]:
# Importaciones necesarias de PySpark
from datetime import timedelta
from pyspark.sql.functions import col, when, lit, count, sum, max, date_sub

# En Databricks, la sesión 'spark' ya está creada.
# Las rutas de los archivos deben apuntar a su ubicación en DBFS (Databricks File System).
# Por ejemplo: "/mnt/data/mercado_pago/prints.json"
prints_path = "prints.json"
taps_path = "taps.json"
pays_path = "pays.csv"
output_path = "output_feature_dataset"

In [None]:
# 2.1 Cargar y aplanar Prints
prints_df = spark.read.json(prints_path) \
    .select(
        col("day").cast("date"),
        col("user_id").cast("long"),
        col("event_data.position").cast("integer").alias("position"),
        col("event_data.value_prop").alias("value_prop")
    )

# 2.2 Cargar y aplanar Taps
taps_df = spark.read.json(taps_path) \
    .select(
        col("day").cast("date"),
        col("user_id").cast("long"),
        col("event_data.position").cast("integer").alias("position"),
        col("event_data.value_prop").alias("value_prop")
    )

# 2.3 Cargar Payments
pays_df = spark.read.csv(pays_path, header=True, inferSchema=True) \
    .select(
        col("pay_date").cast("date"),
        col("total").cast("double"),
        col("user_id").cast("long"),
        col("value_prop")
    )

print("Datos cargados y preparados. Mostrando una muestra de 'prints':")
prints_df.show(5)

In [None]:
# 3.1 Definir el periodo de análisis
max_date = prints_df.agg(max("day")).first()[0]
last_week_start_date = max_date - timedelta(days=6)

print(f"Fecha máxima en los datos: {max_date}")
print(f"Inicio de la última semana (periodo base): {last_week_start_date}")

# 3.2 Crear el DataFrame base: prints de la última semana
base_df = prints_df.filter(col("day") >= last_week_start_date)

print(f"\nNúmero de prints en la última semana: {base_df.count()}")

In [None]:
# Hacemos un LEFT JOIN con los taps para identificar los clics
taps_with_flag = taps_df.withColumn("clicked", lit(1))
join_cols = ["day", "user_id", "position", "value_prop"]

base_df = base_df.join(taps_with_flag, join_cols, "left") \
    .withColumn("was_clicked", when(col("clicked").isNotNull(), 1).otherwise(0)) \
    .drop("clicked")

print("Feature 'was_clicked' añadida. Muestra:")
base_df.select("user_id", "value_prop", "was_clicked").show(5)

In [None]:
# Filtramos los datos al periodo histórico (todo antes de la última semana)
historical_prints = prints_df.filter(col("day") < last_week_start_date)
historical_taps = taps_df.filter(col("day") < last_week_start_date)
historical_pays = pays_df.filter(col("pay_date") < last_week_start_date)

# Calculamos las agregaciones
user_prints_history = historical_prints.groupBy("user_id", "value_prop").agg(count("*").alias("prints_3w_before"))
user_taps_history = historical_taps.groupBy("user_id", "value_prop").agg(count("*").alias("taps_3w_before"))
user_pays_history = historical_pays.groupBy("user_id", "value_prop").agg(
    count("*").alias("pays_3w_before"),
    sum("total").alias("amount_3w_before")
)

print("Agregaciones históricas calculadas. Muestra de historial de pagos:")
user_pays_history.show(5)

In [None]:
# Unimos todas las features históricas al DataFrame base
final_df = base_df.join(user_prints_history, ["user_id", "value_prop"], "left") \
                  .join(user_taps_history, ["user_id", "value_prop"], "left") \
                  .join(user_pays_history, ["user_id", "value_prop"], "left")

# Rellenamos los nulos que significan "sin actividad" con 0
final_df = final_df.fillna(0, subset=[
    "prints_3w_before", "taps_3w_before", "pays_3w_before", "amount_3w_before"
])

# Seleccionamos y ordenamos las columnas para el resultado final
final_df = final_df.select(
    "day", "user_id", "position", "value_prop", "was_clicked",
    "prints_3w_before", "taps_3w_before", "pays_3w_before", "amount_3w_before"
).orderBy("day", "user_id")

In [None]:
# Mostramos una muestra del dataset final
print("Dataset de features finalizado. Muestra:")
display(final_df) # 'display' en Databricks ofrece una vista enriquecida

# Guardamos el resultado en formato Delta Lake
final_df.write.format("delta").mode("overwrite").save(output_path)

print(f"\n¡Proceso completado! El dataset ha sido guardado en: {output_path}")