In [0]:
# /FileStore/tables/sales_csv-1.txt
# /FileStore/tables/menu_csv.txt

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
schema = StructType(
    [
        StructField("product_id", IntegerType(), True),
        StructField("customer_id", StringType(), True),
        StructField("order_date", DateType(), True),
        StructField("location", StringType(), True),
        StructField("source_order", StringType(), True),
    ]
)

sales_df = (
    spark.read.format("csv")
    .option("inferschema", "true")
    .schema(schema)
    .load("/FileStore/tables/sales_csv-1.txt")
)

sales_df.show()

+----------+-----------+----------+--------+------------+
|product_id|customer_id|order_date|location|source_order|
+----------+-----------+----------+--------+------------+
|         1|          A|2023-01-01|   India|      Swiggy|
|         2|          A|2022-01-01|   India|      Swiggy|
|         2|          A|2023-01-07|   India|      Swiggy|
|         3|          A|2023-01-10|   India|  Restaurant|
|         3|          A|2022-01-11|   India|      Swiggy|
|         3|          A|2023-01-11|   India|  Restaurant|
|         2|          B|2022-02-01|   India|      Swiggy|
|         2|          B|2023-01-02|   India|      Swiggy|
|         1|          B|2023-01-04|   India|  Restaurant|
|         1|          B|2023-02-11|   India|      Swiggy|
|         3|          B|2023-01-16|   India|      zomato|
|         3|          B|2022-02-01|   India|      zomato|
|         3|          C|2023-01-01|   India|      zomato|
|         1|          C|2023-01-01|      UK|      Swiggy|
|         6|  

In [0]:
sales_df = sales_df.withColumn("Order_Year",year(sales_df.order_date)).withColumn("Order_month",month(sales_df.order_date)).withColumn("Order_quarter",quarter(sales_df.order_date))

sales_df.show()

+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|product_id|customer_id|order_date|location|source_order|Order_Year|Order_month|Order_quarter|
+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|         1|          A|2023-01-01|   India|      Swiggy|      2023|          1|            1|
|         2|          A|2022-01-01|   India|      Swiggy|      2022|          1|            1|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|          1|            1|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|          1|            1|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|          1|            1|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|          1|            1|
|         2|          B|2022-02-01|   India|      Swiggy|      2022|          2|            1|
|         2|          B|2023-01-02|   India|      

In [0]:
menu_schema = StructType([
    StructField("product_id",IntegerType(),True),StructField("product_name",StringType()
    ,True),StructField("price",StringType(),True),
])

menu_df = spark.read.format("csv").option("inferschema","true").schema(menu_schema).load("/FileStore/tables/menu_csv.txt")

menu_df.show()

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         1|       PIZZA|  100|
|         2|     Chowmin|  150|
|         3|    sandwich|  120|
|         4|        Dosa|  110|
|         5|     Biryani|   80|
|         6|       Pasta|  180|
+----------+------------+-----+



In [0]:
total_amount_spend = sales_df.join(menu_df,'product_id').groupBy('customer_id').agg({'price':'sum'}).orderBy('customer_id')
total_amount_spend.show()

+-----------+----------+
|customer_id|sum(price)|
+-----------+----------+
|          A|    4260.0|
|          B|    4440.0|
|          C|    2400.0|
|          D|    1200.0|
|          E|    2040.0|
+-----------+----------+



In [0]:
total_amount_spend.show()

+-----------+----------+
|customer_id|sum(price)|
+-----------+----------+
|          A|    4260.0|
|          B|    4440.0|
|          C|    2400.0|
|          D|    1200.0|
|          E|    2040.0|
+-----------+----------+



In [0]:
total_amount_spend_category = sales_df.join(menu_df,'product_id').groupBy('product_name').agg({'price':'sum'}).orderBy('product_name')
total_amount_spend_category.show()

+------------+----------+
|product_name|sum(price)|
+------------+----------+
|     Biryani|     480.0|
|     Chowmin|    3600.0|
|        Dosa|    1320.0|
|       PIZZA|    2100.0|
|       Pasta|    1080.0|
|    sandwich|    5760.0|
+------------+----------+



In [0]:
total_amt_sales_mtd = sales_df.join(menu_df,'product_id').groupBy('order_year','order_month').agg({'price':'sum'}).alias('price').orderBy('order_year','order_month')

total_amt_sales_mtd.show()

+----------+-----------+----------+
|order_year|order_month|sum(price)|
+----------+-----------+----------+
|      2022|          1|     720.0|
|      2022|          2|    1050.0|
|      2022|          3|     380.0|
|      2022|          5|     720.0|
|      2022|          6|     720.0|
|      2022|          7|     380.0|
|      2022|         11|     380.0|
|      2023|          1|    2240.0|
|      2023|          2|    1680.0|
|      2023|          3|     530.0|
|      2023|          5|    2240.0|
|      2023|          6|    2240.0|
|      2023|          7|     530.0|
|      2023|         11|     530.0|
+----------+-----------+----------+



In [0]:
total_ytd_sales = sales_df.join(menu_df,'product_id').groupBy('order_quarter').agg({'price':'sum'}).orderBy('order_quarter')
total_ytd_sales.show()

+-------------+----------+
|order_quarter|sum(price)|
+-------------+----------+
|            1|    6600.0|
|            2|    5920.0|
|            3|     910.0|
|            4|     910.0|
+-------------+----------+



In [0]:
times_product_purchased = sales_df.join(menu_df,'product_id').groupBy('product_id','product_name').agg(count('product_id').alias('product_count')).orderBy('product_count',ascending=0)
times_product_purchased.show()

+----------+------------+-------------+
|product_id|product_name|product_count|
+----------+------------+-------------+
|         3|    sandwich|           48|
|         2|     Chowmin|           24|
|         1|       PIZZA|           21|
|         4|        Dosa|           12|
|         5|     Biryani|            6|
|         6|       Pasta|            6|
+----------+------------+-------------+



In [0]:
sales_df.select(sales_df.source_order).distinct().show()

frequency_of_customer_visited = sales_df.filter(sales_df.source_order=='Restaurant').groupBy('customer_id').agg({'order_date':'count'})



frequency_of_customer_visited.show()

+------------+
|source_order|
+------------+
|      zomato|
|      Swiggy|
|  Restaurant|
+------------+

+-----------+-----------------+
|customer_id|count(order_date)|
+-----------+-----------------+
|          E|                6|
|          B|                6|
|          D|                3|
|          C|                3|
|          A|                9|
+-----------+-----------------+



In [0]:
total_sales_country = sales_df.join(menu_df,'product_id').groupBy('location').agg({'price':'sum'})
total_sales_country.show()

+--------+----------+
|location|sum(price)|
+--------+----------+
|   India|    4860.0|
|     USA|    2460.0|
|      UK|    7020.0|
+--------+----------+



In [0]:
total_sales_order_source = sales_df.join(menu_df,'product_id').groupBy('source_order').agg(sum('price').alias('sales'))
total_sales_order_source.show()

+------------+------+
|source_order| sales|
+------------+------+
|      zomato|4920.0|
|      Swiggy|6330.0|
|  Restaurant|3090.0|
+------------+------+

