In [0]:
# Creating sales dataframe

from pyspark.sql.types import *

schema = StructType([
    StructField('product_id', IntegerType(), True),
    StructField('customer_id',StringType(),True),
    StructField('order_date',DateType(),True),
    StructField('location',StringType(),True),
    StructField('source_order',StringType(),True)
])

df = spark.read.format('csv').option('inferschema','true').schema(schema).load('/Volumes/workspace/schema/streaming/sales.csv.txt')

In [0]:
df.limit(5).display()

product_id,customer_id,order_date,location,source_order,order_year,order_month,order_quarter
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1


In [0]:
# Extracting year, month and quarter from order_date
from pyspark.sql.functions import *
df = df.withColumn('order_year',year(df.order_date))
df = df.withColumn('order_month',month(df.order_date))
df = df.withColumn('order_quarter',quarter(df.order_date))
df.display()

product_id,customer_id,order_date,location,source_order,order_year,order_month,order_quarter
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1


In [0]:
# Creating menu dataframe

schema_menu = StructType([
    StructField('product_id',IntegerType(),True),
    StructField('product_name',StringType(),True),
    StructField('price',StringType(),True)
])

df2 = spark.read.format('csv').option('inferschema','true').schema(schema_menu).load('/Volumes/workspace/schema/streaming/menu.csv.txt')
df2.display()

product_id,product_name,price
1,PIZZA,100
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


In [0]:
# total amount spend by each customer
total_amt_spend = (df.join(df2,'product_id').groupBy('customer_id').agg(sum('price')).orderBy('customer_id'))
display(total_amt_spend)

customer_id,sum(price)
A,4260.0
B,4440.0
C,2400.0
D,1200.0
E,2040.0


In [0]:
df.select('customer_id').distinct().display()

customer_id
A
C
B
E
D


In [0]:
# total amount spend by each food category

total_food_spend = (
    df.join(df2,'product_id')
    .groupBy('product_name')
    .agg(
        sum('price').alias('Total sum of product'),
        count('*').alias('Number of orders')
        )
    .orderBy('product_name')).select('product_name','Total sum of product','Number of orders')

final = total_food_spend.join(df2,'product_name').select('product_name','Total sum of product','Number of orders','price')    
display(final)

product_name,Total sum of product,Number of orders,price
sandwich,5760.0,48,120
Dosa,1320.0,12,110
Biryani,480.0,6,80
PIZZA,2100.0,21,100
Chowmin,3600.0,24,150
Pasta,1080.0,6,180


In [0]:
df.filter( col('product_id') == 5 ).display()

product_id,customer_id,order_date,location,source_order,order_year,order_month,order_quarter
5,D,2022-02-01,UK,zomato,2022,2,1
5,D,2023-03-01,USA,zomato,2023,3,1
5,D,2022-02-05,UK,zomato,2022,2,1
5,D,2023-07-05,USA,zomato,2023,7,3
5,D,2022-02-06,UK,zomato,2022,2,1
5,D,2023-11-06,USA,zomato,2023,11,4


In [0]:
# total amount of sales in each month
total_sales_month = df.join(df2,'product_id')\
                    .groupBy('order_month')\
                    .agg(sum('price').alias('Total monthly sales'))\
                    .orderBy('order_month')
display(total_sales_month)      

order_month,Total monthly sales
1,2960.0
2,2730.0
3,910.0
5,2960.0
6,2960.0
7,910.0
11,910.0


In [0]:
# yearly sales
yearly_sales = df.join(df2,'product_id')\
    .groupBy('order_year')\
    .agg(sum('price').alias('Yearly Sales'))\
    .orderBy('order_year')
display(yearly_sales)    

order_year,Yearly Sales
2022,4350.0
2023,9990.0


In [0]:
temp = df.join(df2,'product_id').groupBy('source_order','customer_id').agg(count('*')).orderBy('source_order','customer_id')
temp2 = temp.groupBy('source_order').pivot('customer_id').agg(sum('count(1)')).orderBy('source_order')
display(temp2)

source_order,A,B,C,D,E
Restaurant,9,6,3,3.0,6
Swiggy,21,18,6,,6
zomato,3,12,9,9.0,6


In [0]:
# total sales by each country
df.join(df2,'product_id')\
    .groupBy('location')\
    .agg(
        sum('price').alias('Total sales by each country'),
        count('*').alias('Total Orders')).orderBy('location').display()

location,Total sales by each country,Total Orders
India,4860.0,39
UK,7020.0,57
USA,2460.0,21


In [0]:
# total sales by order_source
df.join(df2,'product_id')\
    .groupBy('source_order')\
    .agg(sum('price').alias('Total sales by order_source'))\
    .orderBy('source_order').display()        

source_order,Total sales by order_source
Restaurant,3090.0
Swiggy,6330.0
zomato,4920.0


In [0]:
df.select('product_id').distinct().display()

product_id
1
4
6
3
2
5
