# Preprocess

Tras probar con Polars y su LazyFrame, he comprobado que, pese a la optimización de memoria y de rendimiento que ofrece esta librería, mi hardware no permite operar con datos de un tamaño tan grande como el de este dataset.

Tras analizar mis opciones, he decidido usar PySpark, que procesa los datos por particiones con un motor de ejecución optimizado y puede volcar a disco lo que no cabe en memoria, lo que me permite trabajar con datos de estas dimensiones. Además, permite usar SQL (Spark SQL), un lenguaje con el que tengo experiencia.


In [1]:
from pyspark.sql import SparkSession
from pathlib import Path
from pyspark.sql import functions as sf

Como ya he analizado en Polars los datos de cada tabla y cómo unirlos, en este script únicamente construiré el pipeline de unión para obtener un dataset limpio con el que comenzar mi EDA.


In [2]:
from datetime import date


# Project layout: the notebook runs from a sub-directory of the project root
# (hence Path.cwd().parent); data lives under <root>/data/{raw,processed}.
BASE_DIR = Path.cwd().parent
DATA_DIR = BASE_DIR.joinpath('data').resolve()
RAW_DIR = DATA_DIR.joinpath('raw').resolve()
PROCESSED_DIR = DATA_DIR.joinpath('processed').resolve()

# Upper bound (inclusive) for the log and transaction rows that get aggregated.
CUTOFF_DATE = date(year=2017, month=2, day=18)

In [3]:
# Local Spark session on all available cores. Shuffle partitions are kept low
# (8) for a single machine and the driver is given 12 GB of memory.
# NOTE: getOrCreate() returns an already running session if there is one, in
# which case static settings such as driver memory are not re-applied.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('KKBox')
    .config("spark.sql.shuffle.partitions", "8")
    .config("spark.driver.memory", "12g")
    .getOrCreate()
)

In [4]:
# Load the raw KKBox tables.
#
# user_logs.csv is by far the largest input, so it is read with an explicit
# schema: inferSchema=True would make Spark scan the whole file once just to
# infer column types before the real read. enforceSchema=False makes Spark
# validate the CSV header against these names (by position) and raise instead
# of silently mislabeling columns if the file layout differs.
#
# No .repartition() here: it would shuffle the full log table before the
# filter/groupBy, and the groupBy already repartitions by msno on its own.
USER_LOGS_SCHEMA = (
    "msno STRING, date STRING, num_25 BIGINT, num_50 BIGINT, num_75 BIGINT, "
    "num_985 BIGINT, num_100 BIGINT, num_unq BIGINT, total_secs DOUBLE"
)

df_members = spark.read.csv(str(RAW_DIR/'members_v3.csv'), header=True, inferSchema=True)
df_train = spark.read.csv(str(RAW_DIR/'train.csv'), header=True, inferSchema=True)
df_tx = spark.read.csv(str(RAW_DIR/'transactions.csv'), header=True, inferSchema=True)
df_user_logs = spark.read.csv(
    str(RAW_DIR/'user_logs.csv'),
    schema=USER_LOGS_SCHEMA,
    header=True,
    enforceSchema=False,
)

In [5]:
# Members: drop `registered_via` and parse the registration date into a
# DateType, consistent with the transaction and user-log date columns.
# NOTE(review): assumes registration_init_time uses the same yyyyMMdd encoding
# as the other date columns; unparseable values would become null.
df_members = df_members.drop('registered_via').withColumn(
    'registration_init_time',
    sf.to_date(sf.col('registration_init_time'), 'yyyyMMdd'),
)

In [6]:
# Transactions: drop the payment method column and convert the two
# yyyyMMdd-encoded date columns into DateType (columns keep their position).
df_tx = df_tx.drop('payment_method_id')
for date_col in ('transaction_date', 'membership_expire_date'):
    df_tx = df_tx.withColumn(date_col, sf.to_date(sf.col(date_col), 'yyyyMMdd'))

In [7]:
# User logs: replace the yyyyMMdd-encoded `date` column with a DateType column.
parsed_log_date = sf.to_date(sf.col('date'), 'yyyyMMdd')
df_user_logs = df_user_logs.withColumn('date', parsed_log_date)

In [8]:
# Keep log rows dated on or before the cutoff (a null date compares as null,
# so such rows are dropped as well).
logs = df_user_logs.filter(sf.col('date') <= sf.lit(CUTOFF_DATE))

# Per-column sums of the num_* play counts, renamed to total_*.
play_count_sums = [
    sf.sum(src).alias(dst)
    for src, dst in (
        ("num_25", "total_25"),
        ("num_50", "total_50"),
        ("num_75", "total_75"),
        ("num_985", "total_985"),
        ("num_100", "total_100"),
    )
]

# One row per user with listening activity up to the cutoff.
# NOTE: `unique_song_listened` sums the per-day `num_unq` values, so a song
# played on several different days is counted once per day.
logs_agg = (
    logs.groupBy('msno')
    .agg(
        sf.countDistinct('date').alias('active_days'),
        *play_count_sums,
        sf.sum("num_unq").alias("unique_song_listened"),
        sf.max("num_unq").alias("max_unique_per_day"),
        sf.sum("total_secs").alias("total_sec_listened"),
        sf.min("date").alias("first_activity"),
        sf.max("date").alias("last_activity"),
    )
    .withColumn("lifetime_days", sf.datediff("last_activity", "first_activity"))
    .withColumn(
        "avg_sec_active_day",
        # A `when` with no `otherwise` evaluates to null when the guard fails.
        sf.when(
            sf.col('active_days') > 0,
            sf.col('total_sec_listened') / sf.col('active_days'),
        ),
    )
)

In [9]:
# One row per user summarizing transactions dated on or before the cutoff.
#
# The "last plan" columns are taken from a single row: the struct max orders
# by transaction_date first, then breaks same-day ties by the latest
# membership_expire_date and then the plan fields. Two separate max_by calls
# are non-deterministic on ties and could take plan_days and plan_price from
# different same-day transactions.
tx_agg = (
    df_tx
    .filter(sf.col("transaction_date") <= sf.lit(CUTOFF_DATE))
    .groupBy("msno")
    .agg(
        sf.count("*").alias("num_transactions"),
        sf.sum("actual_amount_paid").alias("total_amount_paid"),
        sf.avg("is_auto_renew").alias("autorenew_rate"),
        sf.avg("is_cancel").alias("cancel_rate"),
        # Days from the latest transaction date to the latest expiry date,
        # floored at 0. `greatest` skips nulls, so a null difference yields 0.
        sf.greatest(
            sf.datediff(sf.max("membership_expire_date"), sf.max("transaction_date")),
            sf.lit(0),
        ).alias("days_until_expire"),
        sf.max(
            sf.struct(
                "transaction_date",
                "membership_expire_date",
                "payment_plan_days",
                "plan_list_price",
            )
        ).alias("_last_tx"),
    )
    # Unpack the helper struct, keeping the original output column order.
    .select(
        "msno",
        "num_transactions",
        "total_amount_paid",
        "autorenew_rate",
        "cancel_rate",
        "days_until_expire",
        sf.col("_last_tx.payment_plan_days").alias("plan_days_last"),
        sf.col("_last_tx.plan_list_price").alias("plan_price_last"),
    )
)

In [10]:
# Build the modelling table: every df_train row left-joined with the member
# profile, the log aggregates and the transaction aggregates. Users without a
# match get nulls. logs_agg and tx_agg are grouped by msno, so they add at
# most one row per user; presumably msno is also unique in members — verify.
members = df_members.select("msno", "gender", "city", "bd", "registration_init_time")
final = (
    df_train
    .join(members, "msno", "left")
    .join(logs_agg, "msno", "left")
    .join(tx_agg, "msno", "left")
)

# toPandas() collects the full result into the driver, which pandas then writes
# as a single Parquet file. Writing fails if the target directory is missing,
# so create it first.
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
final.toPandas().to_parquet(str(PROCESSED_DIR / "data_processed.parquet"))