In [3]:
%matplotlib inline
import pyspark.sql.functions as F
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

In [4]:
aisles = spark.read \
    .options(header=True, inferSchema=True) \
    .csv("D:/DADOS USUARIO/Documents/springboard/capstone project/Instacart Kaggle/aisles.csv")
dptmts = sqlContext.read \
    .options(header=True, inferSchema=True) \
    .csv("D:/DADOS USUARIO/Documents/springboard/capstone project/Instacart Kaggle/departments.csv")
prod_in_orders = sqlContext.read \
    .options(header=True, inferSchema=True) \
    .csv("D:/DADOS USUARIO/Documents/springboard/capstone project/Instacart Kaggle/order_products__prior.csv")
all_orders = sqlContext.read \
    .options(header=True, inferSchema=True) \
    .csv("D:/DADOS USUARIO/Documents/springboard/capstone project/Instacart Kaggle/orders.csv")
train = sqlContext.read \
    .options(header=True, inferSchema=True) \
    .csv("D:/DADOS USUARIO/Documents/springboard/capstone project/Instacart Kaggle/order_products__train.csv")
products = sqlContext.read \
    .options(header=True, inferSchema=True) \
    .csv("D:/DADOS USUARIO/Documents/springboard/capstone project/Instacart Kaggle/products.csv")

# 1 - Merge of products, aisles and departments and delete dptmns & aisles

In [5]:
#The first join will connect the products tables
prod_full = products.join(aisles, on='aisle_id')
prod_full = prod_full.join(dptmts, on='department_id')
#delete aisles and dptmts dataframes
aisles.unpersist()
dptmts.unpersist()

DataFrame[department_id: int, department: string]

# 2 - Add the User_id to the prior and train dataset

In [6]:
#add user_id to train
train = train.join(all_orders, on='order_id')
train = train.drop('eval_set','order_number','order_dow','order_hour_of_day','days_since_prior_order')

# 3 - Inner join entre all orders e prod in orders

In [7]:
#add user_id to prior
orders_prod = prod_in_orders.join(all_orders, on='order_id')
prod_in_orders.unpersist()

DataFrame[order_id: int, product_id: int, add_to_cart_order: int, reordered: int]

In [8]:
orders_prod.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- add_to_cart_order: integer (nullable = true)
 |-- reordered: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- eval_set: string (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- order_dow: integer (nullable = true)
 |-- order_hour_of_day: integer (nullable = true)
 |-- days_since_prior_order: double (nullable = true)



# 4 - Features from Products

In [9]:
from pyspark.sql import Window
windowval2 = Window.partitionBy('user_id', 'product_id').orderBy(['user_id', 'order_number', 'product_id']).rangeBetween(Window.unboundedPreceding, 0)
orders_prod = orders_prod.withColumn('prod_user_times', F.count('order_id').over(windowval2))

In [10]:
windowval3 = Window.partitionBy('product_id').orderBy('product_id').rangeBetween(Window.unboundedPreceding, 0)
orders_prod = orders_prod.withColumn('n_times_prod_ordered', F.count('order_id').over(windowval3))
orders_prod = orders_prod.withColumn('n_times_prod_reordered', F.sum('reordered').over(windowval3))

In [11]:
prod = orders_prod
prod1 = prod.select('product_id', 'prod_user_times').filter(prod.prod_user_times == 1).groupby('product_id').sum('prod_user_times')
prod2 = prod.select('product_id', 'prod_user_times').filter(prod.prod_user_times == 2).groupby('product_id').sum('prod_user_times')

In [12]:
prod1 = prod1.sort("product_id")
prod2 = prod2.sort("product_id")
prod1 = prod1.withColumnRenamed("sum(prod_user_times)", "first_ord_prod")
prod2 = prod2.withColumnRenamed("sum(prod_user_times)", "second_ord_prod")

In [13]:
prod = prod.drop('order_id','add_to_cart_order', 'reordered','user_id','eval_set','order_number','order_dow','order_hour_of_day','days_since_prior_order','prod_user_times')
prod = prod.groupby('product_id').agg({'n_times_prod_ordered': 'max', 'n_times_prod_reordered': 'max'})

In [14]:
prod = prod.sort("product_id")
prod = prod.withColumnRenamed("max(n_times_prod_ordered)", "times_prod_ordered")
prod = prod.withColumnRenamed("max(n_times_prod_reordered)", "times_prod_reordered")
prod = prod.join(prod1, on='product_id')
prod = prod.join(prod2, on='product_id')

In [15]:
prod = prod.withColumn('prod_reorder_probability', prod.second_ord_prod / prod.first_ord_prod)
prod = prod.withColumn('prod_reorder_times', 1 + prod.times_prod_reordered / prod.first_ord_prod)
prod = prod.withColumn('prod_reorder_ratio', prod.times_prod_reordered / prod.times_prod_ordered)

In [16]:
prod1.unpersist()
prod2.unpersist()

DataFrame[product_id: int, second_ord_prod: bigint]

In [17]:
prod = prod.drop('times_prod_reordered', 'first_ord_prod', 'second_ord_prod')

In [18]:
prod.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- times_prod_ordered: long (nullable = true)
 |-- prod_reorder_probability: double (nullable = true)
 |-- prod_reorder_times: double (nullable = true)
 |-- prod_reorder_ratio: double (nullable = true)



# 5 - Features from Users

In [19]:
users = all_orders.filter(all_orders.eval_set == 'prior')
users = users.withColumn('dspo', users.days_since_prior_order)

In [20]:
users.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- eval_set: string (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- order_dow: integer (nullable = true)
 |-- order_hour_of_day: integer (nullable = true)
 |-- days_since_prior_order: double (nullable = true)
 |-- dspo: double (nullable = true)



In [21]:
users = users.groupby('user_id').agg({'order_number': 'max', 'days_since_prior_order': 'sum', 'dspo': 'mean'})

In [22]:
us = orders_prod.groupby('user_id').count()
us = us.withColumnRenamed("count", "user_total_prod")

In [23]:
us1 = orders_prod.select('user_id', 'reordered').filter(orders_prod.reordered == 1).groupby('user_id').sum('reordered')
us2 = orders_prod.select('user_id', 'order_number').filter(orders_prod.order_number > 1).groupby('user_id').sum('order_number')
us3 = us1.join(us2, on='user_id')

In [24]:
us3 = us3.withColumnRenamed("sum(reordered)", "reord")
us3 = us3.withColumnRenamed("sum(order_number)", "on")

In [25]:
us3 = us3.withColumn('user_reorder_ratio', us3.reord / us3.on)

In [26]:
us3 = us3.drop('reord', 'on')

In [27]:
from pyspark.sql.functions import countDistinct
us4 = orders_prod.groupBy("user_id").agg(countDistinct("product_id"))
us4 = us4.withColumnRenamed('count(DISTINCT product_id)', 'distinct')

In [28]:
us = us.join(us1, on='user_id')
us = us.join(us2, on='user_id')
us = us.join(us3, on='user_id')
us = us.join(us4, on='user_id')
users = users.join(us, on='user_id')
users = users.withColumnRenamed('max(order_number)', 'user_orders')
users = users.withColumnRenamed('avg(dspo)', 'user_mean_days_since_prior')
users = users.withColumnRenamed('sum(days_since_prior_order)', 'user_period')
users = users.withColumnRenamed('sum(reordered)', 'sum_reordered')
users = users.withColumnRenamed('sum(order_number)', 'sum_order_number')
users = users.drop('sum_reordered', 'sum_order_number')

In [29]:
users = users.withColumn('user_average_basket', users.user_total_prod / users.user_orders)

In [30]:
us_orders = all_orders.select('user_id', 'order_id', 'eval_set', 'days_since_prior_order').filter(all_orders.eval_set != 'prior')

In [31]:
users = users.join(us_orders, on='user_id')

In [32]:
us.unpersist()
us1.unpersist()
us2.unpersist()
us3.unpersist()
us4.unpersist()
us_orders.unpersist()

DataFrame[user_id: int, order_id: int, eval_set: string, days_since_prior_order: double]

In [33]:
users.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- user_mean_days_since_prior: double (nullable = true)
 |-- user_orders: integer (nullable = true)
 |-- user_period: double (nullable = true)
 |-- user_total_prod: long (nullable = false)
 |-- user_reorder_ratio: double (nullable = true)
 |-- distinct: long (nullable = false)
 |-- user_average_basket: double (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- eval_set: string (nullable = true)
 |-- days_since_prior_order: double (nullable = true)



# 6 - Create a Database

In [34]:
data = orders_prod.withColumn('order_number2', orders_prod.order_number)

In [35]:
windowval4 = Window.partitionBy('user_id', 'product_id').rangeBetween(Window.unboundedPreceding, 0)
data = data.withColumn('count_order_id', F.min('order_id').over(windowval4))
data = data.withColumn('min_order_number', F.min('order_number').over(windowval4))
data = data.withColumn('max_order_number', F.max('order_number2').over(windowval4))
data = data.withColumn('mean_add_to_cart_order', F.avg('add_to_cart_order').over(windowval4))

In [36]:
data = data.drop('eval_set', 'order_id', 'reordered')

In [37]:
data = data.join(prod, on='product_id')
data = data.join(users, on='user_id')

In [38]:
data = data.withColumn('up_order_rate', data.count_order_id / data.user_orders)
data = data.withColumn('up_orders_since_last_order', data.user_orders - data.max_order_number)
data = data.withColumn('up_order_rate_since_first_order', data.count_order_id / (data.user_orders - data.min_order_number + 1))

In [39]:
train = train.select('user_id', 'product_id', 'reordered')

In [40]:
data = data.join(train, on=['user_id','product_id'], how='outer')

In [41]:
data = data.drop("days_since_prior_order", "order_number2", 'add_to_cart_order', 'order_dow', 'order_hour_of_day')

In [42]:
data.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- prod_user_times: long (nullable = true)
 |-- n_times_prod_ordered: long (nullable = true)
 |-- n_times_prod_reordered: long (nullable = true)
 |-- count_order_id: integer (nullable = true)
 |-- min_order_number: integer (nullable = true)
 |-- max_order_number: integer (nullable = true)
 |-- mean_add_to_cart_order: double (nullable = true)
 |-- times_prod_ordered: long (nullable = true)
 |-- prod_reorder_probability: double (nullable = true)
 |-- prod_reorder_times: double (nullable = true)
 |-- prod_reorder_ratio: double (nullable = true)
 |-- user_mean_days_since_prior: double (nullable = true)
 |-- user_orders: integer (nullable = true)
 |-- user_period: double (nullable = true)
 |-- user_total_prod: long (nullable = true)
 |-- user_reorder_ratio: double (nullable = true)
 |-- distinct: long (nullable = true)
 |-- user_average_basket: double (nulla

In [43]:
all_orders.unpersist()
train.unpersist()
products.unpersist()
prod_full.unpersist()
orders_prod.unpersist()
prod.unpersist()
users.unpersist()

DataFrame[user_id: int, user_mean_days_since_prior: double, user_orders: int, user_period: double, user_total_prod: bigint, user_reorder_ratio: double, distinct: bigint, user_average_basket: double, order_id: int, eval_set: string, days_since_prior_order: double]

# 7 - Train and Test datasets

In [44]:
train = data.filter(data.eval_set == 'train')

In [45]:
train = train.drop('eval_set', 'user_id', 'product_id', 'order_id')

In [46]:
train = train.na.fill(0, subset=['reordered'])

In [47]:
test = data.filter(data.eval_set == 'test')

In [48]:
test = test.drop('eval_set', 'user_id', 'reordered')

In [49]:
data.unpersist()

DataFrame[user_id: int, product_id: int, order_number: int, prod_user_times: bigint, n_times_prod_ordered: bigint, n_times_prod_reordered: bigint, count_order_id: int, min_order_number: int, max_order_number: int, mean_add_to_cart_order: double, times_prod_ordered: bigint, prod_reorder_probability: double, prod_reorder_times: double, prod_reorder_ratio: double, user_mean_days_since_prior: double, user_orders: int, user_period: double, user_total_prod: bigint, user_reorder_ratio: double, distinct: bigint, user_average_basket: double, order_id: int, eval_set: string, up_order_rate: double, up_orders_since_last_order: int, up_order_rate_since_first_order: double, reordered: int]

In [50]:
train.printSchema()

root
 |-- order_number: integer (nullable = true)
 |-- prod_user_times: long (nullable = true)
 |-- n_times_prod_ordered: long (nullable = true)
 |-- n_times_prod_reordered: long (nullable = true)
 |-- count_order_id: integer (nullable = true)
 |-- min_order_number: integer (nullable = true)
 |-- max_order_number: integer (nullable = true)
 |-- mean_add_to_cart_order: double (nullable = true)
 |-- times_prod_ordered: long (nullable = true)
 |-- prod_reorder_probability: double (nullable = true)
 |-- prod_reorder_times: double (nullable = true)
 |-- prod_reorder_ratio: double (nullable = true)
 |-- user_mean_days_since_prior: double (nullable = true)
 |-- user_orders: integer (nullable = true)
 |-- user_period: double (nullable = true)
 |-- user_total_prod: long (nullable = true)
 |-- user_reorder_ratio: double (nullable = true)
 |-- distinct: long (nullable = true)
 |-- user_average_basket: double (nullable = true)
 |-- up_order_rate: double (nullable = true)
 |-- up_orders_since_last_

In [51]:
test.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- prod_user_times: long (nullable = true)
 |-- n_times_prod_ordered: long (nullable = true)
 |-- n_times_prod_reordered: long (nullable = true)
 |-- count_order_id: integer (nullable = true)
 |-- min_order_number: integer (nullable = true)
 |-- max_order_number: integer (nullable = true)
 |-- mean_add_to_cart_order: double (nullable = true)
 |-- times_prod_ordered: long (nullable = true)
 |-- prod_reorder_probability: double (nullable = true)
 |-- prod_reorder_times: double (nullable = true)
 |-- prod_reorder_ratio: double (nullable = true)
 |-- user_mean_days_since_prior: double (nullable = true)
 |-- user_orders: integer (nullable = true)
 |-- user_period: double (nullable = true)
 |-- user_total_prod: long (nullable = true)
 |-- user_reorder_ratio: double (nullable = true)
 |-- distinct: long (nullable = true)
 |-- user_average_basket: double (nullable = true)
 |-- order_id: integer (null

# 8 - Write the data file

In [53]:
train.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("train.csv")

In [54]:
test.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("test.csv")

In [None]:
#from pyspark.sql import Window

#windowval = Window.partitionBy('user_id').orderBy('order_number').rangeBetween(Window.unboundedPreceding, 0)
#windowval2 = Window.partitionBy('user_id', 'product_id').orderBy('order_number')
#all_orders = all_orders.withColumn('dspo_cum_sum', F.sum('days_since_prior_order').over(windowval))

In [None]:
#from pyspark.sql import Window
#opf = order_prod_full
#windowval = Window.partitionBy('user_id','product_id').orderBy('order_number').rangeBetween(Window.unboundedPreceding, 0)
#opf = opf.withColumn('MIN_dspo_cum_sum', F.min('dspo_cum_sum').over(windowval))
#opf = opf.withColumn('MAX_dspo_cum_sum', F.max('dspo_cum_sum').over(windowval))
#opf = opf.withColumn('COUNT_dspo_cum_sum', F.count('dspo_cum_sum').over(windowval))

In [None]:
#opf = opf.withColumn('SND_MAX_dspo_cum_sum', F.lag('dspo_cum_sum',1,0).over(windowval2))

In [None]:
#opf = opf.withColumn('DIFF_dspo_cum_sum', opf.MAX_dspo_cum_sum - opf.MIN_dspo_cum_sum)
#opf = opf.withColumn('DIFF2_dspo_cum_sum', opf.MAX_dspo_cum_sum - opf.SND_MAX_dspo_cum_sum)
#opf = opf.withColumn('fixed_freq', opf.DIFF_dspo_cum_sum / (opf.COUNT_dspo_cum_sum - 1))
#opf = opf.withColumn('flex_freq_1', opf.DIFF2_dspo_cum_sum / (opf.COUNT_dspo_cum_sum - 1))
#opf = opf.withColumn('flex_freq_2', opf.DIFF2_dspo_cum_sum / (opf.COUNT_dspo_cum_sum - 1))
#opf = opf.withColumnRenamed('COUNT_dspo_cum_sum', 'count_user_prod')

In [None]:
#opf = opf.drop('order_id', 'add_to_cart_order', 'reordered', 'eval_set', 'order_dow', 'order_hour_of_day', 'department_id', 'aisle_id', 'product_name', 'aisle', 'department')

In [None]:
#opf2 = opf.groupby(['user_id', 'product_id']).agg({'count_user_prod': 'max', 'flex_freq_1': 'mean', 'flex_freq_2': 'stddev_pop'})

In [None]:
#opf2.show()

In [None]:
#3opf2.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("opf2.csv")