In [363]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
# from pyspark.sql.functions import rank,desc,when,datediff,month
from pyspark.sql.functions import *
# from pyspark.sql.functions import countDistinct,max

# Create SparkSession 
spark = SparkSession.builder.appName("dany's diner").getOrCreate()

In [200]:
sales_df = spark.read.csv("sales.csv",header=True)
sales_df = sales_df.withColumn('product_id',sales_df.product_id.cast('int'))
sales_df = sales_df.withColumn('order_date',sales_df.order_date.cast('date'))
menu_df = spark.read.csv("menu.csv",header=True)
menu_df = menu_df.withColumn('product_id',menu_df.product_id.cast('int'))
menu_df = menu_df.withColumn('price',menu_df.price.cast('int'))
members_df = spark.read.csv("members.csv",header=True)
members_df = members_df.withColumn('join_date',members_df.join_date.cast('date'))


Question1 : What is the total amount each customer spent at the restaurant?

In [201]:
total_amt_per_customer = sales_df.join(menu_df,'product_id','inner').groupBy("customer_id").agg({"price":"sum"}).orderBy("customer_id").withColumnRenamed("sum(price)","total_price")

In [128]:
total_amt_per_customer.show()

+-----------+-----------+
|customer_id|total_price|
+-----------+-----------+
|          A|         76|
|          B|         74|
|          C|         36|
+-----------+-----------+



Question2 : How many days has each customer visited the restaurant?

In [129]:
total_days_per_customer = sales_df.groupBy("customer_id").agg(countDistinct('order_date')).orderBy("customer_id")

In [130]:
total_days_per_customer.show()

+-----------+-----------------+
|customer_id|count(order_date)|
+-----------+-----------------+
|          A|                4|
|          B|                6|
|          C|                2|
+-----------+-----------------+



Question3 :  What was the first item from the menu purchased by each customer?

In [144]:
windowSpec = Window.partitionBy("customer_id").orderBy("order_date")
first_item_each_customer = sales_df
first_item_each_customer = first_item_each_customer.withColumn('rank',rank().over(windowSpec))
# first_item_each_customer = sales_df.filter(sales_df.rank==1)


In [152]:
first_item_each_customer.filter("rank==1").join(menu_df,'product_id').show()

+----------+-----------+----------+----+------------+-----+
|product_id|customer_id|order_date|rank|product_name|price|
+----------+-----------+----------+----+------------+-----+
|         1|          A|2021-01-01|   1|       sushi|   10|
|         2|          A|2021-01-01|   1|       curry|   15|
|         2|          B|2021-01-01|   1|       curry|   15|
|         3|          C|2021-01-01|   1|       ramen|   12|
|         3|          C|2021-01-01|   1|       ramen|   12|
+----------+-----------+----------+----+------------+-----+



Question4 :  What is the most purchased item on the menu and how many times was it purchased by all customers?

In [173]:
most_purchased = sales_df.groupBy('product_id').agg({'product_id':'count'}).withColumnRenamed('count(product_id)','count').orderBy('count',ascending=0).limit(1).join(menu_df,'product_id')

In [174]:
most_purchased.show()

+----------+-----+------------+-----+
|product_id|count|product_name|price|
+----------+-----+------------+-----+
|         3|    8|       ramen|   12|
+----------+-----+------------+-----+



Question5 :  Which item was the most popular for each customer?

In [211]:
most_popular_per_cust = sales_df
most_popular_per_cust = most_popular_per_cust.groupBy('customer_id','product_id').agg({'product_id':'count'}).orderBy('count(product_id)')

In [212]:
windowSpec_per_customer = Window.partitionBy('customer_id').orderBy(desc('count(product_id)'))
most_popular_per_cust = most_popular_per_cust.withColumn('rank',rank().over(windowSpec_per_customer))

In [213]:
most_popular_per_cust=most_popular_per_cust.filter("rank==1")

In [215]:
most_popular_per_cust = most_popular_per_cust.join(menu_df,'product_id')

In [223]:
most_popular_per_cust.select('customer_id','product_name').show()

+-----------+------------+
|customer_id|product_name|
+-----------+------------+
|          A|       ramen|
|          B|       sushi|
|          B|       ramen|
|          B|       curry|
|          C|       ramen|
+-----------+------------+



Question6 :  Which item was purchased first by the customer after they became a member?

In [226]:
purchased_first_after_membership = sales_df.join(members_df,'customer_id')

In [234]:
purchased_first_after_membership=purchased_first_after_membership.filter("order_date>=join_date")

In [235]:
windowSpec_purchased_after_membership = Window.partitionBy('customer_id').orderBy('order_date')

In [240]:
purchased_first_after_membership=purchased_first_after_membership.withColumn('rank',rank().over(windowSpec_purchased_after_membership))

In [242]:
purchased_first_after_membership=purchased_first_after_membership.filter("rank==1")

In [243]:
purchased_first_after_membership.show()

+-----------+----------+----------+----------+----+
|customer_id|order_date|product_id| join_date|rank|
+-----------+----------+----------+----------+----+
|          A|2021-01-07|         2|2021-01-07|   1|
|          B|2021-01-11|         1|2021-01-09|   1|
+-----------+----------+----------+----------+----+



In [244]:
purchased_first_after_membership = purchased_first_after_membership.join(menu_df,'product_id')

In [245]:
purchased_first_after_membership.show()

+----------+-----------+----------+----------+----+------------+-----+
|product_id|customer_id|order_date| join_date|rank|product_name|price|
+----------+-----------+----------+----------+----+------------+-----+
|         2|          A|2021-01-07|2021-01-07|   1|       curry|   15|
|         1|          B|2021-01-11|2021-01-09|   1|       sushi|   10|
+----------+-----------+----------+----------+----+------------+-----+



In [246]:
purchased_first_after_membership = purchased_first_after_membership.select('customer_id','order_date','product_name').show()

+-----------+----------+------------+
|customer_id|order_date|product_name|
+-----------+----------+------------+
|          A|2021-01-07|       curry|
|          B|2021-01-11|       sushi|
+-----------+----------+------------+



Question7 :  Which item was purchased just before the customer became a member?

In [275]:
purchased_just_before_membership = sales_df.join(members_df,'customer_id')

In [276]:
purchased_just_before_membership=purchased_just_before_membership.filter('order_date<join_date')

In [277]:
purchased_just_before_membership = purchased_just_before_membership.join(menu_df,'product_id')

In [278]:
windowSpec_purchased_just_before_membership=Window.partitionBy('customer_id').orderBy(desc('order_date'))
purchased_just_before_membership = purchased_just_before_membership.withColumn('rank',rank().over(windowSpec_purchased_just_before_membership))

In [273]:
purchased_just_before_membership=purchased_just_before_membership.filter('rank==1')

Question8 :  What is the total items and amount spent for each member before they became a member?

In [286]:
sum_purchased_just_before_membership=purchased_just_before_membership

In [289]:
sum_purchased_just_before_membership=sum_purchased_just_before_membership.groupBy('customer_id').agg({'price':'sum'})

In [290]:
sum_purchased_just_before_membership.show()

+-----------+----------+
|customer_id|sum(price)|
+-----------+----------+
|          B|        40|
|          A|        25|
+-----------+----------+



Question9 :  If each $1 spent equates to 10 points and sushi has a 2x points multiplier — how many points would each customer have?

In [305]:
total_points = sales_df.join(menu_df,'product_id')

In [306]:
total_points = total_points.groupBy('customer_id','product_name').agg({'price':'sum'}).withColumnRenamed('sum(price)','total_price')

In [313]:
total_points.show()

+-----------+------------+-----------+
|customer_id|product_name|total_price|
+-----------+------------+-----------+
|          A|       sushi|         10|
|          A|       ramen|         36|
|          B|       sushi|         20|
|          B|       ramen|         24|
|          A|       curry|         30|
|          C|       ramen|         36|
|          B|       curry|         30|
+-----------+------------+-----------+



In [322]:
total_points=total_points.withColumn('points',when((total_points.product_name == 'sushi'),total_points.total_price*20).otherwise(total_points.total_price*10))

In [325]:
total_points = total_points.groupBy('customer_id').agg({'points':'sum'})

In [326]:
total_points.show()

+-----------+-----------+
|customer_id|sum(points)|
+-----------+-----------+
|          B|        940|
|          C|        360|
|          A|        860|
+-----------+-----------+



Question10 :  In the first week after a customer joins the program (including their join date) they earn 2x points on all items, not just sushi — how many points do customer A and B have at the end of January?

In [342]:
points_when_joined = sales_df.join(menu_df,'product_id').join(members_df,'customer_id')

In [352]:
points_when_joined = points_when_joined.filter(month('order_date')==1)

In [353]:
points_when_joined = points_when_joined.withColumn('points',when((datediff(points_when_joined.join_date,points_when_joined.order_date)<6),points_when_joined.price*20).when((points_when_joined.product_name=='sushi'),points_when_joined.price*20).otherwise(points_when_joined.price*10))

In [354]:
points_when_joined.show()

+-----------+----------+----------+------------+-----+----------+------+
|customer_id|product_id|order_date|product_name|price| join_date|points|
+-----------+----------+----------+------------+-----+----------+------+
|          A|         1|2021-01-01|       sushi|   10|2021-01-07|   200|
|          A|         2|2021-01-01|       curry|   15|2021-01-07|   150|
|          A|         2|2021-01-07|       curry|   15|2021-01-07|   300|
|          A|         3|2021-01-10|       ramen|   12|2021-01-07|   240|
|          A|         3|2021-01-11|       ramen|   12|2021-01-07|   240|
|          A|         3|2021-01-11|       ramen|   12|2021-01-07|   240|
|          B|         2|2021-01-01|       curry|   15|2021-01-09|   150|
|          B|         2|2021-01-02|       curry|   15|2021-01-09|   150|
|          B|         1|2021-01-04|       sushi|   10|2021-01-09|   200|
|          B|         1|2021-01-11|       sushi|   10|2021-01-09|   200|
|          B|         3|2021-01-16|       ramen|   

In [355]:
points_when_joined.groupBy('customer_id').agg({'points':'sum'}).show()

+-----------+-----------+
|customer_id|sum(points)|
+-----------+-----------+
|          B|        940|
|          A|       1370|
+-----------+-----------+



-------------BONUS ROUND ----------------------------------------

Recreate the table with: customer_id, order_date, product_name, price, member (Y/N)

In [405]:
consolidated_info = sales_df.join(menu_df,'product_id').join(members_df,'customer_id','left')

In [406]:
consolidated_info = consolidated_info.withColumn('member',when((consolidated_info.order_date<consolidated_info.join_date),'N').otherwise('Y'))

In [407]:
consolidated_info = consolidated_info.select('customer_id','order_date','product_name','price','member')

In [408]:
consolidated_info.show()

+-----------+----------+------------+-----+------+
|customer_id|order_date|product_name|price|member|
+-----------+----------+------------+-----+------+
|          A|2021-01-01|       sushi|   10|     N|
|          A|2021-01-01|       curry|   15|     N|
|          A|2021-01-07|       curry|   15|     Y|
|          A|2021-01-10|       ramen|   12|     Y|
|          A|2021-01-11|       ramen|   12|     Y|
|          A|2021-01-11|       ramen|   12|     Y|
|          B|2021-01-01|       curry|   15|     N|
|          B|2021-01-02|       curry|   15|     N|
|          B|2021-01-04|       sushi|   10|     N|
|          B|2021-01-11|       sushi|   10|     Y|
|          B|2021-01-16|       ramen|   12|     Y|
|          B|2021-02-01|       ramen|   12|     Y|
|          C|2021-01-01|       ramen|   12|     Y|
|          C|2021-01-01|       ramen|   12|     Y|
|          C|2021-01-07|       ramen|   12|     Y|
+-----------+----------+------------+-----+------+



Rank All The Things - Danny also requires further information about the ranking of customer products, but he purposely does not need the ranking for non-member purchases so he expects null ranking values for the records when customers are not yet part of the loyalty program.

In [416]:
windowSpec_rank_of_all_things = Window.partitionBy('customer_id','member').orderBy('order_date')

In [417]:
rank_of_all_things = consolidated_info.withColumn('ranking',
                                                  when((consolidated_info.member=="N"),'Null').otherwise(rank().over(windowSpec_rank_of_all_things)))

In [418]:
rank_of_all_things.show()

+-----------+----------+------------+-----+------+-------+
|customer_id|order_date|product_name|price|member|ranking|
+-----------+----------+------------+-----+------+-------+
|          A|2021-01-01|       sushi|   10|     N|   Null|
|          A|2021-01-01|       curry|   15|     N|   Null|
|          A|2021-01-07|       curry|   15|     Y|      1|
|          A|2021-01-10|       ramen|   12|     Y|      2|
|          A|2021-01-11|       ramen|   12|     Y|      3|
|          A|2021-01-11|       ramen|   12|     Y|      3|
|          B|2021-01-01|       curry|   15|     N|   Null|
|          B|2021-01-02|       curry|   15|     N|   Null|
|          B|2021-01-04|       sushi|   10|     N|   Null|
|          B|2021-01-11|       sushi|   10|     Y|      1|
|          B|2021-01-16|       ramen|   12|     Y|      2|
|          B|2021-02-01|       ramen|   12|     Y|      3|
|          C|2021-01-01|       ramen|   12|     Y|      1|
|          C|2021-01-01|       ramen|   12|     Y|      