Importing libraries and datasets

In [37]:
import os
os.environ["JAVA_HOME"] = "C:\Program Files\Java\jdk-19"
os.environ["SPARK_HOME"] = "C:\Program Files\spark-3.5.3-bin-hadoop3"

In [38]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,IntegerType,DateType,StringType


In [10]:
sc = SparkSession.builder.appName("Word Count").getOrCreate()


Creating schema for a sales_df data frame

In [39]:
schema = StructType([
    StructField("product_ID",IntegerType(),True),
    StructField("customer_ID",StringType() , True),
    StructField("order_date",DateType(),True),
    StructField("location",StringType(),True),
    StructField("source_order",StringType(),True)
])

Inserting data into the sales_df

In [11]:
sales_df = sc.read.format("csv").option("inferschema","true").schema(schema).load(r"C:\Users\rahul dodda\Downloads\sales.csv.txt")

Types of display for sales_df data frame

In [None]:
print(type(sales_df))
print(display(sales_df))

<class 'pyspark.sql.dataframe.DataFrame'>


DataFrame[product_ID: int, customer_ID: string, order_date: date, location: string, source_order: string]

In [13]:
sales_df.show()

+----------+-----------+----------+--------+------------+
|product_ID|customer_ID|order_date|location|source_order|
+----------+-----------+----------+--------+------------+
|         1|          A|2023-01-01|   India|      Swiggy|
|         2|          A|2022-01-01|   India|      Swiggy|
|         2|          A|2023-01-07|   India|      Swiggy|
|         3|          A|2023-01-10|   India|  Restaurant|
|         3|          A|2022-01-11|   India|      Swiggy|
|         3|          A|2023-01-11|   India|  Restaurant|
|         2|          B|2022-02-01|   India|      Swiggy|
|         2|          B|2023-01-02|   India|      Swiggy|
|         1|          B|2023-01-04|   India|  Restaurant|
|         1|          B|2023-02-11|   India|      Swiggy|
|         3|          B|2023-01-16|   India|      zomato|
|         3|          B|2022-02-01|   India|      zomato|
|         3|          C|2023-01-01|   India|      zomato|
|         1|          C|2023-01-01|      UK|      Swiggy|
|         6|  

Adding columns to the sales_df and using year, month, quarter functions

In [20]:
from pyspark.sql.functions import year,month,quarter

sales_df = sales_df.withColumn("order_year", year(sales_df.order_date))
sales_df = sales_df.withColumn("order_month", month(sales_df.order_date))
sales_df = sales_df.withColumn("order_quarter", quarter(sales_df.order_date))
sales_df.show()

+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|product_ID|customer_ID|order_date|location|source_order|order_year|order_month|order_quarter|
+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|         1|          A|2023-01-01|   India|      Swiggy|      2023|          1|            1|
|         2|          A|2022-01-01|   India|      Swiggy|      2022|          1|            1|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|          1|            1|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|          1|            1|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|          1|            1|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|          1|            1|
|         2|          B|2022-02-01|   India|      Swiggy|      2022|          2|            1|
|         2|          B|2023-01-02|   India|      

Creating menu_df schema and insert data into it

In [15]:
from pyspark.sql.types import StructField,StructType

schema = StructType([
    StructField("product_ID",IntegerType(),True),
    StructField("product_name",StringType(),True),
    StructField("price",StringType(),True)
    
])
menu_df = sc.read.format("csv").option("inferschema","true").schema(schema).load(r"C:\Users\rahul dodda\Downloads\menu.csv.txt")
menu_df.show()

+----------+------------+-----+
|product_ID|product_name|price|
+----------+------------+-----+
|         1|       PIZZA|  100|
|         2|     Chowmin|  150|
|         3|    sandwich|  120|
|         4|        Dosa|  110|
|         5|     Biryani|   80|
|         6|       Pasta|  180|
+----------+------------+-----+



Total amount spent by each customer

In [18]:
amount_spent = (sales_df.join(menu_df,'product_ID').groupby("customer_id").agg({'price':'sum'})).orderBy("customer_id")
amount_spent.show()

+-----------+----------+
|customer_id|sum(price)|
+-----------+----------+
|          A|    4260.0|
|          B|    4440.0|
|          C|    2400.0|
|          D|    1200.0|
|          E|    2040.0|
+-----------+----------+



Total amount spent on each food item

In [19]:
product_spent = (sales_df.join(menu_df,'product_ID').groupby("product_name").agg({'price':'sum'}))
product_spent.show()

+------------+----------+
|product_name|sum(price)|
+------------+----------+
|       Pasta|    1080.0|
|       PIZZA|    2100.0|
|    sandwich|    5760.0|
|     Biryani|     480.0|
|     Chowmin|    3600.0|
|        Dosa|    1320.0|
+------------+----------+



total amount of sales in each month

In [21]:
month_spent = (sales_df.join(menu_df,'product_ID').groupby("order_month").agg({'price':'sum'}))
month_spent.show()

+-----------+----------+
|order_month|sum(price)|
+-----------+----------+
|          1|    2960.0|
|          6|    2960.0|
|          3|     910.0|
|          5|    2960.0|
|          7|     910.0|
|         11|     910.0|
|          2|    2730.0|
+-----------+----------+



Yearly sales

In [22]:
year_spent = (sales_df.join(menu_df,'product_ID').groupby("order_year").agg({'price':'sum'}))
year_spent.show()

+----------+----------+
|order_year|sum(price)|
+----------+----------+
|      2023|    9990.0|
|      2022|    4350.0|
+----------+----------+



Quaterly sales

In [24]:
quarter_spent = (sales_df.join(menu_df,'product_ID').groupby("order_quarter").agg({'price':'sum'}))
quarter_spent.show()

+-------------+----------+
|order_quarter|sum(price)|
+-------------+----------+
|            1|    6600.0|
|            3|     910.0|
|            4|     910.0|
|            2|    5920.0|
+-------------+----------+



Total no of orders by each category

In [None]:
product_sold = (sales_df.join(menu_df,'product_ID').groupby("product_name").agg({'price':'count'}))
product_sold.show()

+------------+------------+
|product_name|count(price)|
+------------+------------+
|       Pasta|           6|
|       PIZZA|          21|
|    sandwich|          48|
|     Biryani|           6|
|     Chowmin|          24|
|        Dosa|          12|
+------------+------------+



top 5 ordered items

In [29]:
from pyspark.sql.functions import count

product_sold = (sales_df.join(menu_df,'product_ID').groupby("product_name")
                .agg(count('product_id').alias("product_count"))
                .orderBy("product_count",ascending=0)
                .limit(5))
product_sold.show()

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
|     Chowmin|           24|
|       PIZZA|           21|
|        Dosa|           12|
|       Pasta|            6|
+------------+-------------+



top ordered items

In [30]:
from pyspark.sql.functions import count

product_sold = (sales_df.join(menu_df,'product_ID').groupby("product_name")
                .agg(count('product_id').alias("product_count"))
                .orderBy("product_count",ascending=0)
                .limit(1))
product_sold.show()

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
+------------+-------------+



frequency of customer visited

In [41]:
from pyspark.sql.functions import countDistinct

pdf = (sales_df.filter(sales_df.source_order == "Restaurant").groupBy('customer_id').agg(countDistinct('order_date').alias("visits")))
pdf.show()

+-----------+------+
|customer_id|visits|
+-----------+------+
|          E|     5|
|          B|     6|
|          D|     1|
|          C|     3|
|          A|     6|
+-----------+------+



total sales by each country

In [33]:
amount_spent = (sales_df.join(menu_df,'product_ID').groupby("location").agg({'price':'sum'}))
amount_spent.show()

+--------+----------+
|location|sum(price)|
+--------+----------+
|   India|    4860.0|
|     USA|    2460.0|
|      UK|    7020.0|
+--------+----------+



total sales by order_source

In [36]:
amount_spent = (sales_df.join(menu_df,'product_ID').groupby("source_order").agg({'price':'sum'}).limit(3))
amount_spent.show()

+------------+----------+
|source_order|sum(price)|
+------------+----------+
|      zomato|    4810.0|
|      Swiggy|    6330.0|
|  Restaurant|    3090.0|
+------------+----------+

