# Problem Statement
## -----------------------------------------------------------------------------------------------
###                 Danny wants to use the data to answer a few simple questions about his customers, especially about their visiting patterns, how much money they’ve spent and also which menu items are their favourite. Having this deeper connection with his customers will help him deliver a better and more personalised experience for his loyal customers.

###                 He plans on using these insights to help him decide whether he should expand the existing customer loyalty program - additionally he needs help to generate some basic datasets so his team can easily inspect the data without needing to use SQL.

In [0]:
# creating a spark session to get started with our spark jobs

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# getOrCreate() hands back the session for this application name, building
# one only if none exists yet; every later cell goes through dd_ss.
dd_ss = (
    SparkSession.builder
    .appName("Restaurant_Dannys_Dinner")
    .getOrCreate()
)
print(dd_ss)

<pyspark.sql.session.SparkSession object at 0x7f2e8e0a5000>


In [0]:
# Load the sales CSV into a DataFrame using an explicit schema.

# product_id is declared as a string at read time; it is converted to an
# integer in a separate step after loading.
sales_col = StructType([
    StructField("customer_id", StringType(), True),
    StructField("order_date", DateType(), True),
    StructField("product_id", StringType(), True),
])

# The schema is supplied explicitly, so schema inference is not requested:
# the reader applies sales_col as-is and an inference option would have no
# effect on the resulting column types.
sales_df = (
    dd_ss.read.format("csv")
    .schema(sales_col)
    .load("/FileStore/tables/pyspark/Sales_danny_s_dinner.csv")
)

# Preview the first rows and confirm the column types.
sales_df.limit(10).display()
sales_df.printSchema()

customer_id,order_date,product_id
A,2021-01-01,1
A,2021-01-01,2
A,2021-01-07,2
A,2021-01-10,3
A,2021-01-11,3
A,2021-01-11,3
B,2021-01-01,2
B,2021-01-02,2
B,2021-01-04,1
B,2021-01-11,1


root
 |-- customer_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- product_id: string (nullable = true)



In [0]:
# product_id was loaded as a string (see the printed schema), so replace the
# column with an integer-typed version of itself.
sales_df = sales_df.withColumn(
    "product_id",
    sales_df["product_id"].cast(IntegerType()),
)

# Preview the rows again and confirm product_id is now an integer.
sales_df.limit(10).display()
sales_df.printSchema()

customer_id,order_date,product_id
A,2021-01-01,1
A,2021-01-01,2
A,2021-01-07,2
A,2021-01-10,3
A,2021-01-11,3
A,2021-01-11,3
B,2021-01-01,2
B,2021-01-02,2
B,2021-01-04,1
B,2021-01-11,1


root
 |-- customer_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- product_id: integer (nullable = true)



In [0]:
# Load the menu CSV into a DataFrame using an explicit schema.

# price is read as a string and converted to an integer right after loading.
menu_col = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("price", StringType(), True),
])

# The schema is supplied explicitly, so schema inference is not requested:
# the reader applies menu_col as-is and an inference option would have no
# effect on the resulting column types.
menu_df = (
    dd_ss.read.format("csv")
    .schema(menu_col)
    .load("/FileStore/tables/pyspark/Menu_danny_s_dinner.csv")
)

# Convert price to an integer so it can be summed in later queries.
menu_df = menu_df.withColumn("price", col("price").cast(IntegerType()))

# Show the full menu and confirm the column types.
menu_df.display()
menu_df.printSchema()

product_id,product_name,price
1,sushi,10
2,curry,15
3,ramen,12


root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: integer (nullable = true)



In [0]:
# Load the loyalty-program members CSV into a DataFrame using an explicit
# schema: one row per customer with the date they joined.
member_col = StructType([
    StructField("customer_id", StringType(), True),
    StructField("join_date", DateType(), True),
])

# The schema is supplied explicitly, so schema inference is not requested:
# the reader applies member_col as-is and an inference option would have no
# effect on the resulting column types.
member_df = (
    dd_ss.read.format("csv")
    .schema(member_col)
    .load("/FileStore/tables/pyspark/Member_danny_s_dinner.csv")
)

# Show all members and confirm the column types.
member_df.display()
member_df.printSchema()

customer_id,join_date
A,2021-01-07
B,2021-01-09


root
 |-- customer_id: string (nullable = true)
 |-- join_date: date (nullable = true)



In [0]:
# Register each DataFrame as a temporary view so it can be queried with
# Spark SQL. Every DataFrame needs its own view name: registering member_df
# under "menu_tb" would replace the menu view and leave "member_tb" undefined.
sales_df.createOrReplaceTempView("sales_tb")
menu_df.createOrReplaceTempView("menu_tb")
member_df.createOrReplaceTempView("member_tb")

In [0]:
# Sanity check: read each temp view back through Spark SQL and display it
# (sales is limited to 10 rows, the two small tables are shown in full).
for _preview_sql in (
    "select * from sales_tb limit 10",
    "select * from menu_tb",
    "select * from member_tb",
):
    dd_ss.sql(_preview_sql).display()

customer_id,order_date,product_id
A,2021-01-01,1
A,2021-01-01,2
A,2021-01-07,2
A,2021-01-10,3
A,2021-01-11,3
A,2021-01-11,3
B,2021-01-01,2
B,2021-01-02,2
B,2021-01-04,1
B,2021-01-11,1


product_id,product_name,price
1,sushi,10
2,curry,15
3,ramen,12


customer_id,join_date
A,2021-01-07
B,2021-01-09


# Questions

## 1. What is the total amount each customer spent at the restaurant?

In [0]:
# customer_id lives in sales_tb and price lives in menu_tb, so the two views
# are joined on product_id to attach a price to every sale.
# sum(m.price) with group by s.customer_id then gives one total per customer,
# and the result is sorted with the highest spender first.
total_spend_sql = """
                select s.customer_id, sum(m.price) as spend_amount from sales_tb s
                join menu_tb m using (product_id)
                group by s.customer_id
                order by spend_amount desc;
"""
dd_ss.sql(total_spend_sql).display()

# Answer: see the result table below ----------------------------------------------

customer_id,spend_amount
A,76
B,74
C,36


## 2. How many days has each customer visited the restaurant?

In [0]:
# count(distinct order_date) counts each date once per customer, so several
# orders placed on the same day still count as a single visit.
# group by customer_id gives one row per customer, and the result is sorted
# with the most frequent visitor first.
visit_days_sql = """
                select customer_id, count(distinct order_date) as visited_count
                from sales_tb
                group by customer_id
                order by visited_count desc;
"""
dd_ss.sql(visit_days_sql).display()

# Answer: see the result table below ----------------------------------------------

customer_id,visited_count
B,6
A,4
C,2


## 3. What was the first item from the menu purchased by each customer?

In [0]:
# The common table expression "menu" joins sales to the menu and ranks each
# customer's rows by order_date with dense_rank(), so every row on the
# customer's earliest order date gets menu_rank = 1.
# Because only the date is available, a customer who bought several items on
# that first day gets one output row per item.
# The final group by collapses identical (customer, date, product) rows.
first_item_sql = """
                with menu as (
                    select s.customer_id, s.order_date, m.product_name,
                    dense_rank() over (partition by s.customer_id order by s.order_date) as menu_rank
                    from sales_tb s join menu_tb m using (product_id)
                ) 
                
                select customer_id, order_date, product_name
                from menu
                where menu_rank = 1
                group by customer_id, order_date, product_name;
"""
dd_ss.sql(first_item_sql).display()

# Answer: see the result table below ----------------------------------------------

customer_id,order_date,product_name
A,2021-01-01,sushi
A,2021-01-01,curry
B,2021-01-01,curry
C,2021-01-01,ramen
