In [0]:
from pyspark.sql import SparkSession
# Name the application 'IMBA_DE'; getOrCreate() hands back the session that is
# already active when there is one, and only starts a new one otherwise.
session_builder = SparkSession.builder.appName('IMBA_DE')
spark = session_builder.getOrCreate()

In [0]:
# Load the three raw CSV extracts. Each file is read with its first row as the
# header and with column types inferred by Spark rather than declared up front.
csv_options = {'header': True, 'inferSchema': True}

orders = spark.read.csv('/FileStore/tables/orders.csv', **csv_options)
order_products__prior = spark.read.csv('/FileStore/tables/order_products__prior.csv', **csv_options)
order_products__train = spark.read.csv('/FileStore/tables/order_products__train.csv', **csv_options)

In [0]:
# Stack the prior and train order lines into a single DataFrame.
# unionByName pairs columns by name, so a difference in column order between
# the two CSV files cannot silently put values under the wrong header
# (union / unionAll pair columns purely by position). Duplicate rows are kept,
# exactly as with unionAll.
order_products = order_products__prior.unionByName(order_products__train)
display(order_products)

order_id,product_id,add_to_cart_order,reordered
2,33120,1,1
2,28985,2,1
2,9327,3,0
2,45918,4,1
2,30035,5,0
2,17794,6,1
2,40141,7,1
2,1819,8,1
2,43668,9,0
3,33754,1,1


In [0]:
# Preview the first 10 orders. limit() keeps the preview a DataFrame, so
# display() receives a schema-carrying object instead of the plain list of Row
# objects that head(10) collects onto the driver.
display(orders.limit(10))

order_id,user_id,eval_set,order_number,order_dow,order_hour_of_day,days_since_prior_order
2539329,1,prior,1,2,8,
2398795,1,prior,2,3,7,15.0
473747,1,prior,3,3,12,21.0
2254736,1,prior,4,4,7,29.0
431534,1,prior,5,4,15,28.0
3367565,1,prior,6,2,7,19.0
550135,1,prior,7,1,9,20.0
3108588,1,prior,8,1,14,14.0
2295261,1,prior,9,1,16,0.0
2550362,1,prior,10,4,8,30.0


### Create the four feature tables

In [0]:
# Create a dataframe called "order_products_prior":
# every order line joined to its order header, restricted to orders whose
# eval_set is 'prior'.
orders.createOrReplaceTempView('orders')
order_products.createOrReplaceTempView('order_products')

# Triple-quoted SQL (the style the final cell of this notebook already uses)
# replaces backslash continuations, which break as soon as a stray space
# follows the backslash and which bake the Python indentation into the query.
order_products_prior = spark.sql("""
    SELECT a.*, b.product_id, b.add_to_cart_order, b.reordered
    FROM orders a
    JOIN order_products b ON a.order_id = b.order_id
    WHERE a.eval_set = 'prior'
""")

display(order_products_prior)

order_id,user_id,eval_set,order_number,order_dow,order_hour_of_day,days_since_prior_order,product_id,add_to_cart_order,reordered
26,153404,prior,2,0,16,7.0,35951,1,0
26,153404,prior,2,0,16,7.0,24852,2,1
26,153404,prior,2,0,16,7.0,46206,3,0
26,153404,prior,2,0,16,7.0,25890,4,0
26,153404,prior,2,0,16,7.0,33120,5,0
26,153404,prior,2,0,16,7.0,21903,6,0
26,153404,prior,2,0,16,7.0,40545,7,0
26,153404,prior,2,0,16,7.0,47766,8,1
27,129389,prior,16,6,20,14.0,13176,1,1
27,129389,prior,16,6,20,14.0,30442,2,1


In [0]:
# Create a dataframe called "user_features_1": one row per user_id with
#   user_orders                - highest order_number recorded for the user
#   user_period                - sum of days_since_prior_order
#   user_mean_days_since_prior - average of days_since_prior_order
# Sum/Avg ignore NULL days_since_prior_order values (e.g. a user's first order).
# NOTE(review): this aggregates the whole `orders` view, not only rows with
# eval_set = 'prior', unlike the features derived from order_products_prior.
# Confirm that including the non-prior orders here is intended.
user_features_1 = spark.sql("""
    SELECT user_id,
           Max(order_number) AS user_orders,
           Sum(days_since_prior_order) AS user_period,
           Avg(days_since_prior_order) AS user_mean_days_since_prior
    FROM orders
    GROUP BY user_id
""")

display(user_features_1)

user_id,user_orders,user_period,user_mean_days_since_prior
148,8,69.0,9.857142857142858
463,8,133.0,19.0
471,7,65.0,10.833333333333334
496,83,280.0,3.414634146341464
833,12,233.0,21.181818181818183
1088,16,238.0,15.866666666666667
1238,30,364.0,12.551724137931034
1342,7,52.0,8.666666666666666
1580,10,185.0,20.55555555555556
1591,4,26.0,8.666666666666666


In [0]:
# Create a dataframe called "user_features_2": one row per user_id with
#   user_total_products    - number of prior order lines
#   user_distinct_products - number of different products bought
#   user_reorder_ratio     - reordered lines / lines from orders after the first
order_products_prior.createOrReplaceTempView('order_products_prior')

# NULLIF turns a zero denominator (a user whose prior lines all belong to
# order_number = 1) into NULL. The ratio is then NULL, which is what non-ANSI
# Spark already returned for x / 0, and the query no longer raises a
# divide-by-zero error when spark.sql.ansi.enabled is on.
user_features_2 = spark.sql("""
    SELECT user_id,
           Count(*) AS user_total_products,
           Count(DISTINCT product_id) AS user_distinct_products,
           Sum(CASE WHEN reordered = 1 THEN 1 ELSE 0 END)
               / NULLIF(Cast(Sum(CASE WHEN order_number > 1 THEN 1 ELSE 0 END) AS DOUBLE), 0)
               AS user_reorder_ratio
    FROM order_products_prior
    GROUP BY user_id
""")

display(user_features_2)

user_id,user_total_products,user_distinct_products,user_reorder_ratio
156366,1097,400,0.6435826408125577
136631,240,93,0.630901287553648
22346,719,231,0.6961483594864479
80451,29,19,0.4166666666666667
19530,112,36,0.7037037037037037
15790,183,93,0.5421686746987951
197148,830,256,0.7086419753086419
74904,141,69,0.5413533834586466
66010,23,19,0.2857142857142857
4900,217,80,0.6586538461538461


In [0]:
# Create a dataframe called "up_features": one row per (user_id, product_id)
# pair found in the order_products_prior view, with
#   up_orders                - number of order lines for the pair
#   up_first_order           - lowest order_number containing the product
#   up_last_order            - highest order_number containing the product
#   up_average_cart_position - average add_to_cart_order
# Written as triple-quoted SQL (matching the final cell) instead of
# backslash-continued string lines, which are easy to break with a stray space.
up_features = spark.sql("""
    SELECT user_id,
           product_id,
           Count(*) AS up_orders,
           Min(order_number) AS up_first_order,
           Max(order_number) AS up_last_order,
           Avg(add_to_cart_order) AS up_average_cart_position
    FROM order_products_prior
    GROUP BY user_id, product_id
""")

display(up_features)

user_id,product_id,up_orders,up_first_order,up_last_order,up_average_cart_position
152610,11175,2,22,23,11.5
4076,47049,9,6,47,11.0
37763,9755,1,2,2,7.0
70318,49112,3,2,6,4.333333333333333
184695,4462,3,6,15,14.0
201623,10017,31,3,92,8.548387096774194
21237,35535,10,1,33,8.5
19259,44085,13,6,26,4.307692307692308
12016,47006,1,1,1,3.0
51227,32369,10,3,43,11.7


In [0]:
# Create a dataframe called "prd_features": one row per product_id with
#   prod_orders        - number of prior order lines for the product
#   prod_reorders      - sum of the reordered flag
#   prod_first_orders  - lines that are a user's 1st purchase of the product
#   prod_second_orders - lines that are a user's 2nd purchase of the product
# The inner query numbers each user's purchases of a product by order_number.
# ROW_NUMBER() is used rather than RANK(): with RANK(), two lines sharing the
# same (user_id, product_id, order_number) would both get sequence 1 and the
# next line would jump to 3, inflating prod_first_orders and missing
# prod_second_orders. When no such ties exist the two functions agree.
prd_features = spark.sql("""
    SELECT product_id,
           Count(*) AS prod_orders,
           Sum(reordered) AS prod_reorders,
           Sum(CASE WHEN product_seq_time = 1 THEN 1 ELSE 0 END) AS prod_first_orders,
           Sum(CASE WHEN product_seq_time = 2 THEN 1 ELSE 0 END) AS prod_second_orders
    FROM (
        SELECT *,
               Row_number() OVER (PARTITION BY user_id, product_id
                                  ORDER BY order_number) AS product_seq_time
        FROM order_products_prior
    ) AS sequenced
    GROUP BY product_id
""")

display(prd_features)

product_id,prod_orders,prod_reorders,prod_first_orders,prod_second_orders
26706,2427,1444,983,379
40574,484,235,249,89
148,4903,2778,2125,887
38311,1970,1459,511,256
37489,1346,677,669,276
29993,4895,3083,1812,848
31528,1929,958,971,354
35982,468,120,348,64
8638,4136,2146,1990,754
19204,2069,1338,731,340


### Assemble the final dataset for data analysis / data science (DA/DS)

In [0]:
# Generate the final dataframe

# Expose each feature table to Spark SQL under its own variable name.
feature_views = {
    'user_features_1': user_features_1,
    'user_features_2': user_features_2,
    'up_features': up_features,
    'prd_features': prd_features,
}
for view_name, feature_df in feature_views.items():
    feature_df.createOrReplaceTempView(view_name)

# Three chained inner joins:
#   feature_1 - the two user-level tables, matched on user_id
#   feature_2 - feature_1 plus the user/product features, matched on user_id
#   feature_3 - feature_2 plus the product-level features, matched on product_id
# Being inner joins, rows without a match on the join key are dropped.
final_features_sql = """

with feature_1 as (
    select F1.user_id, F1.user_orders, F1.user_period, F1.user_mean_days_since_prior,
        F2.user_total_products, F2.user_distinct_products, F2.user_reorder_ratio
    from user_features_1 F1
    INNER JOIN user_features_2 F2 on F1.user_id = F2.user_id
),
    
feature_2 as (
    select F3.*, 
        UF.product_id, UF.up_orders, UF.up_first_order, UF.up_last_order, UF.up_average_cart_position
    from feature_1 F3
    INNER JOIN up_features UF on F3.user_id = UF.user_id
),

feature_3 as (
    select F4.*,
        PF.prod_orders, PF.prod_reorders, PF.prod_first_orders, PF.prod_second_orders
    from feature_2 F4
    INNER JOIN prd_features PF on F4.product_id = PF.product_id
)

select * from feature_3

"""

data = spark.sql(final_features_sql)

display(data)

user_id,user_orders,user_period,user_mean_days_since_prior,user_total_products,user_distinct_products,user_reorder_ratio,product_id,up_orders,up_first_order,up_last_order,up_average_cart_position,prod_orders,prod_reorders,prod_first_orders,prod_second_orders
31,21,111.0,5.55,299,190,0.3811188811188811,34234,1,14,14,13.0,6506,4422,2084,1110
31,21,111.0,5.55,299,190,0.3811188811188811,28577,2,12,16,28.0,1905,1091,814,342
76,10,190.0,21.11111111111111,32,25,0.25,2122,1,3,3,3.0,5,0,5,0
183,5,61.0,15.25,76,58,0.2857142857142857,28577,1,2,2,7.0,1905,1091,814,342
210,100,330.0,3.333333333333333,1476,233,0.8461538461538461,44596,1,77,77,4.0,937,272,665,141
210,100,330.0,3.333333333333333,1476,233,0.8461538461538461,31035,10,18,95,12.1,234,90,144,46
243,28,331.0,12.25925925925926,351,112,0.7029411764705882,11858,1,3,3,10.0,237,173,64,39
243,28,331.0,12.25925925925926,351,112,0.7029411764705882,33569,1,16,16,14.0,949,573,376,178
332,13,107.0,8.916666666666666,234,104,0.625,31528,4,3,6,2.5,1929,958,971,354
375,8,141.0,20.142857142857142,116,40,0.7378640776699029,31912,4,2,6,13.0,1805,941,864,341
