In [1]:
import findspark

In [2]:
findspark.find()

'C:\\spark\\spark-3.5.0-bin-hadoop3'

In [3]:
findspark.init()

In [4]:
import pyspark
from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,DateType
from pyspark.sql.functions import *
from pyspark import RDD

In [5]:
#Create SparkSession
spark = SparkSession.\
builder.\
appName("sparksql1").\
getOrCreate()

In [6]:
sales='C:/Users/DELL/Documents/pyspark/sales.csv.txt'
menu='C:/Users/DELL/Documents/pyspark/menu.csv.txt'

In [7]:
schema=StructType([
    StructField("product_id",IntegerType(),True),
    StructField("customer_id",StringType(),True),
    StructField("order_date",DateType(),True),
    StructField("location",StringType(),True),
    StructField("source_order",StringType(),True)
])


In [8]:
data_1 = spark.read.csv("sales.csv.txt",sep = ",",schema = schema)

In [9]:
data_1.show(3)

+----------+-----------+----------+--------+------------+
|product_id|customer_id|order_date|location|source_order|
+----------+-----------+----------+--------+------------+
|         1|          A|2023-01-01|   India|      Swiggy|
|         2|          A|2022-01-01|   India|      Swiggy|
|         2|          A|2023-01-07|   India|      Swiggy|
+----------+-----------+----------+--------+------------+
only showing top 3 rows



In [10]:
# deriving year,month,quarter
from pyspark.sql import functions as F

data_1= data_1.select(
    '*',
    F.year('order_date').alias('order_year'),
    F.month('order_date').alias('order_month'),
    F.quarter('order_date').alias('order_quarter')
)
data_1.show()

+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|product_id|customer_id|order_date|location|source_order|order_year|order_month|order_quarter|
+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|         1|          A|2023-01-01|   India|      Swiggy|      2023|          1|            1|
|         2|          A|2022-01-01|   India|      Swiggy|      2022|          1|            1|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|          1|            1|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|          1|            1|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|          1|            1|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|          1|            1|
|         2|          B|2022-02-01|   India|      Swiggy|      2022|          2|            1|
|         2|          B|2023-01-02|   India|      

In [11]:
# Using menu dataframe
schema=StructType([
    StructField("product_id",IntegerType(),True),
    StructField("product_name",StringType(),True),
    StructField("price",StringType(),True)
])
data_2 = spark.read.csv("menu.csv.txt",sep = ",",schema = schema)
data_2.show(5)

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         1|       PIZZA|  100|
|         2|     Chowmin|  150|
|         3|    sandwich|  120|
|         4|        Dosa|  110|
|         5|     Biryani|   80|
+----------+------------+-----+
only showing top 5 rows



In [12]:
# Total Amount spent by each customer
total_amount_spent=(data_1.join(data_2,"product_id").\
                    groupBy("customer_id").\
                    agg({'price':'sum'}).\
                    orderBy("customer_id")
                   )
total_amount_spent.show()

+-----------+----------+
|customer_id|sum(price)|
+-----------+----------+
|          A|    4260.0|
|          B|    4440.0|
|          C|    2400.0|
|          D|    1200.0|
|          E|    2040.0|
+-----------+----------+



In [13]:
# Total amount of sales in each month
df_1=(data_1.join(data_2,"product_id").\
                    groupBy("order_month").\
                    agg({'price':'sum'}).\
                    orderBy("order_month")
                   )
df_1.show()

+-----------+----------+
|order_month|sum(price)|
+-----------+----------+
|          1|    2960.0|
|          2|    2730.0|
|          3|     910.0|
|          5|    2960.0|
|          6|    2960.0|
|          7|     910.0|
|         11|     910.0|
+-----------+----------+



In [14]:
# Yearly Sales
df_2=(data_1.join(data_2,"product_id").\
                    groupBy("order_year").\
                    agg({'price':'sum'}).\
                    orderBy("order_year")
                   )
df_2.show()

+----------+----------+
|order_year|sum(price)|
+----------+----------+
|      2022|    4350.0|
|      2023|    9990.0|
+----------+----------+



In [15]:
# Quaterly Sales
df_3=(data_1.join(data_2,"product_id").\
                    groupBy("order_quarter").\
                    agg({'price':'sum'}).\
                    orderBy("order_quarter")
                   )
df_3.show()

+-------------+----------+
|order_quarter|sum(price)|
+-------------+----------+
|            1|    6600.0|
|            2|    5920.0|
|            3|     910.0|
|            4|     910.0|
+-------------+----------+



In [16]:
# How many times each product has been purchased
from pyspark.sql.functions import count
df_4=(data_1.join(data_2,"product_id").\
                    groupBy("product_id","product_name").\
                    agg(count("product_id").alias("product_count")).\
                    orderBy("product_count",ascending=0).\
                    drop("product_id")
                   )
df_4.show()

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
|     Chowmin|           24|
|       PIZZA|           21|
|        Dosa|           12|
|     Biryani|            6|
|       Pasta|            6|
+------------+-------------+



In [17]:
# Top 5 ordered items
df_4=(data_1.join(data_2,"product_id").\
                    groupBy("product_id","product_name").\
                    agg(count("product_id").alias("product_count")).\
                    orderBy("product_count",ascending=0).\
                    drop("product_id").\
                    limit(5)
                   )
df_4.show()

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
|     Chowmin|           24|
|       PIZZA|           21|
|        Dosa|           12|
|     Biryani|            6|
+------------+-------------+



In [18]:
# Top ordered item
df_4=(data_1.join(data_2,"product_id").\
                    groupBy("product_id","product_name").\
                    agg(count("product_id").alias("product_count")).\
                    orderBy("product_count",ascending=0).\
                    drop("product_id").\
                    limit(1)
                   )
df_4.show()

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
+------------+-------------+



In [19]:
# Frequency of customer of vising the restaurant
from pyspark.sql.functions import countDistinct
df_5=(data_1.filter(data_1.source_order=='Restaurant').\
                   groupBy("customer_id").\
                   agg(countDistinct("order_date").alias("Frequency_of_visit"))
                   )
df_5.show()

+-----------+------------------+
|customer_id|Frequency_of_visit|
+-----------+------------------+
|          E|                 5|
|          B|                 6|
|          D|                 1|
|          C|                 3|
|          A|                 6|
+-----------+------------------+



In [20]:
# Total sales by each country
df_6=(data_1.join(data_2,"product_id").\
                    groupBy("location").\
                    agg({'price':'sum'})
                   )
df_6.show()

+--------+----------+
|location|sum(price)|
+--------+----------+
|   India|    4860.0|
|     USA|    2460.0|
|      UK|    7020.0|
+--------+----------+



In [21]:
# Total sales by order source
df_7=(data_1.join(data_2,"product_id").\
                    groupBy("source_order").\
                    agg({'price':'sum'})
                   )
df_7.show()

+------------+----------+
|source_order|sum(price)|
+------------+----------+
|      zomato|    4920.0|
|      Swiggy|    6330.0|
|  Restaurant|    3090.0|
+------------+----------+

