In [0]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DateType

In [0]:
### Loading sales data
sales_schema = StructType([

    StructField("product_id",IntegerType(),True),
    StructField("customer_id",StringType(),True),
    StructField("order_date",DateType(),True),
    StructField("location",StringType(),True),
    StructField("source_order",StringType(),True)
])

sales_df = spark.read.format("csv").option("header",True).schema(sales_schema).load("/FileStore/tables/sales_csv.txt")

In [0]:
sales_df.show()

+----------+-----------+----------+--------+------------+
|product_id|customer_id|order_date|location|source_order|
+----------+-----------+----------+--------+------------+
|         2|          A|2022-01-01|   India|      Swiggy|
|         2|          A|2023-01-07|   India|      Swiggy|
|         3|          A|2023-01-10|   India|  Restaurant|
|         3|          A|2022-01-11|   India|      Swiggy|
|         3|          A|2023-01-11|   India|  Restaurant|
|         2|          B|2022-02-01|   India|      Swiggy|
|         2|          B|2023-01-02|   India|      Swiggy|
|         1|          B|2023-01-04|   India|  Restaurant|
|         1|          B|2023-02-11|   India|      Swiggy|
|         3|          B|2023-01-16|   India|      zomato|
|         3|          B|2022-02-01|   India|      zomato|
|         3|          C|2023-01-01|   India|      zomato|
|         1|          C|2023-01-01|      UK|      Swiggy|
|         6|          C|2022-01-07|      UK|      zomato|
|         3|  

In [0]:
sales_df.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- location: string (nullable = true)
 |-- source_order: string (nullable = true)



In [0]:
from pyspark.sql.functions import month, year, quarter

In [0]:
### Extracting Year from order_date column.
sales_df = sales_df.withColumn("order_year",year(sales_df.order_date))
sales_df.show()

+----------+-----------+----------+--------+------------+----------+
|product_id|customer_id|order_date|location|source_order|order_year|
+----------+-----------+----------+--------+------------+----------+
|         2|          A|2022-01-01|   India|      Swiggy|      2022|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|
|         2|          B|2022-02-01|   India|      Swiggy|      2022|
|         2|          B|2023-01-02|   India|      Swiggy|      2023|
|         1|          B|2023-01-04|   India|  Restaurant|      2023|
|         1|          B|2023-02-11|   India|      Swiggy|      2023|
|         3|          B|2023-01-16|   India|      zomato|      2023|
|         3|          B|2022-02-01|   India|      zomato|      2022|
|         3|          C|2023-01-01

In [0]:
### ### Extracting Month and Quarter from order_date column.
sales_df = sales_df.withColumn("order_month",month(sales_df.order_date))
sales_df = sales_df.withColumn("order_quarter",quarter(sales_df.order_date))

In [0]:
sales_df.show()

+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|product_id|customer_id|order_date|location|source_order|order_year|order_month|order_quarter|
+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|         2|          A|2022-01-01|   India|      Swiggy|      2022|          1|            1|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|          1|            1|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|          1|            1|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|          1|            1|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|          1|            1|
|         2|          B|2022-02-01|   India|      Swiggy|      2022|          2|            1|
|         2|          B|2023-01-02|   India|      Swiggy|      2023|          1|            1|
|         1|          B|2023-01-04|   India|  Rest

In [0]:
### Loading menu data
menu_schema = StructType([

    StructField("product_id",IntegerType(),True),
    StructField("product_name",StringType(),True),
    StructField("price",StringType(),True)
    
])

menu_df = spark.read.format("csv").option("header","True").schema(menu_schema).load("/FileStore/tables/menu_csv.txt")

In [0]:
menu_df.show()

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         2|     Chowmin|  150|
|         3|    sandwich|  120|
|         4|        Dosa|  110|
|         5|     Biryani|   80|
|         6|       Pasta|  180|
+----------+------------+-----+



In [0]:
## Total Amount Spend by per Customer

total_amount_spent = (sales_df.join(menu_df,'product_id')
                            .groupBy('customer_id')
                            .agg({'price':'sum'})
                            .orderBy('customer_id')
                    )
total_amount_spent.show()

+-----------+----------+
|customer_id|sum(price)|
+-----------+----------+
|          A|    3960.0|
|          B|    3240.0|
|          C|    1800.0|
|          D|    1200.0|
|          E|    2040.0|
+-----------+----------+



In [0]:
## Total amount spend per product.

total_amount_spent_per_product = (sales_df.join(menu_df,'product_id')
                      .groupBy('product_name')
                      .agg({'price':'sum'})
                      .orderBy('product_name'))

total_amount_spent_per_product.show()

+------------+----------+
|product_name|sum(price)|
+------------+----------+
|     Biryani|     480.0|
|     Chowmin|    3600.0|
|        Dosa|    1320.0|
|       Pasta|    1080.0|
|    sandwich|    5760.0|
+------------+----------+



In [0]:
### Monthly Sales.

monthly_sales = (sales_df.join(menu_df,'product_id')
                 .groupBy('order_month')
                 .agg({'price':'sum'})
                 .orderBy('order_month'))

monthly_sales.show()

+-----------+----------+
|order_month|sum(price)|
+-----------+----------+
|          1|    2460.0|
|          2|    2430.0|
|          3|     810.0|
|          5|    2460.0|
|          6|    2460.0|
|          7|     810.0|
|         11|     810.0|
+-----------+----------+



In [0]:
## Yearly Sales

yearly_sales = (sales_df.join(menu_df,'product_id')
                .groupBy('order_year')
                .agg({'price':'sum'})
                .orderBy('order_year'))

yearly_sales.show()

+----------+----------+
|order_year|sum(price)|
+----------+----------+
|      2022|    4350.0|
|      2023|    7890.0|
+----------+----------+



In [0]:
## Quarterly Sales

quarterly_sales = (sales_df.join(menu_df,'product_id')
                   .groupBy('order_quarter')
                   .agg({'price':'sum'})
                   .orderBy('order_quarter'))

quarterly_sales.show()

+-------------+----------+
|order_quarter|sum(price)|
+-------------+----------+
|            1|    5700.0|
|            2|    4920.0|
|            3|     810.0|
|            4|     810.0|
+-------------+----------+



In [0]:
from pyspark.sql.functions import count

In [0]:
## No. of products purchased per product

product_purchased = (sales_df.join(menu_df,'product_id').groupBy('product_id','product_name')
                     .agg(count('product_id').alias('product_count'))
                     .orderBy('product_count',ascending = 0)
                     .drop('product_id')
                     )
product_purchased.show()

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
|     Chowmin|           24|
|        Dosa|           12|
|     Biryani|            6|
|       Pasta|            6|
+------------+-------------+



In [0]:
## Top 5 Products
total_5_products = (sales_df.join(menu_df,'product_id')
                    .groupBy('product_id','product_name')
                    .agg(count('product_id').alias('product_count'))
                    .orderBy('product_count',ascending = 0)
                    .drop('product_id').limit(5) )

total_5_products.show()

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
|     Chowmin|           24|
|        Dosa|           12|
|     Biryani|            6|
|       Pasta|            6|
+------------+-------------+



In [0]:
## Most Ordered product.

top_ordered_product = (sales_df.join(menu_df,'product_id')
                    .groupBy('product_id','product_name')
                    .agg(count('product_id').alias('product_count'))
                    .orderBy('product_count',ascending = 0)
                    .drop('product_id').limit(1) )

top_ordered_product.show()

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
+------------+-------------+



In [0]:
## Sales Per Country
sales_by_country = (sales_df.join(menu_df,'product_id')
                    .groupBy('location')
                    .agg({'price':'sum'}).alias('total_sales'))

sales_by_country.show()
     

+--------+----------+
|location|sum(price)|
+--------+----------+
|   India|    3960.0|
|     USA|    2160.0|
|      UK|    6120.0|
+--------+----------+



In [0]:
## Sales by order source
sales_by_order_source = (sales_df.join(menu_df,'product_id')
                         .groupBy('source_order')
                    .agg({'price':'sum'}).alias('total_sales'))

sales_by_order_source.show()

+------------+----------+
|source_order|sum(price)|
+------------+----------+
|      zomato|    4920.0|
|      Swiggy|    4830.0|
|  Restaurant|    2490.0|
+------------+----------+

