In [0]:
from pyspark.sql import SparkSession
# Obtain the notebook's SparkSession: getOrCreate() returns the already-active
# session when there is one, otherwise it starts a new one with this app name.
spark = (
    SparkSession.builder
    .appName("Spark DataFrames")
    .getOrCreate()
)

# Read Data

In [0]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,DateType

# Explicit schema for the sales CSV files; every field is declared nullable.
schema = StructType([
    StructField("Product_id", IntegerType(), True),
    StructField("Customer_id", StringType(), True),
    StructField("Order_date", DateType(), True),
    StructField("Location", StringType(), True),
    StructField("Source_order", StringType(), True),
])

# Load every CSV file under the SalesData directory with the schema above.
# "inferSchema" is deliberately not set: when an explicit schema is supplied
# the reader skips inference, so the option would have no effect.
df = (
    spark.read.format("csv")
    .schema(schema)
    .load("/Volumes/pyspark_project/project1/projectvolume/SalesData/")
)
display(df)
df.printSchema()

# Explicit schema for the menu CSV files. Price is read as a string here.
schema1 = StructType([
    StructField("Product_id", IntegerType(), True),
    StructField("Product_name", StringType(), True),
    StructField("Price", StringType(), True),
])

# Load every CSV file under the MenuData directory with the schema above.
# "inferSchema" is deliberately not set: when an explicit schema is supplied
# the reader skips inference, so the option would have no effect.
df1 = (
    spark.read.format("csv")
    .schema(schema1)
    .load("/Volumes/pyspark_project/project1/projectvolume/MenuData/")
)
df1.show()
df1.printSchema()


### Count the number of duplicate rows

In [0]:
from pyspark.sql.functions import count
# Group identical rows together (all columns form the key) and count each group.
rows_grouped = df.groupBy(df.columns).count()

# Keep only the groups that occur more than once. The final number is how many
# distinct rows are duplicated, not how many surplus copies exist.
repeated_groups = rows_grouped.filter("count > 1")
duplicate_count = repeated_groups.count()
print(duplicate_count)

### Drop duplicate rows

In [0]:
# Keep one copy of each fully identical row (all columns are compared).
df = df.distinct()

In [0]:
from pyspark.sql.functions import count
# Same check as before the de-duplication step: group on every column and
# count how many groups still contain more than one row.
rows_grouped = df.groupBy(df.columns).count()
repeated_groups = rows_grouped.filter("count > 1")
duplicate_count = repeated_groups.count()
print(duplicate_count)

### check for Null values

In [0]:
from pyspark.sql.functions import col, sum, when

# Build one aggregate per column: each NULL cell contributes 1, anything else 0,
# so the sum is the number of missing values in that column.
null_flag_totals = []
for column_name in df.columns:
    is_missing = when(col(column_name).isNull(), 1).otherwise(0)
    null_flag_totals.append(sum(is_missing).alias(column_name))

# Single-row result: one NULL count per original column.
null_counts = df.select(null_flag_totals)

null_counts.show()

In [0]:
from functools import reduce
from operator import or_

from pyspark.sql.functions import col

# Build the "any column is NULL" predicate from Column objects instead of a
# concatenated SQL string, so column names that contain spaces or clash with
# SQL keywords need no manual quoting.
any_null = reduce(or_, [col(c).isNull() for c in df.columns])

# Rows that have at least one missing value.
null_rows = df.filter(any_null)

null_rows.show(truncate=False)


### drop null rows

In [0]:
# Discard every row that contains at least one NULL value (default how="any").
df = df.na.drop()

In [0]:
from pyspark.sql.functions import col, sum, when

# Re-run the per-column NULL tally on the current df: each NULL cell adds 1,
# every other cell adds 0.
remaining_null_exprs = [
    sum(when(col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in df.columns
]
null_counts = df.select(remaining_null_exprs)

null_counts.show()

In [0]:
# Render the current state of the sales DataFrame for visual inspection.
display(df)

### Extraction of year, month, and quarter from the order date column

In [0]:
from pyspark.sql.functions import year,month,quarter
from pyspark.sql import functions as F   
# Derive calendar attributes from Order_date. The column reference is taken
# from the current df before the name is rebound to the enriched frame.
order_date = df.Order_date
df = (
    df.withColumn("year", year(order_date))
    .withColumn("month", month(order_date))
    .withColumn("quarter", quarter(order_date))
    # "EEEE" and "MMMM" format the date as the full day and month names.
    .withColumn("day_name", F.date_format("Order_date", "EEEE"))
    .withColumn("month_name", F.date_format("Order_date", "MMMM"))
)
display(df)


# join

In [0]:
# Inner-join sales rows to the menu on the shared key. Passing the column name
# (rather than df.Product_id == df1.Product_id) keeps a single Product_id
# column in the result, so later references to "Product_id" are unambiguous.
sales_df = df.join(df1, on="Product_id", how="inner")
display(sales_df)


In [0]:
# Print the column names and types of the joined DataFrame.
sales_df.printSchema()

In [0]:
# Replace the Price column with its double-typed version so it can be summed,
# then print the schema to confirm the new type.
price_as_double = col("Price").cast("double")
sales_df = sales_df.withColumn("Price", price_as_double)
sales_df.printSchema()


In [0]:
# KPI

# Total amount spent by each customer
# Total amount spent by each food category
# Total amount of sales in each month
# Yearly sales
# Quarterly sales
# Total number of orders by each category
# Top 5 ordered items
# Top ordered items
# Frequency of customer visits
# Total sales by each country
# Total sales by order source

# **KPI**

### Total Amount spent by each customer

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Total Price per customer, listed in Customer_id order.
# F.sum is referenced through the module namespace so this cell does not
# depend on the bare name `sum` having been rebound to Spark's aggregate
# (instead of Python's built-in) by an earlier cell.
kpi1 = (
    sales_df.groupBy("Customer_id")
    .agg(F.sum("Price").alias("Total_amount"))
    .orderBy(col("Customer_id"))
)
display(kpi1)

Databricks visualization. Run in Databricks to view.

### Total Amount spent by each food category

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Total Price per product name, highest total first.
# F.sum is referenced through the module namespace so this cell does not
# depend on the bare name `sum` having been rebound to Spark's aggregate
# (instead of Python's built-in) by an earlier cell.
kpi2 = (
    sales_df.groupBy("Product_name")
    .agg(F.sum("Price").alias("Total_amount"))
    .orderBy(col("Total_amount").desc())
)
display(kpi2)

Databricks visualization. Run in Databricks to view.

### Total Amount of sales in each month

In [0]:
# Total Price per month name, highest total first. Grouping is by month_name
# only, so the same month from different years lands in one bucket.
monthly_total = sum("Price").alias("Total_amount")
kpi3 = (
    sales_df.groupBy("month_name")
    .agg(monthly_total)
    .orderBy(col("Total_amount").desc())
)
display(kpi3)

Databricks visualization. Run in Databricks to view.

### Yearly sales

In [0]:
# Total Price per calendar year, highest total first.
yearly_total = sum("Price").alias("Total_amount")
kpi4 = (
    sales_df.groupBy("year")
    .agg(yearly_total)
    .orderBy(col("Total_amount").desc())
)
display(kpi4)

Databricks visualization. Run in Databricks to view.

### Quarterly sales

In [0]:
# Total Price per quarter number, highest total first. Grouping is by quarter
# only, so the same quarter from different years lands in one bucket.
quarterly_total = sum("Price").alias("Total_amount")
kpi5 = (
    sales_df.groupBy("quarter")
    .agg(quarterly_total)
    .orderBy(col("Total_amount").desc())
)
display(kpi5)

Databricks visualization. Run in Databricks to view.

### Total number of orders by each category

In [0]:
# Number of joined sales rows per product name, most frequent first.
# count("Customer_id") counts rows whose Customer_id is not NULL; it does not
# count distinct customers, despite the output column's name.
rows_per_product = count("Customer_id").alias("Total_No_of_Customer")
kpi6 = (
    sales_df.groupBy("Product_name")
    .agg(rows_per_product)
    .orderBy(col("Total_No_of_Customer").desc())
)
display(kpi6)

Databricks visualization. Run in Databricks to view.

### Top 5 ordered items