In [1]:
# Execute the shared setup script so the names it defines are available to the cells below.
# NOTE(review): presumably this is where `spark` comes from -- later cells use it
# without creating a session themselves. Confirm against 0-Base.py.
%run ../0-utils/0-Base.py

In [2]:
import pyspark.sql.types as T
import pyspark.sql.functions as F

# Session-level Spark SQL settings, applied in order before any data is read.
# NOTE(review): on Spark 3.x the Arrow key is deprecated in favour of
# "spark.sql.execution.arrow.pyspark.enabled" -- check the cluster version.
for _conf_key, _conf_value in (
    ("spark.sql.shuffle.partitions", 16),         # partition count used by shuffles
    ("spark.sql.execution.arrow.enabled", True),  # Arrow-based Spark -> pandas conversion
):
    spark.conf.set(_conf_key, _conf_value)

In [3]:
%%time

df = spark.read.csv("../data/raw/*_transactions.csv", header=True)
df = df.withColumn("month_lag", F.col("month_lag").cast(T.IntegerType()))
df = df.withColumn("purchase_date", F.col("purchase_date").cast(T.DateType()))
df = df.withColumn("purchase_amount", F.col("purchase_amount").cast(T.DoubleType()))

df = df.withColumn("authorized_flag", F.when(df.authorized_flag == "Y", 1) \
                                       .when(df.authorized_flag == "N", 0))

for key, value in dict(authorized_purchase_amount=1, not_authorized_purchase_amount=0).items():
    df = df.withColumn(key, F.when(df.authorized_flag == value, df.purchase_amount).otherwise(None))
    
df = df.withColumn("installments", F.when(df.installments != "999", df.installments).otherwise("-1"))

for value in range(-1, 13):
    df = df.withColumn(f"installments_{value}", F.when(df.installments == value, 1).otherwise(0))
    
df = df.withColumn("purchase_year", F.year("purchase_date"))
df = df.withColumn("purchase_month", F.month("purchase_date"))

agg = (
    F.avg("authorized_flag"),
    
    F.min("purchase_amount"),
    F.avg("purchase_amount"),
    F.max("purchase_amount"),
    
    F.count(F.lit(1)).alias("count"),
    
    F.first("purchase_year", True),
    F.first("purchase_month", True),
    
    F.min("authorized_purchase_amount"),
    F.avg("authorized_purchase_amount"),
    F.max("authorized_purchase_amount"),
    
    F.min("not_authorized_purchase_amount"),
    F.avg("not_authorized_purchase_amount"),
    F.max("not_authorized_purchase_amount"),
    
    F.avg("installments_-1"),
    F.avg("installments_0"),
    F.avg("installments_1"),
    F.avg("installments_2"),
    F.avg("installments_3"),
    F.avg("installments_4"),
    F.avg("installments_5"),
    F.avg("installments_6"),
    F.avg("installments_7"),
    F.avg("installments_8"),
    F.avg("installments_9"),
    F.avg("installments_10"),
    F.avg("installments_11"),
    F.avg("installments_12"),
)

pandas_df = df.groupBy("card_id", "month_lag").agg(*agg).orderBy("card_id", "month_lag").toPandas()

pandas_df.to_feather("../data/1-feature-engineered/aggregated-transactions-by-card-id.feather")

display(pandas_df)



Unnamed: 0,card_id,month_lag,avg(authorized_flag),min(purchase_amount),avg(purchase_amount),max(purchase_amount),count,"first(purchase_year, true)","first(purchase_month, true)",min(authorized_purchase_amount),avg(authorized_purchase_amount),max(authorized_purchase_amount),min(not_authorized_purchase_amount),avg(not_authorized_purchase_amount),max(not_authorized_purchase_amount),avg(installments_-1),avg(installments_0),avg(installments_1),avg(installments_2),avg(installments_3),avg(installments_4),avg(installments_5),avg(installments_6),avg(installments_7),avg(installments_8),avg(installments_9),avg(installments_10),avg(installments_11),avg(installments_12)
0,C_ID_00007093c1,-12,1.000000,-0.572600,-0.333604,0.037176,4,2017,2,-0.572600,-0.333604,0.037176,,,,0.0,0.0,0.500000,0.25,0.250000,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,C_ID_00007093c1,-11,0.700000,-0.677936,-0.571263,-0.401298,10,2017,3,-0.677936,-0.560815,-0.401298,-0.671775,-0.595641,-0.557574,0.0,0.0,1.000000,0.00,0.000000,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,C_ID_00007093c1,-10,0.750000,-0.728876,-0.542391,0.004418,12,2017,4,-0.728876,-0.546012,0.004418,-0.671775,-0.531528,-0.311139,0.0,0.0,1.000000,0.00,0.000000,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3044125,C_ID_fffffd5772,-1,0.900000,-0.728876,-0.692835,-0.581616,10,2018,1,-0.728876,-0.688830,-0.581616,-0.728876,-0.728876,-0.728876,0.0,0.0,1.000000,0.00,0.000000,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3044126,C_ID_fffffd5772,0,0.666667,-0.745405,-0.697689,-0.311139,24,2018,2,-0.745405,-0.677963,-0.311139,-0.745405,-0.737141,-0.728876,0.0,0.0,0.958333,0.00,0.041667,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3044127,C_ID_fffffd5772,1,1.000000,-0.733985,-0.708490,-0.665765,3,2018,3,-0.733985,-0.708490,-0.665765,,,,0.0,0.0,1.000000,0.00,0.000000,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


CPU times: user 1.64 s, sys: 1.23 s, total: 2.87 s
Wall time: 1min 44s
