# Loading Raw Data


In [0]:
spark.sql("USE CATALOG workspace")
spark.sql("USE SCHEMA amazon_project")  
df_raw = spark.table("amazon_sales_raw")  

df_raw.printSchema()
df_raw.show(5)
df_raw.columns


root
 |-- Order_ID: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Product_Name: string (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- Unit_Price_INR: double (nullable = true)
 |-- Total_Sales_INR: double (nullable = true)
 |-- Payment_Method: string (nullable = true)
 |-- Delivery_Status: string (nullable = true)
 |-- Review_Rating: long (nullable = true)
 |-- Review_Text: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)

+---------+----------+-----------+----------------+------------+--------+--------------+---------------+----------------+---------------+-------------+------------------+---------+-------+
| Order_ID|      Date|Customer_ID|Product_Category|Product_Name|Quantity|Unit_Price_INR|Total_Sales_INR|  Payment_Method|Delivery_Status|Review_Rating|       Review_Text|    State|Country|
+---------+----------

['Order_ID',
 'Date',
 'Customer_ID',
 'Product_Category',
 'Product_Name',
 'Quantity',
 'Unit_Price_INR',
 'Total_Sales_INR',
 'Payment_Method',
 'Delivery_Status',
 'Review_Rating',
 'Review_Text',
 'State',
 'Country']

#Cleaning & Feature Engineering
## Basic selection & type casting

In [0]:
from pyspark.sql import functions as F

df = df_raw.select(
    "Order_ID", "Date", "Customer_ID", "Product_Category", "Product_Name",
    "Quantity", "Unit_Price_INR", "Total_Sales_INR",
    "Payment_Method", "Delivery_Status",
    "Review_Rating", "Review_Text",
    "State", "Country"
)

# Cast numeric + date types
df = (
    df.withColumn("Quantity", F.col("Quantity").cast("int"))
      .withColumn("Unit_Price_INR", F.col("Unit_Price_INR").cast("double"))
      .withColumn("Total_Sales_INR", F.col("Total_Sales_INR").cast("double"))
      .withColumn("Review_Rating", F.col("Review_Rating").cast("double"))
      .withColumn("Date", F.to_date(F.col("Date")))  # assuming yyyy-MM-dd or similar
)

# Drop rows with critical missing values
df = df.dropna(subset=["Order_ID", "Date", "Quantity", "Unit_Price_INR"])


##Fix Total_Sales & create engineered columns 

Recompute Total_Sales_INR = Quantity * Unit_Price_INR when missing or wrong

Create Order_Year, Order_Month, Order_DayOfWeek

Is_Delivered flag

Review_Sentiment (Positive / Neutral / Negative)

Review_Text_Length

drop rows where Total_Sales_INR ended up null

In [0]:
# Recalculate total sales when null or zero
df = df.withColumn(
    "Total_Sales_INR",
    F.when(
        (F.col("Total_Sales_INR").isNull()) | (F.col("Total_Sales_INR") == 0),
        F.col("Quantity") * F.col("Unit_Price_INR")
    ).otherwise(F.col("Total_Sales_INR"))
)

# Date-based fields
df = df.withColumn("Order_Year", F.year("Date"))
df = df.withColumn("Order_Month", F.date_format("Date", "yyyy-MM"))
df = df.withColumn("Order_DayOfWeek", F.date_format("Date", "E"))  # Mon, Tue, ...

# Delivery status flags
df = df.withColumn(
    "Is_Delivered",
    F.when(F.lower("Delivery_Status").like("%delivered%"), 1).otherwise(0)
)

# Rating-based sentiment (simple)
df = df.withColumn(
    "Review_Sentiment",
    F.when(F.col("Review_Rating") >= 4, "Positive")
     .when(F.col("Review_Rating") <= 2, "Negative")
     .otherwise("Neutral")
)

# Review text length (for later analysis)
df = df.withColumn(
    "Review_Text_Length",
    F.length(F.col("Review_Text"))
)

# drop rows where Total_Sales_INR ended up null
df = df.dropna(subset=["Total_Sales_INR"])
df.show(5)
df.printSchema() 

+---------+----------+-----------+----------------+------------+--------+--------------+---------------+----------------+---------------+-------------+------------------+---------+-------+----------+-----------+---------------+------------+----------------+------------------+
| Order_ID|      Date|Customer_ID|Product_Category|Product_Name|Quantity|Unit_Price_INR|Total_Sales_INR|  Payment_Method|Delivery_Status|Review_Rating|       Review_Text|    State|Country|Order_Year|Order_Month|Order_DayOfWeek|Is_Delivered|Review_Sentiment|Review_Text_Length|
+---------+----------+-----------+----------------+------------+--------+--------------+---------------+----------------+---------------+-------------+------------------+---------+-------+----------+-----------+---------------+------------+----------------+------------------+
|ORD100000|2025-01-25|   CUST2796|  Home & Kitchen|Cookware Set|       2|      25574.41|       51148.82|     Credit Card|       Returned|          1.0|    Waste of money

# Save Cleaned Data

In [0]:
spark.sql("USE CATALOG workspace")
spark.sql("USE SCHEMA amazon_project")

df.write.format("delta").mode("overwrite").saveAsTable(
    "workspace.amazon_project.amazon_sales_clean"
)

print("Saved cleaned table as workspace.amazon_project.amazon_sales_clean")


Saved cleaned table as workspace.amazon_project.amazon_sales_clean


# Reading Clean Data

In [0]:
df_clean = spark.table("workspace.amazon_project.amazon_sales_clean")
df_clean.show(5)
df_clean.printSchema()

+---------+----------+-----------+----------------+------------+--------+--------------+---------------+----------------+---------------+-------------+------------------+---------+-------+----------+-----------+---------------+------------+----------------+------------------+
| Order_ID|      Date|Customer_ID|Product_Category|Product_Name|Quantity|Unit_Price_INR|Total_Sales_INR|  Payment_Method|Delivery_Status|Review_Rating|       Review_Text|    State|Country|Order_Year|Order_Month|Order_DayOfWeek|Is_Delivered|Review_Sentiment|Review_Text_Length|
+---------+----------+-----------+----------------+------------+--------+--------------+---------------+----------------+---------------+-------------+------------------+---------+-------+----------+-----------+---------------+------------+----------------+------------------+
|ORD100000|2025-01-25|   CUST2796|  Home & Kitchen|Cookware Set|       2|      25574.41|       51148.82|     Credit Card|       Returned|          1.0|    Waste of money

# Notebook Analysis
## Basic sales stats

In [0]:
df_clean.select(
    "Quantity", "Unit_Price_INR", "Total_Sales_INR", "Review_Rating"
).describe().show()


+-------+------------------+------------------+-----------------+------------------+
|summary|          Quantity|    Unit_Price_INR|  Total_Sales_INR|     Review_Rating|
+-------+------------------+------------------+-----------------+------------------+
|  count|             15000|             15000|            15000|             15000|
|   mean|2.9846666666666666| 24955.31371533341|74544.12023333332|3.0401333333333334|
| stddev|1.4228257744483424|14401.316925461231|59369.65415454544|1.4110476200024644|
|    min|                 1|            202.57|           204.05|               1.0|
|    max|                 5|          49994.43|         249955.5|               5.0|
+-------+------------------+------------------+-----------------+------------------+



## Sales by Product Category

In [0]:
sales_by_cat = (
    df_clean.groupBy("Product_Category")
            .agg(
                F.round(F.sum("Total_Sales_INR"), 2).alias("total_revenue"),
                F.countDistinct("Order_ID").alias("num_orders")
            )
            .orderBy(F.col("total_revenue").desc())
)

display(sales_by_cat)


Product_Category,total_revenue,num_orders
Beauty,227489624.68,2997
Electronics,226564923.33,3036
Books,224999226.56,3035
Clothing,222409335.83,3022
Home & Kitchen,216698693.1,2910


## Sales by State (geographic flavor)

In [0]:
sales_by_state = (
    df_clean.groupBy("State")
            .agg(
                F.round(F.sum("Total_Sales_INR"), 2).alias("total_revenue"),
                F.countDistinct("Order_ID").alias("num_orders")
            )
            .orderBy(F.col("total_revenue").desc())
)

display(sales_by_state)


State,total_revenue,num_orders
Sikkim,43113469.51,596
Rajasthan,42906175.08,568
Chhattisgarh,42857545.27,556
Meghalaya,42773152.96,559
Tamil Nadu,41967968.99,567
Uttar Pradesh,41690917.07,551
Bihar,41669240.44,549
West Bengal,41195932.45,528
Tripura,41103376.81,540
Odisha,40924381.38,542


## Overall notebook chart: Monthly Revenue Trend

In [0]:
df_with_month = df_clean.withColumn(
    "Order_Month_Date",
    F.date_trunc("month", F.col("Date"))
).withColumn(
    "Order_Month_Label",
    F.date_format(F.col("Order_Month_Date"), "yyyy-MM")
)

monthly_revenue = (
    df_with_month.groupBy("Order_Month_Date", "Order_Month_Label")
        .agg(
            F.round(F.sum("Total_Sales_INR"), 2).alias("total_revenue"),
            F.countDistinct("Order_ID").alias("num_orders")
        )
        .orderBy("Order_Month_Date")
)

display(monthly_revenue)



Order_Month_Date,Order_Month_Label,total_revenue,num_orders
2025-01-01T00:00:00.000Z,2025-01,92051785.03,1276
2025-02-01T00:00:00.000Z,2025-02,84995760.62,1183
2025-03-01T00:00:00.000Z,2025-03,93064349.39,1226
2025-04-01T00:00:00.000Z,2025-04,91388990.14,1203
2025-05-01T00:00:00.000Z,2025-05,97195848.48,1267
2025-06-01T00:00:00.000Z,2025-06,89730685.72,1225
2025-07-01T00:00:00.000Z,2025-07,95176904.42,1306
2025-08-01T00:00:00.000Z,2025-08,97576563.09,1312
2025-09-01T00:00:00.000Z,2025-09,91225555.74,1248
2025-10-01T00:00:00.000Z,2025-10,93478344.05,1233


Databricks visualization. Run in Databricks to view.