<img src="https://www.marketing91.com/wp-content/uploads/2018/10/Sales-Analysis-2.jpg" alt="analysis" width="500">


### Import Libraries

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType 

### Define schema of data

In [0]:
# Explicit schema for the sales CSV; every column is nullable.
schema = (
    StructType()
    .add("product_id", IntegerType(), True)    # join key shared with the menu data
    .add("customer_id", StringType(), True)    # customer identifier kept as text
    .add("order_date", DateType(), True)       # typed as a date so month/quarter/year can be derived
    .add("location", StringType(), True)       # textual location value
    .add("source_order", StringType(), True)   # textual order source
)

### Print full schema object

In [0]:
# Print the sales schema in the form produced by its .json() method.
print(schema.json())

### Formatted output

In [0]:
# Emit one readable line per schema field: name, data type and nullability.
for field in schema.fields:
    field_summary = f"{field.name}: {field.dataType} (nullable={field.nullable})"
    print(field_summary)

### Read CSV with defined schema

In [0]:
# Load the sales CSV with the explicit schema (no type inference).
# The "header" option is set to "true" for this file.
sales_reader = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(schema)
)
sales_df = sales_reader.load("/Volumes/workspace/default/sales_data_visualpath/sales.csv")

### Print dataframe

In [0]:
# Render the freshly loaded sales DataFrame with the notebook's display() helper.

display(sales_df)

In [0]:


from pyspark.sql.functions import month, quarter, year

# Derive calendar parts from 'order_date' and attach them as new columns.
# Re-running this cell is safe: withColumn replaces a column of the same name.
sales_df = (
    sales_df
    .withColumn("order_month", month("order_date"))
    .withColumn("order_quarter", quarter("order_date"))
    .withColumn("order_year", year("order_date"))
)
 

In [0]:
# Show sales_df again, now including order_month, order_quarter and order_year.
display(sales_df)

In [0]:
# Keep only orders from the first quarter of 2023 and print them.
is_q1_2023 = (sales_df.order_year == 2023) & (sales_df.order_quarter == 1)
sales_df.filter(is_q1_2023).show()

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
 
# Schema for the menu CSV. This rebinds the name `schema`, which previously
# held the sales schema; the cells that follow read the menu through it.
# NOTE(review): price is declared as a string here while later cells cast it
# to int before summing — confirm whether a numeric type is intended.
schema = (
    StructType()
    .add("product_id", IntegerType(), True)
    .add("product_name", StringType(), True)
    .add("price", StringType(), True)
)
 

 

In [0]:
# `schema` now refers to the menu schema defined above, so this prints that schema's JSON form.
print(schema.json())

In [0]:
# Read the file into a DataFrame

# Load the menu CSV with the explicit menu schema; the "header" option is "true".
menu_reader = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(schema)
)
menu_df = menu_reader.load("/Volumes/workspace/default/sales_data_visualpath/menu.csv")

# Render the menu DataFrame.
display(menu_df)

In [0]:
# Number of menu rows for each distinct price value.
price_counts = menu_df.groupBy("price").count()
price_counts.show()

In [0]:
# List menu items from most to least expensive.
# price is declared as StringType in the menu schema, so sorting the raw column
# compares text ("9" would rank above "10"). Cast to int for a numeric sort,
# matching the .cast('int') used by the aggregation cells in this notebook.
menu_df.orderBy(menu_df.price.cast("int").desc()).show()

### Total Amount Spent by Each Customer

In [0]:
# Attach menu prices to each sale, then total the price per customer.
# NOTE(review): price is a string column in the menu schema, so this sum relies
# on Spark converting it to a number implicitly — confirm.
sales_with_menu = sales_df.join(menu_df, on='product_id')
total_amount_spent = (
    sales_with_menu
    .groupBy('customer_id')
    .agg({'price': 'sum'})
    .orderBy('customer_id')
)
display(total_amount_spent)

### Total Amount Spent by Each Product Category

In [0]:
# Attach menu prices to each sale, then total the price per product name.
# Note: this rebinds total_amount_spent, now keyed by product_name.
sales_with_menu = sales_df.join(menu_df, on='product_id')
total_amount_spent = (
    sales_with_menu
    .groupBy('product_name')
    .agg({'price': 'sum'})
    .orderBy('product_name')
)
display(total_amount_spent)

Databricks visualization. Run in Databricks to view.

### Total Amount Spent by Each Customer (price cast to integer)

In [0]:
# Caution: this import rebinds the name `sum` to the Spark aggregate for the
# rest of the notebook; later cells use `sum` and `col` from here.
from pyspark.sql.functions import col, sum

# Total spent per customer, with price explicitly cast to int before summing.
price_as_int = col('price').cast('int')
total_amount_spent = (
    sales_df
    .join(menu_df, on='product_id')
    .groupBy('customer_id')
    .agg(sum(price_as_int).alias('total_spent'))
    .orderBy('customer_id')
)

display(total_amount_spent)

 

### Total Amount of Sales in Each Month

In [0]:
# Sales total for every (year, month) pair, in chronological order.
# Uses `sum` and `col` imported from pyspark.sql.functions in an earlier cell.
sales_with_menu = sales_df.join(menu_df, on='product_id')
monthly_sales = (
    sales_with_menu
    .groupBy('order_year', 'order_month')
    .agg(sum(col('price').cast('int')).alias('total_sales'))
    .orderBy('order_year', 'order_month')
)
display(monthly_sales)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

## Total Sales by Year

In [0]:
# Sales total for each order year, oldest year first.
# Uses `sum` and `col` imported from pyspark.sql.functions in an earlier cell.
sales_with_menu = sales_df.join(menu_df, on='product_id')
yearly_sales = (
    sales_with_menu
    .groupBy('order_year')
    .agg(sum(col('price').cast('int')).alias('year_wise_total_sales'))
    .orderBy('order_year')
)
display(yearly_sales)

### Total Number of Orders by Product Name (Category)

In [0]:
# Number of order rows per product name, most-ordered first.
sales_with_menu = sales_df.join(menu_df, on='product_id')
orders_per_product = sales_with_menu.groupBy('product_name').count()
orders_by_category = orders_per_product.orderBy('count', ascending=False)
display(orders_by_category)

Databricks visualization. Run in Databricks to view.

### How Many Times Each Product Was Purchased

In [0]:
from pyspark.sql.functions import count

# Purchase count for each (product_id, product_name), highest count first.
sales_with_menu = sales_df.join(menu_df, on='product_id')
purchases_per_product = (
    sales_with_menu
    .groupBy('product_id', 'product_name')
    .agg(count('product_id').alias('product_count'))
)
most_df = purchases_per_product.orderBy('product_count', ascending=False)

display(most_df)

Databricks visualization. Run in Databricks to view.

## Top 5 Ordered Items

In [0]:
# The five product names with the most order rows.
sales_with_menu = sales_df.join(menu_df, on='product_id')
ranked_products = (
    sales_with_menu
    .groupBy('product_name')
    .count()
    .orderBy('count', ascending=False)
)
top_5_items = ranked_products.limit(5)
display(top_5_items)

Databricks data profile. Run in Databricks to view.

### All time top selling product

In [0]:
# Order count per product name across the whole dataset, highest first.
# The dict-style aggregate yields a column named 'count(product_id)', which is
# renamed to 'total_orders' before sorting.
sales_with_menu = sales_df.join(menu_df, on='product_id')
order_counts = (
    sales_with_menu
    .groupBy('product_name')
    .agg({'product_id': 'count'})
    .withColumnRenamed('count(product_id)', 'total_orders')
)
top_ordered = order_counts.orderBy('total_orders', ascending=False)
display(top_ordered)

Databricks visualization. Run in Databricks to view.

### find the top-selling product per week

### 

In [0]:
from pyspark.sql.functions import count, weekofyear, col

# Count purchases of each product per week, listing the best seller of each
# week first.
# The week number alone repeats every year, so order_year (added to sales_df by
# the earlier date-parts cell) is part of the grouping; otherwise the same week
# number from different years would be merged into one bucket.
# NOTE(review): weekofyear numbering may assign days near a year boundary to a
# week of the adjacent year, while order_year is the calendar year — confirm
# this is acceptable for the data.
weekly_sales_df = (
    sales_df.join(menu_df, 'product_id')
            .withColumn("week", weekofyear(col("order_date")))
            .groupBy("order_year", "week", "product_id", "product_name")
            .agg(count("product_id").alias("product_count"))
            .orderBy("order_year", "week", "product_count", ascending=[True, True, False])
)

# Display per-week product counts (top seller first within each week)
display(weekly_sales_df)


In [0]:
# Re-display sales_df (including the derived order_month/order_quarter/order_year columns).
display(sales_df)

### Number of Orders Placed (Customer Visit Frequency)

In [0]:
# Number of sales rows per customer, most frequent customer first.
orders_per_customer = sales_df.groupBy('customer_id').count()
customer_frequency = orders_per_customer.orderBy('count', ascending=False)
display(customer_frequency)

Databricks visualization. Run in Databricks to view.

### Total sales by each country

In [0]:
# Summed price per 'location' value, largest total first.
# The dict-style aggregate names its output column 'sum(price)'.
# NOTE(review): price is a string column in the menu schema, so this sum relies
# on Spark converting it to a number implicitly — confirm.
sales_with_menu = sales_df.join(menu_df, on='product_id')
price_by_location = sales_with_menu.groupBy('location').agg({'price': 'sum'})
sales_by_country = price_by_location.orderBy('sum(price)', ascending=False)
display(sales_by_country)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

### Total sales by source order

In [0]:
# Summed price per 'source_order' value, largest total first.
# The dict-style aggregate names its output column 'sum(price)'.
# NOTE(review): price is a string column in the menu schema, so this sum relies
# on Spark converting it to a number implicitly — confirm.
sales_with_menu = sales_df.join(menu_df, on='product_id')
price_by_source = sales_with_menu.groupBy('source_order').agg({'price': 'sum'})
sales_by_source = price_by_source.orderBy('sum(price)', ascending=False)
display(sales_by_source)

Databricks visualization. Run in Databricks to view.

Databricks data profile. Run in Databricks to view.