In [35]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf

In [36]:
# Obtain the Spark session for this notebook (getOrCreate reuses an existing one if present).
spark = (
    SparkSession.builder
    .appName("Danny's Diner")
    .getOrCreate()
)

In [37]:
# Raw sales records as (customer_id, order_date, product_id); every field is a string here
# and is typed later when the DataFrame is built.
sales_data = [
    ('A', '2021-01-01', '1'),
    ('A', '2021-01-01', '2'),
    ('A', '2021-01-07', '2'),
    ('A', '2021-01-10', '3'),
    ('A', '2021-01-11', '3'),
    ('A', '2021-01-11', '3'),
    ('B', '2021-01-01', '2'),
    ('B', '2021-01-02', '2'),
    ('B', '2021-01-04', '1'),
    ('B', '2021-01-11', '1'),
    ('B', '2021-01-16', '3'),
    ('B', '2021-02-01', '3'),
    ('C', '2021-01-01', '3'),
    ('C', '2021-01-01', '3'),
    ('C', '2021-01-07', '3'),
]

# DDL-style schema string matching the tuple layout above.
sales_schema = "customer_id string, order_date string, product_id string"

In [38]:
# Load the raw string rows, then convert order_date to a date and product_id to an int.
sales_raw_df = spark.createDataFrame(sales_data, sales_schema)
sales_df = (
    sales_raw_df
    # withColumn on an existing name replaces it in place, so the column order is unchanged
    .withColumn("order_date", sf.to_date("order_date", 'yyyy-MM-dd'))
    .withColumn("product_id", sf.col("product_id").cast("int"))
)
sales_df.printSchema()
sales_df.show()

root
 |-- customer_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- product_id: integer (nullable = true)

+-----------+----------+----------+
|customer_id|order_date|product_id|
+-----------+----------+----------+
|          A|2021-01-01|         1|
|          A|2021-01-01|         2|
|          A|2021-01-07|         2|
|          A|2021-01-10|         3|
|          A|2021-01-11|         3|
|          A|2021-01-11|         3|
|          B|2021-01-01|         2|
|          B|2021-01-02|         2|
|          B|2021-01-04|         1|
|          B|2021-01-11|         1|
|          B|2021-01-16|         3|
|          B|2021-02-01|         3|
|          C|2021-01-01|         3|
|          C|2021-01-01|         3|
|          C|2021-01-07|         3|
+-----------+----------+----------+



In [39]:
# Menu items as (product_id, product_name, price).
product_data = [
    (1, 'sushi', 10),
    (2, 'curry', 15),
    (3, 'ramen', 12),
]

# DDL-style schema string matching the tuple layout above.
product_schema = "product_id int,product_name string,price int"

In [40]:
# Build the menu DataFrame; the values are already typed, so no casting is needed.
product_df = spark.createDataFrame(data=product_data, schema=product_schema)

product_df.printSchema()
product_df.show()

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: integer (nullable = true)

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         1|       sushi|   10|
|         2|       curry|   15|
|         3|       ramen|   12|
+----------+------------+-----+



In [41]:
# Loyalty-programme members as (customer_id, join_date); join_date is still a string here.
member_data = [
    ('A', '2021-01-07'),
    ('B', '2021-01-09'),
]

# DDL-style schema string matching the tuple layout above.
member_schema = "customer_id string,join_date string"

In [42]:
# Load the raw member rows and parse join_date from a 'yyyy-MM-dd' string into a date.
member_raw_df = spark.createDataFrame(member_data, member_schema)
member_df = member_raw_df.select(
    "customer_id",
    sf.to_date("join_date", 'yyyy-MM-dd').alias("join_date"),
)

member_df.printSchema()
member_df.show()

root
 |-- customer_id: string (nullable = true)
 |-- join_date: date (nullable = true)

+-----------+----------+
|customer_id| join_date|
+-----------+----------+
|          A|2021-01-07|
|          B|2021-01-09|
+-----------+----------+



Q1. What is the total amount each customer spent at the restaurant?

In [43]:
# Attach each sale's price via product_id, then total the price per customer.
q1_df = (
    sales_df
    .join(product_df, on="product_id", how="inner")
    .groupBy("customer_id")
    .agg(sf.sum("price").alias("Total_Amount_Spent"))
    .orderBy("customer_id")
)
q1_df.show()

+-----------+------------------+
|customer_id|Total_Amount_Spent|
+-----------+------------------+
|          A|                76|
|          B|                74|
|          C|                36|
+-----------+------------------+



Q2. How many days has each customer visited the restaurant?

In [44]:
# Count distinct order dates per customer, so several orders on one day count as one visit.
q2_df = (
    sales_df
    .groupBy("customer_id")
    .agg(sf.countDistinct("order_date").alias("No. of days visited the restaurant"))
    .orderBy("customer_id")
)
q2_df.show()

+-----------+----------------------------------+
|customer_id|No. of days visited the restaurant|
+-----------+----------------------------------+
|          A|                                 4|
|          B|                                 6|
|          C|                                 2|
+-----------+----------------------------------+



Q3. What was the first item from the menu purchased by each customer?

In [45]:
from pyspark.sql import Window

In [46]:
# Per-customer window ordered by order date (earliest first).
window_fun = (
    Window
    .partitionBy("customer_id")
    .orderBy("order_date")
)

In [47]:
# Rank each customer's orders by date and keep rank 1. dense_rank gives tied rows the same
# rank, so every item bought on the first order date is kept; distinct() collapses repeats
# of the same item.
q3_df = (
    sales_df
    .withColumn("rank", sf.dense_rank().over(window_fun))
    .filter(sf.col("rank") == 1)
    .join(product_df, "product_id")
    .select("customer_id", "product_name")
    .distinct()
    .orderBy("customer_id")
)
q3_df.show()

+-----------+------------+
|customer_id|product_name|
+-----------+------------+
|          A|       curry|
|          A|       sushi|
|          B|       curry|
|          C|       ramen|
+-----------+------------+



Q4. What is the most purchased item on the menu and how many times was it purchased by all customers?

In [48]:
# Count purchases per menu item and list the items from most to least purchased.
q4_df = (
    sales_df
    .join(product_df, "product_id")
    .groupBy("product_name")
    .agg(sf.count("customer_id").alias("Number of Purchases made by customers"))
    .orderBy(sf.desc("Number of Purchases made by customers"))
)
q4_df.show()

# Fetch the top row once. Calling collect() twice inside the f-string re-ran the whole
# join/aggregate/sort each time and pulled every row to the driver just to read row 0.
top_item = q4_df.first()
if top_item is not None:  # first() returns None when there are no rows
    print(f"As per the above table, {top_item[0]} is the most purchased item on the menu and it has been purchased {top_item[1]} times by customers.")

+------------+-------------------------------------+
|product_name|Number of Purchases made by customers|
+------------+-------------------------------------+
|       ramen|                                    8|
|       curry|                                    4|
|       sushi|                                    3|
+------------+-------------------------------------+

As per the above table, ramen is the most purchased item on the menu and it has been purchased 8 times by customers.


Q5. Which item was the most popular for each customer?

In [49]:
# Count orders per (customer, item), rank the items within each customer by that count
# (highest first) and keep rank 1. dense_rank keeps ties, so a customer can have several
# "most popular" items.
q5_df = (
    sales_df
    .join(product_df, "product_id")
    .groupBy("customer_id", "product_name")
    .agg(sf.count("product_name").alias("order_count"))
    .withColumn(
        "rank",
        sf.dense_rank().over(
            Window.partitionBy("customer_id").orderBy(sf.col("order_count").desc())
        ),
    )
    .filter(sf.col("rank") == 1)
    .select("customer_id", "product_name", "order_count")
    .orderBy("customer_id", "product_name")
)
q5_df.show()

+-----------+------------+-----------+
|customer_id|product_name|order_count|
+-----------+------------+-----------+
|          A|       ramen|          3|
|          B|       curry|          2|
|          B|       ramen|          2|
|          B|       sushi|          2|
|          C|       ramen|          3|
+-----------+------------+-----------+



Q6. Which item was purchased first by the customer after they became a member?

In [50]:
# For each member, find the earliest order placed after joining.
# NOTE(review): the comparison is strict (>), so an order placed on the join date itself is
# excluded here, whereas Q10-Q12 treat join-day orders as member purchases. Confirm this
# difference is intended.
q6_df = (
    sales_df
    .join(member_df, on="customer_id", how="inner")
    .filter(sf.col("order_date") > sf.col("join_date"))
    .withColumn(
        "rank",
        sf.dense_rank().over(Window.partitionBy("customer_id").orderBy("order_date")),
    )
    .filter(sf.col("rank") == 1)
    .join(product_df, "product_id")
    .select("customer_id", "order_date", "product_name")
    .orderBy("customer_id")
)
q6_df.show()

+-----------+----------+------------+
|customer_id|order_date|product_name|
+-----------+----------+------------+
|          A|2021-01-10|       ramen|
|          B|2021-01-11|       sushi|
+-----------+----------+------------+



Q7. Which item was purchased just before the customer became a member?

In [51]:
# For each member, keep the orders placed strictly before joining and rank them latest
# first; rank 1 is the last pre-membership order date. dense_rank keeps ties, so every
# item bought on that date is returned.
q7_df = (
    sales_df
    .join(member_df, on="customer_id", how="inner")
    .filter(sf.col("order_date") < sf.col("join_date"))
    .withColumn(
        "rank",
        sf.dense_rank().over(
            Window.partitionBy("customer_id").orderBy(sf.col("order_date").desc())
        ),
    )
    .filter(sf.col("rank") == 1)
    .join(product_df, "product_id")
    .select("customer_id", "order_date", "product_name")
    .orderBy("customer_id")
)
q7_df.show()

+-----------+----------+------------+
|customer_id|order_date|product_name|
+-----------+----------+------------+
|          A|2021-01-01|       curry|
|          A|2021-01-01|       sushi|
|          B|2021-01-04|       sushi|
+-----------+----------+------------+



Q8. What is the total items and amount spent for each member before they became a member?

In [52]:
# Restrict to orders placed strictly before each member's join date, then count the items
# and sum their prices per customer.
q8_df = (
    sales_df
    .join(member_df, on="customer_id", how="inner")
    .filter(sf.col("order_date") < sf.col("join_date"))
    .join(product_df, "product_id")
    .groupBy("customer_id")
    .agg(
        sf.count("product_name").alias("Total_Items"),
        sf.sum("price").alias("Total_Amount"),
    )
    .orderBy("customer_id")
)
q8_df.show()

+-----------+-----------+------------+
|customer_id|Total_Items|Total_Amount|
+-----------+-----------+------------+
|          A|          2|          25|
|          B|          3|          40|
+-----------+-----------+------------+



Q9. If each $1 spent equates to 10 points and sushi has a 2x points multiplier, how many points would each customer have?

In [53]:
# Points per sale = price x multiplier: 20 per $1 for sushi (10 points doubled), 10 per $1
# for everything else. The points are then summed per customer.
q9_df = (
    sales_df
    .join(product_df, "product_id")
    .withColumn(
        "Points",
        sf.col("price") * sf.when(sf.col("product_name") == 'sushi', 20).otherwise(10),
    )
    .groupBy("customer_id")
    .agg(sf.sum("Points").alias("Total Customer Points"))
    .orderBy("customer_id")
)
q9_df.show()

+-----------+---------------------+
|customer_id|Total Customer Points|
+-----------+---------------------+
|          A|                  860|
|          B|                  940|
|          C|                  360|
+-----------+---------------------+



Q10. In the first week after a customer joins the program (including their join date) they earn 2x points on all items, not just sushi. How many points do customer A and B have at the end of January?

In [54]:
# Points balance of each member (inner join with member_df drops non-members) at the end
# of January 2021.
q10_df = (
    sales_df
    .join(member_df, "customer_id")
    # Bound by an actual date instead of month(order_date) == 1: the month-only test would
    # also match a January of any other year and would drop purchases made before January.
    .filter(sf.col("order_date") <= sf.to_date(sf.lit("2021-01-31")))
    .join(product_df, "product_id")
    .withColumn(
        "Customer Points",
        # First week of membership: between() is inclusive on both ends, so
        # join_date .. join_date + 6 covers 7 days and already implies order_date >= join_date.
        sf.when(
            sf.col("order_date").between(
                sf.col("join_date"), sf.date_add(sf.col("join_date"), 6)
            ),
            sf.col("price") * 20,
        )
        # Outside that week the regular rules apply: sushi earns double, everything else 10 per $1.
        .when(sf.col("product_name") == 'sushi', sf.col("price") * 20)
        .otherwise(sf.col("price") * 10),
    )
    .groupBy("customer_id")
    .agg(sf.sum("Customer Points").alias("Total Customer Points"))
    .orderBy("customer_id")
)

q10_df.show()

+-----------+---------------------+
|customer_id|Total Customer Points|
+-----------+---------------------+
|          A|                 1370|
|          B|                  820|
+-----------+---------------------+



Bonus Questions
Join All The Things
The following questions are related to creating basic data tables that Danny and his team can use to quickly derive insights without needing to join the underlying tables using SQL.

Recreate the following table output using the available data:

In [55]:
# One row per sale with product name, price and a member flag. The left join keeps
# customers that have no membership row; they get a null join_date and are flagged "N".
q11_df = (
    sales_df
    .join(member_df, on="customer_id", how="left")
    .join(product_df, "product_id")
    .withColumn(
        "member",
        sf.when(sf.col("join_date").isNull(), "N")                # never joined
        .when(sf.col("join_date") > sf.col("order_date"), "N")    # ordered before joining
        .otherwise("Y"),                                          # ordered on/after join date
    )
    .select("customer_id", "order_date", "product_name", "price", "member")
    .orderBy("customer_id", "order_date")
)
q11_df.show()

+-----------+----------+------------+-----+------+
|customer_id|order_date|product_name|price|member|
+-----------+----------+------------+-----+------+
|          A|2021-01-01|       sushi|   10|     N|
|          A|2021-01-01|       curry|   15|     N|
|          A|2021-01-07|       curry|   15|     Y|
|          A|2021-01-10|       ramen|   12|     Y|
|          A|2021-01-11|       ramen|   12|     Y|
|          A|2021-01-11|       ramen|   12|     Y|
|          B|2021-01-01|       curry|   15|     N|
|          B|2021-01-02|       curry|   15|     N|
|          B|2021-01-04|       sushi|   10|     N|
|          B|2021-01-11|       sushi|   10|     Y|
|          B|2021-01-16|       ramen|   12|     Y|
|          B|2021-02-01|       ramen|   12|     Y|
|          C|2021-01-01|       ramen|   12|     N|
|          C|2021-01-01|       ramen|   12|     N|
|          C|2021-01-07|       ramen|   12|     N|
+-----------+----------+------------+-----+------+



Rank All The Things
Danny also requires further information about the ranking of customer products, but he purposely does not need the ranking for non-member purchases so he expects null ranking values for the records when customers are not yet part of the loyalty program.

In [56]:
# Same joined table as the previous question, plus a ranking of member purchases by order
# date. The rank is computed per (customer_id, member) partition and only emitted when
# member == "Y"; with no otherwise() branch, non-member rows get null.
q12_df = (
    sales_df
    .join(member_df, on="customer_id", how="left")
    .join(product_df, "product_id")
    .withColumn(
        "member",
        sf.when(sf.col("join_date").isNull(), "N")
        .when(sf.col("join_date") > sf.col("order_date"), "N")
        .otherwise("Y"),
    )
    .withColumn(
        "ranking",
        sf.when(
            sf.col("member") == "Y",
            sf.dense_rank().over(
                Window.partitionBy("customer_id", "member").orderBy("order_date")
            ),
        ),
    )
    .select("customer_id", "order_date", "product_name", "price", "member", "ranking")
    .orderBy("customer_id", "order_date")
)
q12_df.show()

+-----------+----------+------------+-----+------+-------+
|customer_id|order_date|product_name|price|member|ranking|
+-----------+----------+------------+-----+------+-------+
|          A|2021-01-01|       sushi|   10|     N|   null|
|          A|2021-01-01|       curry|   15|     N|   null|
|          A|2021-01-07|       curry|   15|     Y|      1|
|          A|2021-01-10|       ramen|   12|     Y|      2|
|          A|2021-01-11|       ramen|   12|     Y|      3|
|          A|2021-01-11|       ramen|   12|     Y|      3|
|          B|2021-01-01|       curry|   15|     N|   null|
|          B|2021-01-02|       curry|   15|     N|   null|
|          B|2021-01-04|       sushi|   10|     N|   null|
|          B|2021-01-11|       sushi|   10|     Y|      1|
|          B|2021-01-16|       ramen|   12|     Y|      2|
|          B|2021-02-01|       ramen|   12|     Y|      3|
|          C|2021-01-01|       ramen|   12|     N|   null|
|          C|2021-01-01|       ramen|   12|     N|   nul