In [0]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.types import StructType,StructField

In [0]:
spark=SparkSession.builder.appName("sales_analysis").getOrCreate()
data=spark.read.csv("/FileStore/tables/sales_csv.txt")
data.show()

+---+---+-----------+-----+----------+
|_c0|_c1|        _c2|  _c3|       _c4|
+---+---+-----------+-----+----------+
|  1|  A| 2023-01-01|India|    Swiggy|
|  2|  A| 2022-01-01|India|    Swiggy|
|  2|  A| 2023-01-07|India|    Swiggy|
|  3|  A| 2023-01-10|India|Restaurant|
|  3|  A| 2022-01-11|India|    Swiggy|
|  3|  A| 2023-01-11|India|Restaurant|
|  2|  B| 2022-02-01|India|    Swiggy|
|  2|  B| 2023-01-02|India|    Swiggy|
|  1|  B| 2023-01-04|India|Restaurant|
|  1|  B| 2023-02-11|India|    Swiggy|
|  3|  B| 2023-01-16|India|    zomato|
|  3|  B| 2022-02-01|India|    zomato|
|  3|  C| 2023-01-01|India|    zomato|
|  1|  C| 2023-01-01|   UK|    Swiggy|
|  6|  C| 2022-01-07|   UK|    zomato|
|  3|  D| 2023-02-16|   UK|Restaurant|
|  5|  D| 2022-02-01|   UK|    zomato|
|  3|  E| 2023-02-01|   UK|Restaurant|
|  4|  E| 2023-02-01|   UK|    Swiggy|
|  4|  E| 2023-02-07|   UK|Restaurant|
+---+---+-----------+-----+----------+
only showing top 20 rows



In [0]:
df_menu=spark.read.csv("/FileStore/tables/menu_csv.txt")
df_menu.show()

+---+---------+----+----+
|_c0|      _c1| _c2| _c3|
+---+---------+----+----+
|  1|    PIZZA| 100|null|
|  2|  Chowmin| 150|null|
|  3| sandwich| 120|null|
|  4|     Dosa| 110|null|
|  5|  Biryani|  80|null|
|  6|    Pasta| 180|null|
+---+---------+----+----+



In [0]:
df_sales.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)



In [0]:
from pyspark.sql.types import StructType,IntegerType,DateType 
schema=StructType([
    StructField("product_id",IntegerType(),True),
    StructField("customer_id",StringType(),True),
    StructField("order_date",DateType(),True),
    StructField("Location",StringType(),True),
    StructField("source_order",IntegerType(),True)
])


In [0]:
sales_df=spark.read.format("csv").option("inferschema","true").schema(schema).load("/FileStore/tables/sales_csv.txt")

In [0]:
display(sales_df)

product_id,customer_id,order_date,Location,source_order
1,A,2023-01-01,India,
2,A,2022-01-01,India,
2,A,2023-01-07,India,
3,A,2023-01-10,India,
3,A,2022-01-11,India,
3,A,2023-01-11,India,
2,B,2022-02-01,India,
2,B,2023-01-02,India,
1,B,2023-01-04,India,
1,B,2023-02-11,India,


In [0]:
##deriving year,month and quarter sales 
from pyspark.sql.functions import month,year,quarter
sales_df=sales_df.withColumn("order_year",year(sales_df.order_date))
sales_df=sales_df.withColumn("order_month",month(sales_df.order_date))
sales_df=sales_df.withColumn("order_quarter",quarter(sales_df.order_date))


In [0]:
display(sales_df)


product_id,customer_id,order_date,Location,source_order,order_year,order_month,order_quarter
1,A,2023-01-01,India,,2023,1,1
2,A,2022-01-01,India,,2022,1,1
2,A,2023-01-07,India,,2023,1,1
3,A,2023-01-10,India,,2023,1,1
3,A,2022-01-11,India,,2022,1,1
3,A,2023-01-11,India,,2023,1,1
2,B,2022-02-01,India,,2022,2,1
2,B,2023-01-02,India,,2023,1,1
1,B,2023-01-04,India,,2023,1,1
1,B,2023-02-11,India,,2023,2,1


In [0]:
sales_df=sales_df.drop('source_order')
sales_df.show()

+----------+-----------+----------+--------+----------+-----------+-------------+
|product_id|customer_id|order_date|Location|order_year|order_month|order_quarter|
+----------+-----------+----------+--------+----------+-----------+-------------+
|         1|          A|2023-01-01|   India|      2023|          1|            1|
|         2|          A|2022-01-01|   India|      2022|          1|            1|
|         2|          A|2023-01-07|   India|      2023|          1|            1|
|         3|          A|2023-01-10|   India|      2023|          1|            1|
|         3|          A|2022-01-11|   India|      2022|          1|            1|
|         3|          A|2023-01-11|   India|      2023|          1|            1|
|         2|          B|2022-02-01|   India|      2022|          2|            1|
|         2|          B|2023-01-02|   India|      2023|          1|            1|
|         1|          B|2023-01-04|   India|      2023|          1|            1|
|         1|    

In [0]:
##menu orders details
from pyspark.sql.types import StructType,IntegerType,DateType 
schema=StructType([
    StructField("product_id",IntegerType(),True),
    StructField("product_name",StringType(),True),
    StructField("price",StringType(),True),
    
])


In [0]:
df_menu=spark.read.format("csv").option("inferschema","True").schema(schema).load("/FileStore/tables/menu_csv-1.txt")
df_menu.show()

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         1|       PIZZA|  100|
|         2|     Chowmin|  150|
|         3|    sandwich|  120|
|         4|        Dosa|  110|
|         5|     Biryani|   80|
|         6|       Pasta|  180|
+----------+------------+-----+



In [0]:
##total amount spend by each customer
df=sales_df.join(df_menu,on="product_id",how="inner")

In [0]:
df.groupBy("customer_id").agg({"price":"sum"}).show()
display(df)

+-----------+----------+
|customer_id|sum(price)|
+-----------+----------+
|          E|    2040.0|
|          B|    4440.0|
|          D|    1200.0|
|          C|    2400.0|
|          A|    4260.0|
+-----------+----------+



product_id,customer_id,order_date,Location,order_year,order_month,order_quarter,product_name,price
1,A,2023-01-01,India,2023,1,1,PIZZA,100
2,A,2022-01-01,India,2022,1,1,Chowmin,150
2,A,2023-01-07,India,2023,1,1,Chowmin,150
3,A,2023-01-10,India,2023,1,1,sandwich,120
3,A,2022-01-11,India,2022,1,1,sandwich,120
3,A,2023-01-11,India,2023,1,1,sandwich,120
2,B,2022-02-01,India,2022,2,1,Chowmin,150
2,B,2023-01-02,India,2023,1,1,Chowmin,150
1,B,2023-01-04,India,2023,1,1,PIZZA,100
1,B,2023-02-11,India,2023,2,1,PIZZA,100


In [0]:
##total amount spent by each food category
new_df=sales_df.join(df_menu,"product_id",how="inner")
new_df.show()
new_df.printSchema()

+----------+-----------+----------+--------+----------+-----------+-------------+------------+-----+
|product_id|customer_id|order_date|Location|order_year|order_month|order_quarter|product_name|price|
+----------+-----------+----------+--------+----------+-----------+-------------+------------+-----+
|         1|          A|2023-01-01|   India|      2023|          1|            1|       PIZZA|  100|
|         2|          A|2022-01-01|   India|      2022|          1|            1|     Chowmin|  150|
|         2|          A|2023-01-07|   India|      2023|          1|            1|     Chowmin|  150|
|         3|          A|2023-01-10|   India|      2023|          1|            1|    sandwich|  120|
|         3|          A|2022-01-11|   India|      2022|          1|            1|    sandwich|  120|
|         3|          A|2023-01-11|   India|      2023|          1|            1|    sandwich|  120|
|         2|          B|2022-02-01|   India|      2022|          2|            1|     Chowm

In [0]:
new_df.groupBy("product_name").agg({"price":"sum"}).orderBy("product_name").show()

+------------+----------+
|product_name|sum(price)|
+------------+----------+
|     Biryani|     480.0|
|     Chowmin|    3600.0|
|        Dosa|    1320.0|
|       PIZZA|    2100.0|
|       Pasta|    1080.0|
|    sandwich|    5760.0|
+------------+----------+



In [0]:
##total amount of sales in each month
df_s=sales_df.join(df_menu,on="product_id",how="inner")
df_s.groupBy("order_month").agg({"price":"sum"}).orderBy("order_month").show()

+-----------+----------+
|order_month|sum(price)|
+-----------+----------+
|          1|    2960.0|
|          2|    2730.0|
|          3|     910.0|
|          5|    2960.0|
|          6|    2960.0|
|          7|     910.0|
|         11|     910.0|
+-----------+----------+



In [0]:
##yearly sales
df_s.groupBy("order_year").agg({'price':"sum"}).orderBy("order_year").show()

+----------+----------+
|order_year|sum(price)|
+----------+----------+
|      2022|    4350.0|
|      2023|    9990.0|
+----------+----------+



In [0]:
##quartely sales
df_s.groupBy("ordeR_quarter").agg({"price":"sum"}).orderBy("order_quarter").show()

+-------------+----------+
|ordeR_quarter|sum(price)|
+-------------+----------+
|            1|    6600.0|
|            2|    5920.0|
|            3|     910.0|
|            4|     910.0|
+-------------+----------+



In [0]:
##total amount of order by each category
df_s.groupBy("customer_id").agg({"product_id":"count"}).show()

+-----------+-----------------+
|customer_id|count(product_id)|
+-----------+-----------------+
|          E|               18|
|          B|               36|
|          D|               12|
|          C|               18|
|          A|               33|
+-----------+-----------------+



In [0]:
##how many times each product purchased
df_s.groupBy("product_name").agg({"product_id":"count"}).show()


+------------+-----------------+
|product_name|count(product_id)|
+------------+-----------------+
|       Pasta|                6|
|       PIZZA|               21|
|    sandwich|               48|
|     Biryani|                6|
|     Chowmin|               24|
|        Dosa|               12|
+------------+-----------------+



In [0]:
from pyspark.sql.functions import desc

df_s.groupBy("product_name").agg({"product_id": "count"}).alias("product_count")\
    .orderBy(desc("count(product_id)")).show(5)


+------------+-----------------+
|product_name|count(product_id)|
+------------+-----------------+
|    sandwich|               48|
|     Chowmin|               24|
|       PIZZA|               21|
|        Dosa|               12|
|       Pasta|                6|
+------------+-----------------+
only showing top 5 rows



In [0]:
##top ordered items
df_s.groupBy("product_name").agg({"product_id":"count"}).alias("product_count")\
    .orderBy(desc("count(product_id)")).show(1)

+------------+-----------------+
|product_name|count(product_id)|
+------------+-----------------+
|    sandwich|               48|
+------------+-----------------+
only showing top 1 row



In [0]:
#frequency of customers visited
df_s.groupBy("customer_id").agg({"product_id":"count"}).show()

+-----------+-----------------+
|customer_id|count(product_id)|
+-----------+-----------------+
|          E|               18|
|          B|               36|
|          D|               12|
|          C|               18|
|          A|               33|
+-----------+-----------------+



In [0]:
##how many times each product purchased
most_df=df_s.groupBy("product_id","product_name").agg({"product_id":"count"}).alias("product_count")\
    .orderBy("product_count",ascending=False).drop("product_id").show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-3650431256076648>:2[0m
[1;32m      1[0m [38;5;66;03m##how many times each product purchased[39;00m
[0;32m----> 2[0m most_df[38;5;241m=[39m[43mdf_s[49m[38;5;241;43m.[39;49m[43mgroupBy[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mproduct_id[39;49m[38;5;124;43m"[39;49m[43m,[49m[38;5;124;43m"[39;49m[38;5;124;43mproduct_name[39;49m[38;5;124;43m"[39;49m[43m)[49m[38;5;241;43m.[39;49m[43magg[49m[43m([49m[43m{[49m[38;5;124;43m"[39;49m[38;5;124;43mproduct_id[39;49m[38;5;124;43m"[39;49m[43m:[49m[38;5;124;43m"[39;49m[38;5;124;43mcount[39;49m[38;5;124;43m"[39;49m[43m}[49m[43m)[49m[38;5;241;43m.[39;49m[43malias[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mproduct_count[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m\[49m
[1;32m      3[0m