In [1]:
%matplotlib inline
import pyspark.sql.functions as F
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

In [2]:
# Load every Instacart table through one reader so that all of them share the
# same entry point and options. The original cell read `aisles` through
# `spark` and every other table through `sqlContext`; using the SparkSession
# (`spark`) throughout removes that inconsistency and the copy-pasted options.
def _read_instacart_csv(file_name):
    """Read one CSV file from the "Instacart Kaggle" folder.

    Parameters
    ----------
    file_name : str
        File name inside the "Instacart Kaggle" folder, e.g. "orders.csv".

    Returns
    -------
    DataFrame
        The file loaded with its first line used as header and with column
        types inferred by Spark.
    """
    return (
        spark.read
        .options(header=True, inferSchema=True)
        .csv("Instacart Kaggle/" + file_name)
    )


aisles = _read_instacart_csv("aisles.csv")
dptmts = _read_instacart_csv("departments.csv")
prod_in_orders = _read_instacart_csv("order_products__prior.csv")
all_orders = _read_instacart_csv("orders.csv")
train = _read_instacart_csv("order_products__train.csv")
products = _read_instacart_csv("products.csv")
sample_sub = _read_instacart_csv("sample_submission.csv")

In [3]:
from pyspark.sql import Window

# Running frame over each user's orders: everything from the first row of the
# partition up to the current row, ordered by order_number.
windowval = (
    Window
    .partitionBy('user_id')
    .orderBy('order_number')
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

# Ordering per (user, product) without an explicit frame; it is not used in
# this cell but is needed later for the F.lag(...) column.
windowval2 = (
    Window
    .partitionBy('user_id', 'product_id')
    .orderBy('order_number')
)

# dspo_cum_sum = running total of days_since_prior_order within each user.
running_days = F.sum('days_since_prior_order').over(windowval)
all_orders = all_orders.withColumn('dspo_cum_sum', running_days)

In [4]:
# Step 1: products joined with their aisle and department rows.
prod_full = (
    products
    .join(aisles, on='aisle_id', how='inner')
    .join(dptmts, on='department_id', how='inner')
)

# Step 2: prior-order product lines joined with the orders table.
order_full = prod_in_orders.join(all_orders, on='order_id', how='inner')

# Step 3: order lines joined with the product table built in step 1.
order_prod_full = order_full.join(prod_full, on='product_id', how='inner')

In [5]:
from pyspark.sql import Window
# Running frame per (user, product): from the first row of the partition up to
# the current row, ordered by order_number. Note that this rebinds `windowval`,
# which previously held the per-user window.
windowval = (
    Window
    .partitionBy('user_id', 'product_id')
    .orderBy('order_number')
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

# Running min / max / non-null count of dspo_cum_sum within that frame.
opf = (
    order_prod_full
    .withColumn('MIN_dspo_cum_sum', F.min('dspo_cum_sum').over(windowval))
    .withColumn('MAX_dspo_cum_sum', F.max('dspo_cum_sum').over(windowval))
    .withColumn('COUNT_dspo_cum_sum', F.count('dspo_cum_sum').over(windowval))
)

In [6]:
# dspo_cum_sum of the previous row (by order_number) for the same user and
# product; 0 is used when there is no previous row.
previous_cum_sum = F.lag('dspo_cum_sum', 1, 0).over(windowval2)
opf = opf.withColumn('SND_MAX_dspo_cum_sum', previous_cum_sum)

In [7]:
# Frequency features derived from the running statistics of dspo_cum_sum.
#
# All three frequencies divide by (COUNT_dspo_cum_sum - 1), which is zero for a
# (user, product) pair seen only once. A bare division only turns that into
# null when Spark's ANSI mode is off; with ANSI mode on it raises instead. The
# division is therefore guarded explicitly: rows with a count of 1 or less get
# null, which is exactly what the unguarded expression produced before.
_running_count = F.col('COUNT_dspo_cum_sum')


def _per_interval(numerator):
    """Return numerator / (running count - 1), or null when the count is <= 1."""
    return F.when(_running_count > 1, numerator / (_running_count - 1))


opf = (
    opf
    # Spread between the running max and the running min.
    .withColumn(
        'DIFF_dspo_cum_sum',
        F.col('MAX_dspo_cum_sum') - F.col('MIN_dspo_cum_sum'),
    )
    # Spread between the running max and the lagged value.
    .withColumn(
        'DIFF2_dspo_cum_sum',
        F.col('MAX_dspo_cum_sum') - F.col('SND_MAX_dspo_cum_sum'),
    )
    .withColumn('fixed_freq', _per_interval(F.col('DIFF_dspo_cum_sum')))
    # flex_freq_1 and flex_freq_2 intentionally hold the same expression: the
    # dict-style aggregation later in the notebook takes the mean of one and
    # the stddev_pop of the other, and a dict needs a distinct key for each.
    .withColumn('flex_freq_1', _per_interval(F.col('DIFF2_dspo_cum_sum')))
    .withColumn('flex_freq_2', _per_interval(F.col('DIFF2_dspo_cum_sum')))
    .withColumnRenamed('COUNT_dspo_cum_sum', 'count_user_prod')
)

In [8]:
# Columns that are no longer needed after the feature columns were computed.
columns_to_drop = [
    'order_id',
    'add_to_cart_order',
    'reordered',
    'eval_set',
    'order_dow',
    'order_hour_of_day',
    'department_id',
    'aisle_id',
    'product_name',
    'aisle',
    'department',
]
opf = opf.drop(*columns_to_drop)

In [9]:
# One output row per (user, product): the largest running count plus the mean
# and population standard deviation of the flex frequency columns.
group_keys = ['user_id', 'product_id']
aggregations = {
    'count_user_prod': 'max',
    'flex_freq_1': 'mean',
    'flex_freq_2': 'stddev_pop',
}
opf2 = opf.groupBy(group_keys).agg(aggregations)

In [10]:
# Print the first 20 rows of the aggregated frame, truncating long cell values
# (these are the defaults of show(), spelled out here).
opf2.show(n=20, truncate=True)

+-------+----------+--------------------+------------------+-----------------------+
|user_id|product_id|max(count_user_prod)|  avg(flex_freq_1)|stddev_pop(flex_freq_2)|
+-------+----------+--------------------+------------------+-----------------------+
|      7|     29894|                   1|              null|                   null|
|      7|     29993|                   3|              41.5|                    9.5|
|     14|     23271|                   1|              null|                   null|
|     14|     34234|                   1|              null|                   null|
|     31|     28577|                   2|              26.0|                    0.0|
|     31|     34234|                   1|              null|                   null|
|     37|     31528|                   1|              null|                   null|
|     38|      8638|                   1|              null|                   null|
|     40|     26706|                   0|              null|     

In [11]:
# Export the aggregated features as CSV with a header row.
# - coalesce(1) reduces the frame to a single partition before writing.
# - The built-in CSV writer is used instead of the legacy
#   "com.databricks.spark.csv" format name, matching the `.csv(...)` reader
#   used to load the inputs.
# - mode("overwrite") replaces any previous export at "opf2.csv"; with the
#   default save mode this cell failed whenever the notebook was run again.
(
    opf2
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv("opf2.csv")
)