In [44]:
df = spark.read.csv("Amazon Sale Report(in).csv", header=True, inferSchema=True)
print(f"Loaded {df.count()} rows")
df.show(5)


Loaded 128975 rows
+-----+-------------------+---------+--------------------+----------+--------------+------------------+-------+---------------+-------------+----+----------+--------------+---+--------+------+-----------+-----------+----------------+------------+--------------------+-----+------------+----+
|index|           Order ID|     Date|              Status|Fulfilment|Sales Channel |ship-service-level|  Style|            SKU|     Category|Size|      ASIN|Courier Status|Qty|currency|Amount|  ship-city| ship-state|ship-postal-code|ship-country|       promotion-ids|  B2B|fulfilled-by|_c23|
+-----+-------------------+---------+--------------------+----------+--------------+------------------+-------+---------------+-------------+----+----------+--------------+---+--------+------+-----------+-----------+----------------+------------+--------------------+-----+------------+----+
|    0|405-8078784-5731545|4/30/2022|           Cancelled|  Merchant|     Amazon.in|          Standard| S

In [53]:
import os

# Set Hadoop home
os.environ['HADOOP_HOME'] = r'C:\hadoop'
os.environ['PATH'] += r';C:\hadoop\bin'
# Restart Spark with Hadoop configured
spark = SparkSession.builder \
    .appName("Sales") \
    .master("local[2]") \
    .getOrCreate()

In [46]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .appName("Sales_ETL") \
    .master("local[2]") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

print("✓ Spark started")


✓ Spark started


In [49]:
df_clean = df.dropDuplicates(['Order ID']) \
    .fillna({'Amount': 0, 'Qty': 0, 'ship-state': 'Unknown'}) \
    .dropna(subset=['Order ID', 'Date'])

print(f"Clean rows: {df_clean.count()}")


Clean rows: 68930


In [48]:
import pandas as pd #cleaning first in pandas
data = pd.read_csv("Amazon Sale Report(in).csv")
data['Date'] = pd.to_datetime(data['Date'], errors='coerce')  
data['YearMonth'] = data['Date'].dt.to_period('M').astype(str)


data.to_csv("Amazon_Sales_Clean.csv", index=False)

#loading in pyspark
df = spark.read.csv("Amazon_Sales_Clean.csv", header=True, inferSchema=True)  #saving to csv (imitates parquet)
df.show(5)

  data = pd.read_csv("Amazon Sale Report(in).csv")


+-----+-------------------+----------+--------------------+----------+--------------+------------------+-------+---------------+-------------+----+----------+--------------+---+--------+------+-----------+-----------+----------------+------------+--------------------+-----+------------+-----------+---------+
|index|           Order ID|      Date|              Status|Fulfilment|Sales Channel |ship-service-level|  Style|            SKU|     Category|Size|      ASIN|Courier Status|Qty|currency|Amount|  ship-city| ship-state|ship-postal-code|ship-country|       promotion-ids|  B2B|fulfilled-by|Unnamed: 23|YearMonth|
+-----+-------------------+----------+--------------------+----------+--------------+------------------+-------+---------------+-------------+----+----------+--------------+---+--------+------+-----------+-----------+----------------+------------+--------------------+-----+------------+-----------+---------+
|    0|405-8078784-5731545|2022-04-30|           Cancelled|  Merchant|

In [55]:
monthly_revenue = df.groupBy('YearMonth') \
    .agg(_sum('Amount').alias('Revenue')) \
    .orderBy(col('Revenue').desc())

monthly_revenue.toPandas().to_csv('output/monthly_revenue.csv', index=False)

monthly_revenue.show()

+---------+--------------------+
|YearMonth|             Revenue|
+---------+--------------------+
|      NaT| 3.347653045999995E7|
|  2022-04|1.7516195560000014E7|
|  2022-05|1.4987650069999937E7|
|  2022-06|1.2510618359999996E7|
|  2022-03|  101683.84999999998|
+---------+--------------------+



In [57]:
region_sales = df_clean.groupBy('ship-state') \
    .agg(_sum('Amount').alias('Revenue'),
         count('Order ID').alias('Orders'),
         avg('Amount').alias('Avg_Order_Value')) \
    .orderBy(col('Revenue').desc())

region_sales.toPandas().to_csv('output/region_sales.csv', index=False)  #saving to csv (imitates parquet)

print("---TOP 10 STATES BY REVENUE---")
region_sales.show(10)


---TOP 10 STATES BY REVENUE---
+--------------+------------------+------+-----------------+
|    ship-state|           Revenue|Orders|  Avg_Order_Value|
+--------------+------------------+------+-----------------+
|   MAHARASHTRA| 7052615.170000002| 11776|598.8973479959241|
|     KARNATAKA| 5621026.809999999|  9226|609.2593550834597|
|     TELANGANA|3724120.5600000005|  6110|609.5123666121114|
| UTTAR PRADESH|3628635.6500000022|  5698|636.8261934011938|
|    TAMIL NADU| 3422890.340000001|  5990|571.4341135225377|
|         DELHI|        2259015.99|  3589|629.4276929506827|
|        KERALA|2104301.7700000005|  3588|586.4832134894093|
|   WEST BENGAL|1881120.1199999992|  3129|601.1889165867686|
|ANDHRA PRADESH|1699258.1999999997|  2859|594.3540398740818|
|       HARYANA|1587312.8399999994|  2465|643.9403002028395|
+--------------+------------------+------+-----------------+
only showing top 10 rows


In [66]:
# Overall aov
aov_overall = df_clean.agg(
    avg('Amount').alias('Avg_Order_Value'),
    _sum('Amount').alias('Total_Revenue'),
    count('Order ID').alias('Total_Orders')
)

# aov by Category
aov_category = df_clean.groupBy('Category') \
    .agg(avg('Amount').alias('Avg_Order_Value'),
         count('Order ID').alias('Orders')) \
    .orderBy(col('Avg_Order_Value').desc())

aov_category.toPandas().to_csv('output/avg_order_value.csv', index=False)  #saving to csv (imitates parquet)
aov_overall.toPandas().to_csv('output/avg_order_value.csv', index=False)   #saving to csv (imitates parquet)

print("=== AVERAGE ORDER VALUE (OVERALL) ===")
aov_overall.show()
print("\n=== AVERAGE ORDER VALUE BY CATEGORY ===")
aov_category.show()


=== AVERAGE ORDER VALUE (OVERALL) ===
+-----------------+--------------------+------------+
|  Avg_Order_Value|       Total_Revenue|Total_Orders|
+-----------------+--------------------+------------+
|611.1611636442756|4.2127339009999916E7|       68930|
+-----------------+--------------------+------------+


=== AVERAGE ORDER VALUE BY CATEGORY ===
+-------------+------------------+------+
|     Category|   Avg_Order_Value|Orders|
+-------------+------------------+------+
|          Set| 780.5649311573816| 26655|
|        Saree| 750.2346341463415|    82|
|Western Dress| 718.4618957399102|  8920|
| Ethnic Dress|        689.888125|   608|
|          Top| 500.5786549509972|  5918|
|       Blouse| 476.2117647058825|   544|
|        kurta|428.69816947059724| 25916|
|       Bottom| 348.9475524475524|   286|
|      Dupatta|             305.0|     1|
+-------------+------------------+------+



In [68]:
category_performance = df_clean.groupBy('Category') \
    .agg(_sum('Amount').alias('Revenue'),
         count('Order ID').alias('Orders'),
         _sum('Qty').alias('Total_Qty_Sold'),
         avg('Amount').alias('Avg_Price')) \
    .orderBy(col('Revenue').desc())


category_performance.toPandas().to_csv('output/category_performance.csv', index=False) #saving to csv (imitates parquet)
print("=== CATEGORY PERFORMANCE ===")
category_performance.show()



=== CATEGORY PERFORMANCE ===
+-------------+--------------------+------+--------------+------------------+
|     Category|             Revenue|Orders|Total_Qty_Sold|         Avg_Price|
+-------------+--------------------+------+--------------+------------------+
|          Set|2.0805958240000006E7| 26655|         23941| 780.5649311573816|
|        kurta|1.1110141759999998E7| 25916|         23236|428.69816947059724|
|Western Dress|   6408680.109999999|  8920|          8002| 718.4618957399102|
|          Top|  2962424.4800000014|  5918|          5509| 500.5786549509972|
| Ethnic Dress|           419451.98|   608|           553|        689.888125|
|       Blouse|  259059.20000000007|   544|           508| 476.2117647058825|
|       Bottom|             99799.0|   286|           255| 348.9475524475524|
|        Saree|  61519.240000000005|    82|            73| 750.2346341463415|
|      Dupatta|               305.0|     1|             1|             305.0|
+-------------+--------------------