In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

# Build (or reuse, via getOrCreate) the session used by every cell below;
# the application name is "structured_streaming".
spark = (
    SparkSession.builder
    .appName("structured_streaming")
    .getOrCreate()
)

In [2]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

In [3]:
# Import Data
# Every table is read from a CSV with a header row, letting Spark infer column types.
csv_options = dict(header=True, inferSchema=True)

aisles = spark.read.csv("/FileStore/tables/aisles.csv", **csv_options)
departments = spark.read.csv("/FileStore/tables/departments.csv", **csv_options)
# The "prior" order lines are intentionally left disabled:
#order_products_prior = spark.read.csv("/FileStore/tables/order_products__prior.csv", header=True, inferSchema=True)
order_products_train = spark.read.csv("/FileStore/tables/order_products__train.csv", **csv_options)
orders = spark.read.csv("/FileStore/tables/orders.csv", **csv_options)
products = spark.read.csv("/FileStore/tables/products.csv", **csv_options)


In [4]:
%python
# Make the orders DataFrame queryable from SQL cells as "orders", then preview 10 rows.
orders.createOrReplaceTempView("orders")
orders.show(n=10)

In [5]:
%sql
-- Number of orders for each order_dow value, busiest day first.
-- The CASE labels order_dow codes 0..6 as Sunday..Saturday.
SELECT
  COUNT(order_id) AS total_orders,
  CASE
    WHEN order_dow = '0' THEN 'Sunday'
    WHEN order_dow = '1' THEN 'Monday'
    WHEN order_dow = '2' THEN 'Tuesday'
    WHEN order_dow = '3' THEN 'Wednesday'
    WHEN order_dow = '4' THEN 'Thursday'
    WHEN order_dow = '5' THEN 'Friday'
    WHEN order_dow = '6' THEN 'Saturday'
  END AS day_of_week
FROM orders
GROUP BY order_dow
ORDER BY total_orders DESC

total_orders,day_of_week
600905,Sunday
587478,Monday
467260,Tuesday
453368,Friday
448761,Saturday
436972,Wednesday
426339,Thursday


In [6]:
%sql
-- Number of orders placed in each hour of the day, listed in hour order.
SELECT
  COUNT(order_id) AS total_orders,
  order_hour_of_day AS hour
FROM orders
GROUP BY order_hour_of_day
ORDER BY order_hour_of_day

total_orders,hour
22758,0
12398,1
7539,2
5474,3
5527,4
9569,5
30529,6
91868,7
178201,8
257812,9


In [7]:
%python
# (Re-)register the DataFrames the next SQL cell reads as temp views of the same name.
for view_name, frame in (
    ("products", products),
    ("orders", orders),
    ("departments", departments),
):
    frame.createOrReplaceTempView(view_name)

In [8]:
%sql
-- The ten departments with the most distinct products.
SELECT
  d.department,
  COUNT(DISTINCT p.product_id) AS products
FROM products p
INNER JOIN departments d
  ON d.department_id = p.department_id
GROUP BY d.department
ORDER BY products DESC
LIMIT 10

department,products
personal care,6563
snacks,6264
pantry,5371
beverages,4365
frozen,4007
dairy eggs,3449
household,3084
canned goods,2092
dry goods pasta,1858
produce,1684


In [9]:
%python
# Register every table used by the basket analysis as a temp view named after it.
temp_views = {
    "products": products,
    "orders": orders,
    "departments": departments,
    "aisles": aisles,
    "order_products_train": order_products_train,
}
for view_name, frame in temp_views.items():
    frame.createOrReplaceTempView(view_name)

In [10]:
# Organize the data by shopping basket
from pyspark.sql.functions import collect_set, col, count

# One row per (product_name, order_id): each training order line paired with its product name.
rawData = spark.sql(
    "select p.product_name, o.order_id "
    "from products p inner join order_products_train o "
    "where o.product_id = p.product_id"
)

# Collapse to one row per order, gathering its distinct product names into "items".
basket_items = collect_set("product_name").alias("items")
baskets = rawData.groupBy("order_id").agg(basket_items)
baskets.createOrReplaceTempView("baskets")

In [11]:
# Preview the first 10 (product_name, order_id) rows without truncating long names.
rawData.show(n=10, truncate=False)

In [12]:
# Preview the first 20 baskets (order_id with its set of items).
baskets.show(n=20)

In [13]:
%python
# Join each basket back to the per-item rows on order_id.
# The result has one row per rawData row, so a basket's "items" set is repeated
# for every product in that order.
df1 = baskets.join(rawData, on="order_id")
df1.show(n=10)

In [14]:
# Input for FPGrowth: one row per order, holding its item set and order_id.
# FPGrowth counts every row as one transaction. Selecting from df1 (baskets joined
# back to rawData) would repeat each basket once per product it contains, so larger
# baskets would be over-weighted and support/confidence skewed. Take the columns
# straight from baskets, which already has exactly one row per order_id.
df = baskets.select("items", "order_id")
df.show(10)

In [15]:
from pyspark.ml.fpm import FPGrowth


In [16]:
# Configure FP-Growth over the "items" column with the chosen support/confidence
# thresholds, then fit it on the basket DataFrame.
fpGrowth = FPGrowth(
    itemsCol="items",
    minSupport=0.001,
    minConfidence=0.06,
)
model = fpGrowth.fit(df)

In [17]:
# Show the first 20 frequent itemsets found by the fitted model.
model.freqItemsets.show(n=20)


In [18]:
# Show the first 5 association rules generated by the fitted model.
model.associationRules.show(n=5)


In [19]:
# transform examines the input items against all the association rules and summarizes the
# consequents as prediction.
# show() only prints and returns None, so keep the transformed DataFrame in df4
# first and display it in a separate step; otherwise df4 would be None.
df4 = model.transform(df)
df4.show()