In [1]:
import findspark
findspark.init()
import datetime as dt

import pandas as pd 
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as fn
from pyspark.sql.window import Window
from pyspark.sql.functions import udf, datediff, to_date, lit

In [2]:
def equivalent_type(f):
    if f == 'datetime64[ns]': return DateType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)


# Given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df,sparkSession):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    i = 0
    for column, typo in zip(columns, types): 
        struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return sparkSession.createDataFrame(pandas_df, p_schema)

In [3]:
# start spark engine 
conf = pyspark.SparkConf().setAppName('tes_spark').setMaster('local')
sc = pyspark.SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

In [4]:
# customer information 
df_cli = spark \
    .read.format("com.databricks.spark.csv")\
    .option("header", "true")\
    .option("inferschema", "true")\
    .option("delimiter", ",")\
    .load("../../data/data/clients.csv")

In [5]:
# customer flag 
df_up_train = spark \
    .read.format("com.databricks.spark.csv")\
    .option("header", "true")\
    .option("inferschema", "true")\
    .option("delimiter", ",")\
    .load("../../data/data/uplift_train.csv")

In [6]:
# custommer id for testing
df_up_test = spark \
    .read.format("com.databricks.spark.csv")\
    .option("header", "true")\
    .option("inferschema", "true")\
    .option("delimiter", ",")\
    .load("../../data/data/uplift_test.csv")

In [7]:
# product information
df_pro = spark \
    .read.format("com.databricks.spark.csv")\
    .option("header", "true")\
    .option("inferschema", "true")\
    .option("delimiter", ",")\
    .load("../../data/data/products.csv")

In [8]:
# purchase transactional data 
df_pur = spark \
    .read.format("com.databricks.spark.csv")\
    .option("header", "true")\
    .option("inferschema", "true")\
    .option("delimiter", ",")\
    .load("../../data/data/purchases.csv")

In [9]:
tes_df = spark \
    .read.format("com.databricks.spark.csv")\
    .option("header", "true")\
    .option("inferschema", "true")\
    .option("delimiter", ",")\
    .load("D:/works/master_tilburg/dss/thesis/data/feature_stg2.csv")

In [167]:
tes_df.limit(3).toPandas()

Unnamed: 0,client_id,regular_points_received,express_points_spent,purchase_sum,avg_n_prod,avg_n_prod_qty,trn_sum_from_iss,trn_sum_from_red,n_transaction,store_id_pur,s_purchase_sum,store_id_pur_qty,store_n_product,product_pur,s_purchase,product_qty,n_product
0,02429418df,37.3,0.0,4156.77,8.5,62.0,539.914286,0.0,6,d09acf8114,1349.0,2fe93e36be,21,4009f09b04,3190.7,4009f09b04,3
1,02d6c08e7d,5.9,0.0,747.0,1.888889,30.0,500.583333,0.0,9,7763d9b151,721.0,7763d9b151,8,21e8f864ff,506.0,21e8f864ff,7
2,03f35da9a5,30.2,0.0,5661.85,3.647059,87.0,1455.9,0.0,17,04d336aec5,5661.85,04d336aec5,47,222c727a1d,2090.06,222c727a1d,4


In [32]:
dt.date(2019, 11, 20)

datetime.date(2019, 11, 20)

In [31]:
dt.datetime(2019, 11, 20, 12,37,56)

datetime.datetime(2019, 11, 20, 12, 37, 56)

# start 

In [10]:
# fill missing value in first redeem date with max date 
df_cli = df_cli.withColumn('first_redeem_date2', fn.when(fn.col('first_redeem_date').isNull(), dt.datetime(2019, 11, 20, 1,14,10)).otherwise(fn.col('first_redeem_date')))
df_cli = df_cli.drop('first_redeem_date')
df_cli = df_cli.withColumnRenamed('first_redeem_date2','first_redeem_date')
df_cli = df_cli.withColumn("first_redeem_date", fn.to_date(fn.col("first_redeem_date")))

# adding train label 
df_up_train = df_up_train.withColumnRenamed('client_id','client_id2')
df_cli = df_cli.join(df_up_train, df_cli.client_id == df_up_train.client_id2,'inner')
df_cli = df_cli.drop('client_id2')

# fill na values in purchase 
df_pur =  df_pur.fillna(value=0,subset=["trn_sum_from_red"])

In [102]:
df_pur.filter(df_pur.transaction_id == '7e3e2e3984').toPandas()

Unnamed: 0,client_id,transaction_id,transaction_datetime,regular_points_received,express_points_received,regular_points_spent,express_points_spent,purchase_sum,store_id,product_id,product_quantity,trn_sum_from_iss,trn_sum_from_red
0,000012768d,7e3e2e3984,2018-12-01 07:12:45,10.0,0.0,0.0,0.0,1007.0,54a4a11a29,9a80204f78,2.0,80.0,0.0
1,000012768d,7e3e2e3984,2018-12-01 07:12:45,10.0,0.0,0.0,0.0,1007.0,54a4a11a29,da89ebd374,1.0,65.0,0.0
2,000012768d,7e3e2e3984,2018-12-01 07:12:45,10.0,0.0,0.0,0.0,1007.0,54a4a11a29,0a95e1151d,1.0,24.0,0.0
3,000012768d,7e3e2e3984,2018-12-01 07:12:45,10.0,0.0,0.0,0.0,1007.0,54a4a11a29,4055b15e4a,2.0,50.0,0.0
4,000012768d,7e3e2e3984,2018-12-01 07:12:45,10.0,0.0,0.0,0.0,1007.0,54a4a11a29,a685f1916b,1.0,22.0,0.0
5,000012768d,7e3e2e3984,2018-12-01 07:12:45,10.0,0.0,0.0,0.0,1007.0,54a4a11a29,21db5dbe53,1.0,34.0,0.0
6,000012768d,7e3e2e3984,2018-12-01 07:12:45,10.0,0.0,0.0,0.0,1007.0,54a4a11a29,1e208d0b4c,1.0,24.0,0.0
7,000012768d,7e3e2e3984,2018-12-01 07:12:45,10.0,0.0,0.0,0.0,1007.0,54a4a11a29,15ccaa8685,1.0,51.0,0.0
8,000012768d,7e3e2e3984,2018-12-01 07:12:45,10.0,0.0,0.0,0.0,1007.0,54a4a11a29,45389bb5b0,1.0,23.0,0.0
9,000012768d,7e3e2e3984,2018-12-01 07:12:45,10.0,0.0,0.0,0.0,1007.0,54a4a11a29,cb4c804130,1.0,60.0,0.0


In [99]:
df_pur.limit(3).toPandas()

Unnamed: 0,client_id,transaction_id,transaction_datetime,regular_points_received,express_points_received,regular_points_spent,express_points_spent,purchase_sum,store_id,product_id,product_quantity,trn_sum_from_iss,trn_sum_from_red
0,000012768d,7e3e2e3984,2018-12-01 07:12:45,10.0,0.0,0.0,0.0,1007.0,54a4a11a29,9a80204f78,2.0,80.0,0.0
1,000012768d,7e3e2e3984,2018-12-01 07:12:45,10.0,0.0,0.0,0.0,1007.0,54a4a11a29,da89ebd374,1.0,65.0,0.0
2,000012768d,7e3e2e3984,2018-12-01 07:12:45,10.0,0.0,0.0,0.0,1007.0,54a4a11a29,0a95e1151d,1.0,24.0,0.0


In [76]:
df_cli.printSchema()

root
 |-- client_id: string (nullable = true)
 |-- first_issue_date: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- first_redeem_date: date (nullable = true)
 |-- treatment_flg: integer (nullable = true)
 |-- target: integer (nullable = true)



In [11]:
# join client and purchase 
df_cli = df_cli.withColumnRenamed('client_id','client_id2')

df_cli_pur = df_cli.join(df_pur,df_cli.client_id2 == df_pur.client_id,'inner')
df_cli_pur = df_cli_pur.drop('client_id2')

# keep transactiion before redeem date 
df_cli_pur = df_cli_pur.withColumn("transaction_datetime", fn.to_date(fn.col("transaction_datetime")))
df_cli_pur = df_cli_pur.withColumn('trans_redeem_flag',fn.when(fn.col('transaction_datetime') < fn.col('first_redeem_date'),1).otherwise(0)) # 1 keep, 0 remove 

# add target 
df_up_train = df_up_train.withColumnRenamed('client_id','client_id2')
# df_cli_pur = df_cli_pur.join(df_up_train,df_cli_pur.client_id==df_up_train.client_id2,'inner')
# df_cli_pur = df_cli_pur.drop('client_id2')

# date difference 
df_cli_pur = df_cli_pur.withColumn("d_iss_rdm", datediff(fn.col('first_redeem_date'), fn.col('first_issue_date')))
df_cli_pur = df_cli_pur.withColumn("d_trs_rdm", datediff(fn.col('first_redeem_date'), fn.col('transaction_datetime')))

In [78]:
df_cli_pur.printSchema()

root
 |-- first_issue_date: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- first_redeem_date: date (nullable = true)
 |-- treatment_flg: integer (nullable = true)
 |-- target: integer (nullable = true)
 |-- client_id: string (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- transaction_datetime: date (nullable = true)
 |-- regular_points_received: double (nullable = true)
 |-- express_points_received: double (nullable = true)
 |-- regular_points_spent: double (nullable = true)
 |-- express_points_spent: double (nullable = true)
 |-- purchase_sum: double (nullable = true)
 |-- store_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_quantity: double (nullable = true)
 |-- trn_sum_from_iss: double (nullable = true)
 |-- trn_sum_from_red: double (nullable = false)
 |-- trans_redeem_flag: integer (nullable = false)
 |-- d_iss_rdm: integer (nullable = true)
 |-- d_trs_rdm: integer (n

# feature extraction 

In [81]:
# points aggregation split by before and after redeem code 
df_feat1 = df_cli_pur.groupby('client_id','age','gender','treatment_flg','target','trans_redeem_flag').agg(fn.countDistinct('transaction_id').alias('n_trans_rdm'), fn.countDistinct('transaction_datetime').alias('days_redeem'))

# pivot
df_feat1a = df_feat1.groupBy('client_id').pivot("trans_redeem_flag").sum("n_trans_rdm")
df_feat1a = df_feat1a.withColumnRenamed('0','n_trans_atr_rdm').withColumnRenamed('1','n_trans_bfr_rdm')
df_feat1b = df_feat1.groupBy('client_id').pivot("trans_redeem_flag").sum("days_redeem")
df_feat1b = df_feat1b.withColumnRenamed('0','n_days_bfr_rdm').withColumnRenamed('1','n_days_atr_rdm')

# join 1a and 1b 
df_feat1b = df_feat1b.withColumnRenamed('client_id','client_id2')
df_feat1_agg = df_feat1a.join(df_feat1b,df_feat1a.client_id == df_feat1b.client_id2,'left').drop('client_id2')

df_feat1_agg.repartition(1).write.mode('overwrite').option("header",True).csv("D:/works/master_tilburg/dss/thesis/data/feat_n_day_trans.csv")

In [131]:
# regular points, express points and avg purchase 

df_feat2 = df_cli_pur.groupby('client_id','age','gender','treatment_flg','target','trans_redeem_flag','transaction_id').agg(fn.avg('regular_points_received').alias('regular_points_received'), \
                                                                                                             fn.avg('express_points_received').alias('express_points_received'), \
                                                                                                            fn.avg('purchase_sum').alias('purchase_sum')
                                                                                                            )

df_feat2 = df_feat2.groupby('client_id','age','gender','treatment_flg','target','trans_redeem_flag').agg(fn.sum('regular_points_received').alias('s_reg_pts_rdm'),fn.avg('regular_points_received').alias('avg_reg_pts_rdm'),\
                                                                                         fn.sum('express_points_received').alias('s_exp_pts_rdm'),fn.avg('express_points_received').alias('avg_exp_pts_rdm'),\
                                                                                         fn.sum('purchase_sum').alias('s_purchase'),fn.avg('purchase_sum').alias('avg_purchase'),\
                                                                                          fn.countDistinct('transaction_id').alias('n_trans')
                                                                                         )

df_feat2a = df_feat2.groupby('client_id').pivot('trans_redeem_flag').agg(fn.sum('s_reg_pts_rdm').alias('s_reg_pts_rdm'))
df_feat2b = df_feat2.groupby('client_id').pivot('trans_redeem_flag').agg(fn.sum('avg_reg_pts_rdm').alias('avg_reg_pts_rdm'))

df_feat2c = df_feat2.groupby('client_id').pivot('trans_redeem_flag').agg(fn.sum('s_exp_pts_rdm').alias('s_exp_pts_rdm'))
df_feat2d = df_feat2.groupby('client_id').pivot('trans_redeem_flag').agg(fn.sum('avg_exp_pts_rdm').alias('avg_exp_pts_rdm'))

df_feat2e = df_feat2.groupby('client_id').pivot('trans_redeem_flag').agg(fn.sum('s_purchase').alias('s_purchase'))
df_feat2f = df_feat2.groupby('client_id').pivot('trans_redeem_flag').agg(fn.sum('avg_purchase').alias('avg_purchase'))

df_feat2g = df_feat2.groupby('client_id').pivot('trans_redeem_flag').agg(fn.sum('n_trans').alias('n_trans'))

In [132]:
df_feat2a = df_feat2a.withColumnRenamed('0','s_reg_pts_bfr_rdm').withColumnRenamed('1','s_reg_pts_atr_rdm') 
df_feat2b = df_feat2b.withColumnRenamed('client_id','client_id2').withColumnRenamed('0','avg_reg_pts_bfr_rdm').withColumnRenamed('1','avg_reg_pts_aft_rdm')
df_feat2_agg = df_feat2a.join(df_feat2b, df_feat2a.client_id==df_feat2b.client_id2,'left').drop('client_id2')


df_feat2c = df_feat2c.withColumnRenamed('client_id','client_id2').withColumnRenamed('0','s_exp_pts_bfr_rdm').withColumnRenamed('1','s_exp_pts_aft_rdm')
df_feat2_agg = df_feat2_agg.join(df_feat2c, df_feat2_agg.client_id == df_feat2c.client_id2,'left').drop('client_id2')

df_feat2d = df_feat2d.withColumnRenamed('client_id','client_id2').withColumnRenamed('0','avg_exp_pts_bfr_rdm').withColumnRenamed('1','avg_exp_pts_aft_rdm')
df_feat2_agg = df_feat2_agg.join(df_feat2d, df_feat2_agg.client_id == df_feat2d.client_id2,'left').drop('client_id2')

df_feat2e = df_feat2e.withColumnRenamed('client_id','client_id2').withColumnRenamed('0','s_pur_bfr_rdm').withColumnRenamed('1','s_pur_aft_rdm')
df_feat2_agg = df_feat2_agg.join(df_feat2e, df_feat2_agg.client_id == df_feat2e.client_id2,'left').drop('client_id2')

df_feat2f = df_feat2f.withColumnRenamed('client_id','client_id2').withColumnRenamed('0','avg_pur_bfr_rdm').withColumnRenamed('1','avg_pur_aft_rdm')
df_feat2_agg = df_feat2_agg.join(df_feat2f, df_feat2_agg.client_id == df_feat2f.client_id2,'left').drop('client_id2')

df_feat2g = df_feat2g.withColumnRenamed('client_id','client_id2').withColumnRenamed('0','n_trans_bfr_rdm').withColumnRenamed('1','n_trans_aft_rdm')
df_feat2_agg = df_feat2_agg.join(df_feat2g, df_feat2_agg.client_id == df_feat2g.client_id2,'left').drop('client_id2')

df_feat2_agg.repartition(1).write.mode('overwrite').option("header",True).csv("D:/works/master_tilburg/dss/thesis/data/feat2_pts.csv")

In [12]:
# avg pur per trans
df_feat3_agg =df_cli_pur.select('client_id','product_id','transaction_id','product_quantity','trans_redeem_flag').groupby('client_id','transaction_id','trans_redeem_flag','product_id')\
                .agg(fn.avg('product_quantity').alias('product_quantity'))


df_feat3_agg2 = df_feat3_agg.groupby('client_id','product_id').agg(fn.sum('product_quantity').alias('product_quantity'))
df_feat3_agg2 = df_feat3_agg2.withColumn("rank_qty", fn.row_number().over(Window.partitionBy("client_id").orderBy(fn.col("product_quantity").desc())))
# filter rank 
df_feat3_agg2_rank = df_feat3_agg2.filter(df_feat3_agg2.rank_qty == 1 )

# pivot the redeem flag 
# product qty 

df_feat3_agg3 = df_feat3_agg.groupby('client_id','product_id','trans_redeem_flag').agg(fn.sum('product_quantity').alias('product_quantity'))
# df_feat3_agg_1 = df_feat3_agg.groupby('client_id').pivot('trans_redeem_flag').agg(fn.first(fn.col('product_id')))

# product 
# df_feat3_agg_2 = df_feat3_agg.groupby('client_id').pivot('trans_redeem_flag').agg(fn.sum('product_quantity').alias('prd_qty'))

# # adding row number on purchase and quantity 
# df_feat3_agg_1 = df_feat3_agg_1.withColumn("rank_qty", fn.row_number().over(Window.partitionBy("client_id").orderBy(fn.col("product_quantity").desc())))


In [14]:
df_feat3_agg2_rank.count()

200039

In [15]:
df_feat3_agg2_rank.limit(3).show()

Py4JJavaError: An error occurred while calling o171.showString.
: org.apache.spark.SparkException: Job 15 cancelled 
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1955)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2205)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [141]:
df_feat3_agg.limit(3).toPandas()

Unnamed: 0,client_id,trans_redeem_flag,product_quantity
0,00f6cab0d9,0,1.100666
1,010c5002de,0,1.25188
2,018253c9e4,0,1.118742


In [140]:
df_feat3_agg.columns

['client_id', 'trans_redeem_flag', 'product_quantity']

In [108]:
df_feat2a.count()

200039

In [109]:
df_feat2a.select(fn.countDistinct('client_id')).show()

+-------------------------+
|count(DISTINCT client_id)|
+-------------------------+
|                   200039|
+-------------------------+



In [39]:
df_cli_pur.limit(3).toPandas()

Unnamed: 0,first_issue_date,first_redeem_date,client_id,transaction_id,transaction_datetime,regular_points_received,express_points_received,regular_points_spent,express_points_spent,purchase_sum,store_id,product_id,product_quantity,trn_sum_from_iss,trn_sum_from_red,trans_redeem_flag,treatment_flg,target,iss_rdm_diff,trs_rdm_diff
0,2017-09-14 15:27:21,2017-12-25,00f6cab0d9,ad36e268f7,2018-12-01,6.3,0.0,0.0,0.0,630.59,6f953e34e7,c0f299c302,1.0,30.0,0.0,0,0,1,102,-341
1,2017-09-14 15:27:21,2017-12-25,00f6cab0d9,ad36e268f7,2018-12-01,6.3,0.0,0.0,0.0,630.59,6f953e34e7,4009f09b04,1.0,5.0,0.0,0,0,1,102,-341
2,2017-09-14 15:27:21,2017-12-25,00f6cab0d9,ad36e268f7,2018-12-01,6.3,0.0,0.0,0.0,630.59,6f953e34e7,439498bce2,1.0,0.0,0.0,0,0,1,102,-341


In [151]:
df_cli_pur.groupby('target','treatment_flg','drop_transaction').agg(fn.countDistinct('client_id')).toPandas()

Unnamed: 0,target,treatment_flg,drop_transaction,count(client_id)
0,0,1,1,15032
1,0,0,0,29165
2,1,0,1,19993
3,1,1,1,21141
4,1,0,0,50675
5,1,1,0,53320
6,0,1,0,26732
7,0,0,1,16658


# old features


In [159]:
# feature aggregation by transaction 
df_feature = df_cli_pur.groupby('client_id','transaction_id').agg(fn.avg('regular_points_received').alias('regular_points_received'), fn.avg('express_points_spent').alias('express_points_spent') \
                                                                 ,fn.avg('purchase_sum').alias('purchase_sum'),fn.countDistinct('product_id').alias('n_prod')\
                                                                 ,fn.sum('product_quantity').alias('product_quantity'),fn.avg('trn_sum_from_iss').alias('trn_sum_from_iss'),fn.avg('trn_sum_from_red').alias('trn_sum_from_red'))

df_feature = df_feature.groupby('client_id').agg(fn.sum('regular_points_received').alias('regular_points_received'),fn.sum('express_points_spent').alias('express_points_spent')\
                                                ,fn.sum('purchase_sum').alias('purchase_sum'),fn.avg('n_prod').alias('avg_n_prod'),fn.sum('product_quantity').alias('avg_n_prod_qty')\
                                                ,fn.sum('trn_sum_from_iss').alias('trn_sum_from_iss'),fn.sum('trn_sum_from_red').alias('trn_sum_from_red'), fn.countDistinct('transaction_id').alias('n_transaction'))


In [160]:
# top product transactions
df_top_product =df_cli_pur.select('client_id','product_id','purchase_sum').groupby('client_id','product_id')\
                .agg(fn.count('product_id').alias('n_product'),fn.sum('purchase_sum').alias('s_purchase'))

# adding row number on purchase and quantity 
df_top_product = df_top_product.withColumn("rank_pur", fn.row_number().over(Window.partitionBy("client_id").orderBy(fn.col("s_purchase").desc())))
df_top_product = df_top_product.withColumn("rank_qty", fn.row_number().over(Window.partitionBy("client_id").orderBy(fn.col("n_product").desc())))

# filter top product by purchase
df_top_product_purchase = df_top_product.filter(df_top_product.rank_pur == 1)

# filter top product by quantity
df_top_product_qty = df_top_product.filter(df_top_product.rank_qty == 1)

In [161]:
# favorite store 
df_store_gb = df_cli_pur.select('client_id','transaction_id','store_id','purchase_sum','product_id').groupby('client_id','transaction_id','store_id')\
                    .agg(fn.avg('purchase_sum').alias('s_purchase_sum'), fn.countDistinct('product_id').alias('n_prod_id'))

# 
df_store_gb = df_store_gb.select('client_id','transaction_id','store_id','s_purchase_sum','n_prod_id').groupby('client_id','store_id')\
              .agg(fn.sum('s_purchase_sum').alias('s_purchase_sum'), fn.avg('n_prod_id').alias('avg_n_prod_id'))                  

# unique product per store 
df_store_gb2 = df_cli_pur.select('client_id','store_id','product_id').groupby('client_id','store_id').agg(fn.countDistinct('product_id').alias('n_product'))
df_store_gb2 = df_store_gb2.withColumnRenamed('client_id','client_id2').withColumnRenamed('store_id','store_id2')

# join with with unique product for each store 
df_store_gb = df_store_gb.join(df_store_gb2, (df_store_gb.client_id == df_store_gb2.client_id2) & (df_store_gb.store_id == df_store_gb2.store_id2) , 'inner')

df_store_gb = df_store_gb.drop('client_id2','store_id2')

# put ranking 
df_store_gb_rank = df_store_gb.withColumn("rank_s_purchase", fn.row_number().over(Window.partitionBy("client_id").orderBy(fn.col("s_purchase_sum").desc())))
df_store_gb_rank = df_store_gb_rank.withColumn("rank_prod_qty", fn.row_number().over(Window.partitionBy("client_id").orderBy(fn.col("n_product").desc())))


# filter top product by purchase
df_store_gb_top_pur = df_store_gb_rank.filter(df_store_gb_rank.rank_s_purchase == 1)

# filter top product by quantity
df_store_gb_top_qty = df_store_gb_rank.filter(df_store_gb_rank.rank_prod_qty == 1)

In [162]:
# join new feature store and products

# store purchase 
df_store_gb_top_pur = df_store_gb_top_pur.withColumnRenamed('client_id','client_id2')
df_feature = df_feature.join(df_store_gb_top_pur.select('client_id2','store_id','s_purchase_sum'),df_feature.client_id == df_store_gb_top_pur.client_id2,'left' )
df_feature = df_feature.drop('client_id2')
df_feature = df_feature.withColumnRenamed('store_id','store_id_pur')

# store quantity 
df_store_gb_top_qty = df_store_gb_top_qty.withColumnRenamed('client_id','client_id2')
df_feature = df_feature.join(df_store_gb_top_qty.select('client_id2','store_id','n_product'),df_feature.client_id == df_store_gb_top_qty.client_id2,'left' )
df_feature = df_feature.drop('client_id2')
df_feature = df_feature.withColumnRenamed('store_id','store_id_pur_qty').withColumnRenamed('n_product','store_n_product')

# product purchase 
df_top_product_purchase = df_top_product_purchase.withColumnRenamed('client_id','client_id2')
df_feature = df_feature.join(df_top_product_purchase.select('client_id2','product_id','s_purchase'),df_feature.client_id == df_top_product_purchase.client_id2,'left' )
df_feature = df_feature.drop('client_id2')
df_feature = df_feature.withColumnRenamed('product_id','product_pur')

# product qty 
df_top_product_qty = df_top_product_qty.withColumnRenamed('client_id','client_id2')
df_feature = df_feature.join(df_top_product_qty.select('client_id2','product_id','n_product'),df_feature.client_id == df_top_product_qty.client_id2,'left' )
df_feature = df_feature.drop('client_id2')
df_feature = df_feature.withColumnRenamed('product_id','product_qty')

In [164]:
df_feature.repartition(1).write.mode('overwrite').option("header",True).csv("D:/works/master_tilburg/dss/thesis/data/feature_stg2.csv")

In [132]:
df_feature.limit(3).show()

+----------+-----------------------+--------------------+-----------------+------------------+--------------+------------------+----------------+-------------+------------+--------------+----------------+---------------+-----------+----------+-----------+---------+
| client_id|regular_points_received|express_points_spent|     purchase_sum|        avg_n_prod|avg_n_prod_qty|  trn_sum_from_iss|trn_sum_from_red|n_transaction|store_id_pur|s_purchase_sum|store_id_pur_qty|store_n_product|product_pur|s_purchase|product_qty|n_product|
+----------+-----------------------+--------------------+-----------------+------------------+--------------+------------------+----------------+-------------+------------+--------------+----------------+---------------+-----------+----------+-----------+---------+
|02429418df|     37.300000000000004|                 0.0|          4156.77|               8.5|          62.0| 539.9142857142857|             0.0|            6|  d09acf8114|        1349.0|      2fe93e36b

In [152]:
df_feature.printSchema()

root
 |-- client_id: string (nullable = true)
 |-- regular_points_received: double (nullable = true)
 |-- express_points_spent: double (nullable = true)
 |-- purchase_sum: double (nullable = true)
 |-- avg_n_prod: double (nullable = true)
 |-- avg_n_prod_qty: double (nullable = true)
 |-- trn_sum_from_iss: double (nullable = true)
 |-- trn_sum_from_red: double (nullable = true)
 |-- n_transaction: long (nullable = false)
 |-- store_id_pur: string (nullable = true)
 |-- s_purchase_sum: double (nullable = true)
 |-- store_id_pur_qty: string (nullable = true)
 |-- store_n_product: long (nullable = true)
 |-- product_pur: string (nullable = true)
 |-- s_purchase: double (nullable = true)
 |-- product_qty: string (nullable = true)
 |-- n_product: long (nullable = true)



In [133]:
df_feature.count()

72824

In [110]:
df_store_gb_top_qty.limit(3).show()

+----------+----------+------------------+------------------+---------+---------------+-------------+
| client_id|  store_id|    s_purchase_sum|     avg_n_prod_id|n_product|rank_s_purchase|rank_prod_qty|
+----------+----------+------------------+------------------+---------+---------------+-------------+
|02429418df|2fe93e36be|1337.7000000000003|              21.0|       21|              2|            1|
|02d6c08e7d|7763d9b151|             721.0|               2.0|        8|              1|            1|
|03f35da9a5|04d336aec5|           5661.85|3.6470588235294117|       47|              1|            1|
+----------+----------+------------------+------------------+---------+---------------+-------------+



In [107]:
df_store_gb_top_pur.printSchema(), df_store_gb_top_qty.printSchema()

root
 |-- client_id: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- s_purchase_sum: double (nullable = true)
 |-- avg_n_prod_id: double (nullable = true)
 |-- n_product: long (nullable = false)
 |-- rank_s_purchase: integer (nullable = true)
 |-- rank_prod_qty: integer (nullable = true)

root
 |-- client_id: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- s_purchase_sum: double (nullable = true)
 |-- avg_n_prod_id: double (nullable = true)
 |-- n_product: long (nullable = false)
 |-- rank_s_purchase: integer (nullable = true)
 |-- rank_prod_qty: integer (nullable = true)



(None, None)

In [101]:
df_store_gb_top_qty.count(), df_top_product_qty.count()

(400162, 400162)

In [100]:
df_store_gb_top_pur.printSchema()

root
 |-- client_id: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- s_purchase_sum: double (nullable = true)
 |-- avg_n_prod_id: double (nullable = true)
 |-- n_product: long (nullable = false)
 |-- rank_s_purchase: integer (nullable = true)
 |-- rank_prod_qty: integer (nullable = true)



In [77]:
tes = df_cli_pur.groupby('client_id').agg(fn.countDistinct('drop_transaction').alias('n'))
tes.groupby('n').agg(fn.countDistinct('client_id')).show()

+---+----------------+
|  n|count(client_id)|
+---+----------------+
|  1|          167362|
|  2|           32677|
+---+----------------+



In [74]:
df_cli_pur.groupby('drop_transaction','treatment_flg','target').agg(fn.countDistinct('transaction_id'),fn.countDistinct('client_id')).show()

+----------------+-------------+------+---------------------+----------------+
|drop_transaction|treatment_flg|target|count(transaction_id)|count(client_id)|
+----------------+-------------+------+---------------------+----------------+
|               0|            1|     1|              1321476|           53320|
|               0|            0|     0|               361034|           29165|
|               1|            0|     1|               253067|           19993|
|               1|            1|     1|               260104|           21141|
|               1|            0|     0|               118283|           16658|
|               1|            1|     0|               107540|           15032|
|               0|            1|     0|               329054|           26732|
|               0|            0|     1|              1274388|           50675|
+----------------+-------------+------+---------------------+----------------+



In [75]:
53320 + 29165 + 19993 + 21141 + 16658 + 15032 + 26732 + 50675

232716

In [63]:
df_cli_pur.groupby('drop_transaction').agg(fn.countDistinct('transaction_id'),fn.countDistinct('client_id')).show()

+----------------+---------------------+----------------+
|drop_transaction|count(transaction_id)|count(client_id)|
+----------------+---------------------+----------------+
|               1|              1479837|          145869|
|               0|              6565370|          319733|
+----------------+---------------------+----------------+



In [47]:
df_cli.filter(df_cli.client_id == '000048b7a6' ).show()

+----------+-------------------+---+------+-----------------+
| client_id|   first_issue_date|age|gender|first_redeem_date|
+----------+-------------------+---+------+-----------------+
|000048b7a6|2018-12-15 13:33:11| 68|     F|       2019-11-20|
+----------+-------------------+---+------+-----------------+



In [26]:
df_cli.limit(3).show()

+----------+-------------------+---+------+-------------------+
| client_id|   first_issue_date|age|gender|  first_redeem_date|
+----------+-------------------+---+------+-------------------+
|000012768d|2017-08-05 15:40:48| 45|     U|2018-01-04 19:30:07|
|000036f903|2017-04-10 13:54:23| 72|     F|2017-04-23 12:37:56|
|000048b7a6|2018-12-15 13:33:11| 68|     F|         2019-11-20|
+----------+-------------------+---+------+-------------------+



In [23]:
df_cli.select([fn.count(fn.when(fn.isnan(c) | fn.col(c).isNull(), c)).alias(c) for c in df_cli.columns]).show()

+---------+----------------+-----------------+---+------+------------------+
|client_id|first_issue_date|first_redeem_date|age|gender|first_redeem_date2|
+---------+----------------+-----------------+---+------+------------------+
|        0|               0|            35469|  0|     0|                 0|
+---------+----------------+-----------------+---+------+------------------+



In [42]:
df_cli.select(fn.min('first_redeem_date'),fn.max('first_redeem_date')).show()

+----------------------+----------------------+
|min(first_redeem_date)|max(first_redeem_date)|
+----------------------+----------------------+
|   2017-04-11 09:42:20|   2019-11-20 12:37:56|
+----------------------+----------------------+



In [11]:
df_cli.printSchema()

root
 |-- client_id: string (nullable = true)
 |-- first_issue_date: string (nullable = true)
 |-- first_redeem_date: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)



In [12]:
df_pur.printSchema()

root
 |-- client_id: string (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- transaction_datetime: string (nullable = true)
 |-- regular_points_received: double (nullable = true)
 |-- express_points_received: double (nullable = true)
 |-- regular_points_spent: double (nullable = true)
 |-- express_points_spent: double (nullable = true)
 |-- purchase_sum: double (nullable = true)
 |-- store_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_quantity: double (nullable = true)
 |-- trn_sum_from_iss: double (nullable = true)
 |-- trn_sum_from_red: double (nullable = true)



In [160]:
spark.stop()