In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, DateType,VarcharType, StringType

In [0]:
sales = spark.read.csv("dbfs:/FileStore/CaseStudy/Dannys_Diner/sales.csv",header=True, inferSchema=True)
sales.printSchema()
display(sales)

root
 |-- customer_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- product_id: integer (nullable = true)



customer_id,order_date,product_id
A,2021-01-01,1
A,2021-01-01,2
A,2021-01-07,2
A,2021-01-10,3
A,2021-01-11,3
A,2021-01-11,3
B,2021-01-01,2
B,2021-01-02,2
B,2021-01-04,1
B,2021-01-11,1


In [0]:
menu = spark.read.csv("dbfs:/FileStore/CaseStudy/Dannys_Diner/menu.csv",header=True,inferSchema=True)
menu.printSchema()
display(menu)

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: integer (nullable = true)



product_id,product_name,price
1,sushi,10
2,curry,15
3,ramen,12


In [0]:
members = spark.read.csv("dbfs:/FileStore/CaseStudy/Dannys_Diner/members.csv", header=True,inferSchema=True)
members.printSchema()
display(members)

root
 |-- customer_id: string (nullable = true)
 |-- join_date: date (nullable = true)



customer_id,join_date
A,2021-01-07
B,2021-01-09


In [0]:
sales.createOrReplaceTempView("Sales")
menu.createOrReplaceTempView("Menu")
members.createOrReplaceTempView("Members")

1. What is the total amount each customer spent at the restaurant?

In [0]:
df1 = sales.join(menu,"product_id","inner") \
        .groupBy("customer_id").sum("price") \
        .orderBy("customer_id")         

df1.show()        

+-----------+----------+
|customer_id|sum(price)|
+-----------+----------+
|          A|        76|
|          B|        74|
|          C|        36|
+-----------+----------+



In [0]:
%sql

SELECT s.customer_id as members, SUM(m.price) as totalSpent FROM Sales s
JOIN Menu m ON s.product_id = m.product_id
GROUP BY s.customer_id
ORDER BY members

members,totalSpent
A,76
B,74
C,36


2. How many days has each customer visited the restaurant?

In [0]:
from pyspark.sql.functions import countDistinct,count

In [0]:
df2 = sales.groupBy("customer_id") \
        .agg(countDistinct("order_date").alias("count")) \
        .orderBy("customer_id")

display(df2)

customer_id,count
A,4
B,6
C,2


In [0]:
%sql

SELECT customer_id, COUNT(DISTINCT order_date) AS `count` FROM Sales 
GROUP BY customer_id
ORDER BY customer_id

customer_id,count
A,4
B,6
C,2


3. What was the first item from the menu purchased by each customer?

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, col

In [0]:
windowSpec83 = Window.partitionBy("customer_id").orderBy(col("order_date").asc())

df3 = sales.join(menu,"product_id") \
        .withColumn("rank", dense_rank().over(windowSpec3)) \
        .filter("rank == 1") \
        .select("customer_id","product_name") \
        .distinct()

display(df3)

customer_id,product_name
A,sushi
A,curry
B,curry
C,ramen


In [0]:
%sql
-- windows function alias name cannot be used for filtering directly. Need to create a cte and then we can use it for filtering

WITH ranked_sales AS (
    SELECT
        s.customer_id AS customer,
        m.product_name AS product,
        DENSE_RANK() OVER (PARTITION BY s.customer_id ORDER BY order_date ASC) AS rank
    FROM Sales s
    JOIN Menu m ON s.product_id = m.product_id
)
SELECT DISTINCT customer, product
FROM ranked_sales
WHERE rank = 1
ORDER BY customer;


customer,product
A,sushi
A,curry
B,curry
C,ramen


4. What is the most purchased item on the menu and how many times was it purchased by all customers?

In [0]:
from pyspark.sql.functions import desc

In [0]:
%sql

SELECT product_name, count(1) as cnt FROM sales s 
JOIN menu m ON s.product_id = m.product_id
GROUP BY product_name
ORDER BY cnt DESC 
LIMIT 1

product_name,cnt
ramen,8


In [0]:
df4 = sales.join(menu,"product_id") \
        .groupBy("product_name").count() \
        .orderBy(desc("count")) \
        .limit(1)

display(df4)

product_name,count
ramen,8


5. Which item was the most popular for each customer?

In [0]:
%sql

WITH popular AS (
SELECT customer_id, product_name, COUNT(1) AS order_count,
DENSE_RANK() OVER(PARTITION BY customer_id ORDER BY COUNT(s.customer_id) DESC) AS rank
FROM sales s
JOIN menu m ON s.product_id = m.product_id
GROUP BY customer_id, product_name
)
SELECT customer_id, product_name, order_count
FROM popular
WHERE rank = 1

customer_id,product_name,order_count
A,ramen,3
B,sushi,2
B,ramen,2
B,curry,2
C,ramen,3


In [0]:
windowSpec5 = Window.partitionBy("customer_id").orderBy(desc("order_count"))

df5 = sales.join(menu,"product_id") \
        .groupBy("customer_id","product_name") \
        .agg(count("product_id").alias("order_count")) \
        .withColumn("rank", dense_rank().over(windowSpec5)) \
        .filter("rank = 1") \
        .select("customer_id","product_name","order_count")

display(df5)

customer_id,product_name,order_count
A,ramen,3
B,sushi,2
B,ramen,2
B,curry,2
C,ramen,3


6. Which item was purchased first by the customer after they became a member?

In [0]:
from pyspark.sql.functions import row_number

In [0]:
%sql

WITH become_member AS(
SELECT m.customer_id,s.product_id,
ROW_NUMBER() OVER(PARTITION BY m.customer_id ORDER BY s.order_date) AS rw
FROM members m
JOIN sales s ON m.customer_id=s.customer_id
AND s.order_date >= m.join_date
)
SELECT b.customer_id,menu.product_id,product_name FROM become_member b
JOIN menu ON b.product_id = menu.product_id
WHERE rw = 1
ORDER BY b.customer_id

customer_id,product_id,product_name
A,2,curry
B,1,sushi


In [0]:
WindowSpec6 = Window.partitionBy("customer_id").orderBy("order_date")

df6 = sales.join(members,"customer_id") \
        .filter(sales.order_date >= members.join_date) \
        .withColumn("rank", row_number().over(WindowSpec6) ) \
        .join(menu,"product_id") \
        .select("customer_id","product_name","rank") \
        .where("rank =1")
        

display(df6)

customer_id,product_name,rank
A,curry,1
B,sushi,1


7. Which item was purchased just before the customer became a member?

In [0]:
%sql

WITH not_member AS(
SELECT m.customer_id,s.product_id,
ROW_NUMBER() OVER(PARTITION BY m.customer_id ORDER BY s.order_date DESC) AS rw
FROM members m
JOIN sales s ON m.customer_id=s.customer_id
AND s.order_date < m.join_date
)
SELECT n.customer_id,menu.product_id,product_name FROM not_member n
JOIN menu ON n.product_id = menu.product_id
WHERE rw = 1
ORDER BY n.customer_id

customer_id,product_id,product_name
A,1,sushi
B,1,sushi


In [0]:
WindowSpec7 = Window.partitionBy("customer_id").orderBy(desc("order_date"))

df7 = sales.join(members,"customer_id") \
        .filter(sales.order_date < members.join_date) \
        .withColumn("rank", row_number().over(WindowSpec7) ) \
        .join(menu,"product_id") \
        .select("customer_id","product_name","rank") \
        .where("rank =1")
        

display(df7)

customer_id,product_name,rank
A,sushi,1
B,sushi,1


8. What is the total items and amount spent for each member before they became a member?

In [0]:
%sql

SELECT mem.customer_id, COUNT(s.product_id) AS totalPrdctCount, SUM(m.price) FROM Sales s
JOIN Menu m ON s.product_id = m.product_id
JOIN Members mem ON mem.customer_id = s.customer_id
AND s.order_date < mem.join_date
GROUP BY mem.customer_id
ORDER BY mem.customer_id

customer_id,totalPrdctCount,sum(price)
A,2,25
B,3,40


In [0]:
from pyspark.sql.functions import *

In [0]:
#https://stackoverflow.com/questions/46009493/why-does-pyspark-agg-tell-me-that-datatypes-are-incorrect-here
# sum("price").alias("total") got error in this aggegration step as pyspark got confused between python sum and pyspark sum function ---> this might have been the error but it was not
#https://medium.com/@ragavi.indra/handling-a-pyspark-typeerror-unsupported-operand-type-s-for-int-and-str-bdc003b2c6a6
#In this scenario, I forgot a tiny but essential step: importing the ‘sum’ function!

df8 = sales.join(menu,"product_id") \
        .join(members,"customer_id") \
        .filter(sales.order_date < members.join_date) \
        .withColumn("price",col("price").cast("Integer")) \
        .groupBy("customer_id") \
        .agg(count("product_id").alias("total_items"),
              sum(col("price")).alias("total")
        )

display(df8)

customer_id,total_items,total
B,3,40
A,2,25


9. If each $1 spent equates to 10 points and sushi has a 2x points multiplier - how many points would each customer have?

In [0]:
%sql
-- in group by case when column name does not work

WITH points_cte AS (
    SELECT product_id,
                    CASE
                        WHEN product_name = "sushi"  THEN 20*price
                        ELSE 10*price
                    END AS points 
FROM menu
)
SELECT s.customer_id, SUM(points) FROM sales s
JOIN points_cte c ON s.product_id = c.product_id
GROUP BY s.customer_id 
ORDER BY s.customer_id


/*SELECT customer_id,
                    CASE
                        WHEN m.product_name = "sushi"  THEN 20*price
                        ELSE 10*price
                    END AS points 
FROM sales s
JOIN menu m ON s.product_id = m.product_id
GROUP BY customer_id
*/

customer_id,sum(points)
A,860
B,940
C,360


In [0]:
df9 = sales.join(menu,"product_id") \
        .withColumn("points", when(col("product_name") == "sushi", col("price")*20).otherwise(col("price")*10)) \
        .groupBy("customer_id").agg(sum("points").alias("total_points")) \
        .orderBy("customer_id")

display(df9)

customer_id,total_points
A,860
B,940
C,360


Recreate the table with: customer_id, order_date, product_name, price, member (Y/N)

In [0]:
%sql
-- added join date column for my own refrence
SELECT s.customer_id, order_date, join_date, product_name, price,
            CASE
                WHEN join_date > order_date THEN "N"
                WHEN join_date <= order_date THEN "Y"
                ELSE "N"
            END AS member 
FROM sales s
LEFT JOIN members mem ON s.customer_id = mem.customer_id
JOIN menu m ON s.product_id = m.product_id

customer_id,order_date,join_date,product_name,price,member
A,2021-01-01,2021-01-07,sushi,10,N
A,2021-01-01,2021-01-07,curry,15,N
A,2021-01-07,2021-01-07,curry,15,Y
A,2021-01-10,2021-01-07,ramen,12,Y
A,2021-01-11,2021-01-07,ramen,12,Y
A,2021-01-11,2021-01-07,ramen,12,Y
B,2021-01-01,2021-01-09,curry,15,N
B,2021-01-02,2021-01-09,curry,15,N
B,2021-01-04,2021-01-09,sushi,10,N
B,2021-01-11,2021-01-09,sushi,10,Y


In [0]:
df11 = sales.join(members,"customer_id","left") \
        .join(menu,"product_id") \
        .withColumn("member", when(col("join_date")>col("order_date"),"N").when(col("join_date")<=col("order_date"),"Y").otherwise("N")) \
        .select("customer_id","order_date","join_date","product_name","price","member")

display(df11)

customer_id,order_date,join_date,product_name,price,member
A,2021-01-01,2021-01-07,sushi,10,N
A,2021-01-01,2021-01-07,curry,15,N
A,2021-01-07,2021-01-07,curry,15,Y
A,2021-01-10,2021-01-07,ramen,12,Y
A,2021-01-11,2021-01-07,ramen,12,Y
A,2021-01-11,2021-01-07,ramen,12,Y
B,2021-01-01,2021-01-09,curry,15,N
B,2021-01-02,2021-01-09,curry,15,N
B,2021-01-04,2021-01-09,sushi,10,N
B,2021-01-11,2021-01-09,sushi,10,Y


Recreate the table with: customer_id, order_date, product_name, price, member (Y/N), ranking 

In [0]:

%sql
-- added join date column for my own refrence
WITH mem_cte AS(
SELECT s.customer_id, order_date, join_date, product_name, price,
            CASE
                WHEN join_date > order_date THEN "N"
                WHEN join_date <= order_date THEN "Y"
                ELSE "N"
            END AS member 
FROM sales s
LEFT JOIN members mem ON s.customer_id = mem.customer_id
JOIN menu m ON s.product_id = m.product_id
)
SELECT *,
        CASE 
            WHEN member = "N" THEN NULL
            ELSE RANK() OVER(PARTITION BY customer_id,member ORDER BY order_date) 
        END AS ranking
FROM mem_cte

customer_id,order_date,join_date,product_name,price,member,ranking
A,2021-01-01,2021-01-07,sushi,10,N,
A,2021-01-01,2021-01-07,curry,15,N,
A,2021-01-07,2021-01-07,curry,15,Y,1.0
A,2021-01-10,2021-01-07,ramen,12,Y,2.0
A,2021-01-11,2021-01-07,ramen,12,Y,3.0
A,2021-01-11,2021-01-07,ramen,12,Y,3.0
B,2021-01-01,2021-01-09,curry,15,N,
B,2021-01-02,2021-01-09,curry,15,N,
B,2021-01-04,2021-01-09,sushi,10,N,
B,2021-01-11,2021-01-09,sushi,10,Y,1.0


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, col, rank

In [0]:
WindowSpec12 = Window.partitionBy("customer_id","member").orderBy("order_date")


df12 = sales.join(menu,"product_id","left") \
        .join(members,"customer_id") \
        .withColumn("member", when(col("join_date")>col("order_date"),"N").when(col("join_date")<=col("order_date"),"Y").otherwise("N")) \
        .withColumn("ranking", when(col("member")=="N", "NULL").otherwise( rank().over(WindowSpec11)) )


display(df12)

customer_id,product_id,order_date,product_name,price,join_date,member,ranking
A,1,2021-01-01,sushi,10,2021-01-07,N,
A,2,2021-01-01,curry,15,2021-01-07,N,
A,2,2021-01-07,curry,15,2021-01-07,Y,1.0
A,3,2021-01-10,ramen,12,2021-01-07,Y,2.0
A,3,2021-01-11,ramen,12,2021-01-07,Y,3.0
A,3,2021-01-11,ramen,12,2021-01-07,Y,3.0
B,2,2021-01-01,curry,15,2021-01-09,N,
B,2,2021-01-02,curry,15,2021-01-09,N,
B,1,2021-01-04,sushi,10,2021-01-09,N,
B,1,2021-01-11,sushi,10,2021-01-09,Y,1.0
