In [0]:
import pyspark
from pyspark.sql import SparkSession

In [0]:
spark = SparkSession.builder.appName('Distinct').getOrCreate()

In [0]:
spark

In [0]:

from pyspark.sql.types import *
orderSchema = StructType([
    StructField('ProductID', IntegerType(),True),
    StructField('CustomerID',StringType(),True),
    StructField('OrderDate', DateType(),True),
    StructField('Location', StringType(),True),
    StructField('SourceOrder',StringType(),True),  

]

)
sales_df = spark.read.format("csv").option("header","true").schema(orderSchema).load("/FileStore/tables/sales_csv.txt")
display(sales_df)

ProductID,CustomerID,OrderDate,Location,SourceOrder
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy
3,B,2023-01-16,India,zomato


In [0]:
sales_df.printSchema

Out[7]: <bound method DataFrame.printSchema of DataFrame[ProductID: int, CustomerID: string, OrderDate: date, Location: string, SourceOrder: string]>

In [0]:
from pyspark.sql.functions import *
sales_df =  sales_df.withColumn('Month',month(col('OrderDate')))
display(sales_df)

ProductID,CustomerID,OrderDate,Location,SourceOrder,Month
2,A,2022-01-01,India,Swiggy,1
2,A,2023-01-07,India,Swiggy,1
3,A,2023-01-10,India,Restaurant,1
3,A,2022-01-11,India,Swiggy,1
3,A,2023-01-11,India,Restaurant,1
2,B,2022-02-01,India,Swiggy,2
2,B,2023-01-02,India,Swiggy,1
1,B,2023-01-04,India,Restaurant,1
1,B,2023-02-11,India,Swiggy,2
3,B,2023-01-16,India,zomato,1


In [0]:
sales_df = sales_df.withColumn('Quater',quarter(col('OrderDate')))
display(sales_df)

ProductID,CustomerID,OrderDate,Location,SourceOrder,Month,Year,Quater
2,A,2022-01-01,India,Swiggy,1,2022,1
2,A,2023-01-07,India,Swiggy,1,2023,1
3,A,2023-01-10,India,Restaurant,1,2023,1
3,A,2022-01-11,India,Swiggy,1,2022,1
3,A,2023-01-11,India,Restaurant,1,2023,1
2,B,2022-02-01,India,Swiggy,2,2022,1
2,B,2023-01-02,India,Swiggy,1,2023,1
1,B,2023-01-04,India,Restaurant,1,2023,1
1,B,2023-02-11,India,Swiggy,2,2023,1
3,B,2023-01-16,India,zomato,1,2023,1


In [0]:
from pyspark.sql.types import *
menuSchema = StructType([
    StructField('ProductID', IntegerType(),True),
    StructField('ProductName',StringType(),True),
    StructField('Price', FloatType(),True)

]

)
menu_df = spark.read.format("csv").option("header","true").schema(menuSchema).load("/FileStore/tables/menu_csv.txt")
display(menu_df)

ProductID,ProductName,Price
2,Chowmin,150.0
3,sandwich,120.0
4,Dosa,110.0
5,Biryani,80.0
6,Pasta,180.0


In [0]:
Customer_spending = (sales_df.join(menu_df,'ProductID').groupBy('CustomerID').agg({'Price':'sum'}).orderBy('CustomerID'))
display(Customer_spending)

CustomerID,sum(Price)
A,3960.0
B,3240.0
C,1800.0
D,1200.0
E,2040.0


Databricks visualization. Run in Databricks to view.

In [0]:
Category_Sales = (sales_df.join(menu_df,'ProductID').groupBy('ProductName').agg({'Price':'sum'}).orderBy('ProductName'))
display(Category_Sales)

ProductName,sum(Price)
Biryani,480.0
Chowmin,3600.0
Dosa,1320.0
Pasta,1080.0
sandwich,5760.0


Databricks visualization. Run in Databricks to view.

In [0]:
Monthly_Sale = (sales_df.join(menu_df,'ProductID').groupBy('Month').agg({'Price':'sum'}).orderBy('Month'))
display(Monthly_Sale)

Month,sum(Price)
1,2460.0
2,2430.0
3,810.0
5,2460.0
6,2460.0
7,810.0
11,810.0


Databricks visualization. Run in Databricks to view.

In [0]:
Quaterly_sale = (sales_df.join(menu_df,'ProductID').groupBy('Quater').agg({'Price':'sum'}).orderBy('Quater'))
display(Quaterly_sale)

Quater,sum(Price)
1,5700.0
2,4920.0
3,810.0
4,810.0


Databricks visualization. Run in Databricks to view.

In [0]:
Yearly_sale = (sales_df.join(menu_df,'ProductID').groupBy('Year').agg({'Price':'sum'}).orderBy('Year'))
display(Yearly_sale)

Year,sum(Price)
2022,4350.0
2023,7890.0


Databricks visualization. Run in Databricks to view.

In [0]:
SourceOrder_sales = (sales_df.join(menu_df,'ProductID').groupBy('SourceOrder').agg({'Price':'sum'}).orderBy('SourceOrder'))
display(SourceOrder_sales)

SourceOrder,sum(Price)
Restaurant,2490.0
Swiggy,4830.0
zomato,4920.0


Databricks visualization. Run in Databricks to view.

In [0]:
Location_sales = (sales_df.join(menu_df,'ProductID').groupBy('Location').agg({'Price':'sum'}).orderBy('Location'))
display(Location_sales)

Location,sum(Price)
India,3960.0
UK,6120.0
USA,2160.0


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import count
ordered_product= (sales_df.join(menu_df,'ProductID')
                       .groupBy('ProductID','ProductName')
                       .agg(count('ProductID').alias('Product_Count'))
                       .orderBy('Product_Count',ascending=0))



display(ordered_product)

ProductID,ProductName,Product_Count
3,sandwich,48
2,Chowmin,24
4,Dosa,12
5,Biryani,6
6,Pasta,6


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import count

restaurant_order = (sales_df.filter(sales_df.SourceOrder == 'Restaurant')
                    .groupBy('CustomerID')
                    .agg(countDistinct('OrderDate').alias('Customer visits to Restaurant')))


display(restaurant_order)

CustomerID,Customer visits to Restaurant
E,5
B,6
D,1
C,3
A,6


Databricks visualization. Run in Databricks to view.