In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=dba2afcc06d9e0b7d3ec665c41c93b78f300680df6f6394b3c79391cf9995cf9
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DateType
from pyspark.sql import SparkSession

In [None]:
# Create a Spark Session
spark = SparkSession.builder.appName("Create Table").getOrCreate()

# Define the custom schema
custom_schema = StructType([
    StructField("product_id",IntegerType(),True),
    StructField("customer_id",StringType(),True),
    StructField("order_date",DateType(),True),
    StructField("location",StringType(),True),
    StructField("source_order",StringType(),True)
])

# Read CSV data with the custom schema
df = spark.read.csv('/content/sales.csv', header=True, schema=custom_schema)

# Create a temporary table
df.createOrReplaceTempView("sales")

# Query the table
# result = spark.sql("SELECT product_id, customer_id,order_date,location, source_order FROM sales")
df.show()


+----------+-----------+----------+--------+------------+
|product_id|customer_id|order_date|location|source_order|
+----------+-----------+----------+--------+------------+
|         2|          A|2022-01-01|   India|      Swiggy|
|         2|          A|2023-01-07|   India|      Swiggy|
|         3|          A|2023-01-10|   India|  Restaurant|
|         3|          A|2022-01-11|   India|      Swiggy|
|         3|          A|2023-01-11|   India|  Restaurant|
|         2|          B|2022-02-01|   India|      Swiggy|
|         2|          B|2023-01-02|   India|      Swiggy|
|         1|          B|2023-01-04|   India|  Restaurant|
|         1|          B|2023-02-11|   India|      Swiggy|
|         3|          B|2023-01-16|   India|      zomato|
|         3|          B|2022-02-01|   India|      zomato|
|         3|          C|2023-01-01|   India|      zomato|
|         1|          C|2023-01-01|      UK|      Swiggy|
|         6|          C|2022-01-07|      UK|      zomato|
|         3|  

In [None]:
#Deriving year, month, quater
from pyspark.sql.functions import month, year, quarter
df = df.withColumn("order_year",year(df.order_date)) #New Column adding

In [None]:
df.show()

+----------+-----------+----------+--------+------------+----------+
|product_id|customer_id|order_date|location|source_order|order_year|
+----------+-----------+----------+--------+------------+----------+
|         2|          A|2022-01-01|   India|      Swiggy|      2022|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|
|         2|          B|2022-02-01|   India|      Swiggy|      2022|
|         2|          B|2023-01-02|   India|      Swiggy|      2023|
|         1|          B|2023-01-04|   India|  Restaurant|      2023|
|         1|          B|2023-02-11|   India|      Swiggy|      2023|
|         3|          B|2023-01-16|   India|      zomato|      2023|
|         3|          B|2022-02-01|   India|      zomato|      2022|
|         3|          C|2023-01-01

In [None]:
df = df.withColumn("order_month",month(df.order_date))
df.show()

+----------+-----------+----------+--------+------------+----------+-----------+
|product_id|customer_id|order_date|location|source_order|order_year|order_month|
+----------+-----------+----------+--------+------------+----------+-----------+
|         2|          A|2022-01-01|   India|      Swiggy|      2022|          1|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|          1|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|          1|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|          1|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|          1|
|         2|          B|2022-02-01|   India|      Swiggy|      2022|          2|
|         2|          B|2023-01-02|   India|      Swiggy|      2023|          1|
|         1|          B|2023-01-04|   India|  Restaurant|      2023|          1|
|         1|          B|2023-02-11|   India|      Swiggy|      2023|          2|
|         3|          B|2023

In [None]:
df = df.withColumn("order_quater",quarter(df.order_date))
df.show()

+----------+-----------+----------+--------+------------+----------+-----------+------------+
|product_id|customer_id|order_date|location|source_order|order_year|order_month|order_quater|
+----------+-----------+----------+--------+------------+----------+-----------+------------+
|         2|          A|2022-01-01|   India|      Swiggy|      2022|          1|           1|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|          1|           1|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|          1|           1|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|          1|           1|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|          1|           1|
|         2|          B|2022-02-01|   India|      Swiggy|      2022|          2|           1|
|         2|          B|2023-01-02|   India|      Swiggy|      2023|          1|           1|
|         1|          B|2023-01-04|   India|  Restaurant|   

In [None]:
spark = SparkSession.builder.appName("menu").getOrCreate()
custom_schema = StructType([
    StructField("product_id",IntegerType(),True),
    StructField("product_name",StringType(),True),
    StructField("price",StringType(),True)
])

menu_df = spark.read.csv("/content/menu.csv",header = True,schema = custom_schema)
menu_df.show()

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         2|     Chowmin|  150|
|         3|    sandwich|  120|
|         4|        Dosa|  110|
|         5|     Biryani|   80|
|         6|       Pasta|  180|
+----------+------------+-----+



In [None]:
#Total amount spent by each costumer
total_amount_spent = (df.join(menu_df,'product_id').groupBy('customer_id').agg({'price':'sum'}).orderBy("customer_id"))
total_amount_spent.show()
display(total_amount_spent)

+-----------+----------+
|customer_id|sum(price)|
+-----------+----------+
|          A|    3960.0|
|          B|    3240.0|
|          C|    1800.0|
|          D|    1200.0|
|          E|    2040.0|
+-----------+----------+



DataFrame[customer_id: string, sum(price): double]

In [None]:
df.show()

+----------+-----------+----------+--------+------------+----------+-----------+------------+
|product_id|customer_id|order_date|location|source_order|order_year|order_month|order_quater|
+----------+-----------+----------+--------+------------+----------+-----------+------------+
|         2|          A|2022-01-01|   India|      Swiggy|      2022|          1|           1|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|          1|           1|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|          1|           1|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|          1|           1|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|          1|           1|
|         2|          B|2022-02-01|   India|      Swiggy|      2022|          2|           1|
|         2|          B|2023-01-02|   India|      Swiggy|      2023|          1|           1|
|         1|          B|2023-01-04|   India|  Restaurant|   

In [None]:
#Total amount spent on each food category
food_spent = df.join(menu_df,"product_id").groupBy("product_name").agg({'price':'sum'}).orderBy('product_name')
food_spent.show()

+------------+----------+
|product_name|sum(price)|
+------------+----------+
|     Biryani|     480.0|
|     Chowmin|    3600.0|
|        Dosa|    1320.0|
|       Pasta|    1080.0|
|    sandwich|    5760.0|
+------------+----------+



In [None]:
#Total sales each month
df1 = (df.join(menu_df,"product_id").groupBy('order_month').agg({"price":"sum"}).orderBy('order_month'))
df1.show()

+-----------+----------+
|order_month|sum(price)|
+-----------+----------+
|          1|    2460.0|
|          2|    2430.0|
|          3|     810.0|
|          5|    2460.0|
|          6|    2460.0|
|          7|     810.0|
|         11|     810.0|
+-----------+----------+



In [None]:
#Yearly sales
df2 = (df.join(menu_df,"product_id").groupBy('order_year').agg({"price":"sum"}).orderBy('order_year'))
df2.show()

+----------+----------+
|order_year|sum(price)|
+----------+----------+
|      2022|    4350.0|
|      2023|    7890.0|
+----------+----------+



In [None]:
df3 = (df.join(menu_df,"product_id").groupBy('order_quater').agg({"price":"sum"}).orderBy('order_quater'))
df3.show()

+------------+----------+
|order_quater|sum(price)|
+------------+----------+
|           1|    5700.0|
|           2|    4920.0|
|           3|     810.0|
|           4|     810.0|
+------------+----------+



In [None]:
# how many times each product purchased
from pyspark.sql.functions import count
most_df = (df.join(menu_df,'product_id').groupBy('product_id','product_name').agg(count('product_id').alias('product_count')).orderBy('product_count',ascending = 0)
.drop('product_id'))
most_df.show()

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
|     Chowmin|           24|
|        Dosa|           12|
|     Biryani|            6|
|       Pasta|            6|
+------------+-------------+



In [None]:
#Frequency of customer visited to restaurant
from pyspark.sql.functions import countDistinct

visit_df = (df.filter(df.source_order == 'Restaurant').groupBy('customer_id').agg(countDistinct('order_date')))
visit_df.show()

+-----------+--------------------------+
|customer_id|count(DISTINCT order_date)|
+-----------+--------------------------+
|          E|                         5|
|          B|                         6|
|          D|                         1|
|          C|                         3|
|          A|                         6|
+-----------+--------------------------+



In [None]:
#total sales by each country
country_df = (df.join(menu_df,'product_id').groupBy('location').agg({'price':'sum'}))
country_df.show()

+--------+----------+
|location|sum(price)|
+--------+----------+
|   India|    3960.0|
|     USA|    2160.0|
|      UK|    6120.0|
+--------+----------+



In [None]:
#total sales by order source
order_df = (df.join(menu_df,'product_id').groupBy('source_order').agg({'price':'sum'}))
order_df.show()

+------------+----------+
|source_order|sum(price)|
+------------+----------+
|      zomato|    4920.0|
|      Swiggy|    4830.0|
|  Restaurant|    2490.0|
+------------+----------+

