In [0]:
# Echo the notebook's `spark` session object so its type appears in the cell output.
spark

<pyspark.sql.connect.session.SparkSession at 0xffaa5e230e60>

In [0]:
# Load every raw order part-file matched by the glob; the first line of each
# CSV is treated as the header row.
df = spark.read.csv(
    "/Volumes/workspace/default/ecommerce_raw/orders_part_*.csv",
    header=True,
)

# Quick sanity checks: a few sample rows, the column types, and the row count.
df.show(n=5)
df.printSchema()
df.count()


+--------+-----------+----------+-------------------+--------+----------+--------+------------+------------+------------+
|order_id|customer_id|product_id|    order_timestamp|quantity|unit_price|discount|payment_type|order_status|        city|
+--------+-----------+----------+-------------------+--------+----------+--------+------------+------------+------------+
|    O2_0|    C411959|     P3370|2024-12-13 05:33:57|       3|    438.48|    0.27|      WALLET|      PLACED|Stewartburgh|
|    O2_1|    C249437|     P1511|2025-08-21 04:48:05|       5|   1092.23|    0.31|      WALLET|   CANCELLED|  Smithmouth|
|    O2_2|     C23425|     P6277|2024-01-25 04:53:05|       3|   3575.17|    0.26|      WALLET|        PAID|Jonathanfurt|
|    O2_3|    C435067|    P14319|2024-09-30 13:49:44|       5|   4105.65|    0.31|         COD|   CANCELLED|Barrychester|
|    O2_4|    C105996|    P10067|2024-07-23 13:28:58|       1|   4352.35|    0.17|      WALLET|        PAID|   Amberside|
+--------+-----------+--

1000000

In [0]:
from pyspark.sql.functions import col, to_timestamp

# Give the timestamp and numeric columns proper types in a single projection.
# Each expression only reads the column it replaces, so the four casts are
# independent of one another.
df_clean = df.withColumns(
    {
        "order_timestamp": to_timestamp("order_timestamp"),
        "quantity": col("quantity").cast("int"),
        "unit_price": col("unit_price").cast("double"),
        "discount": col("discount").cast("double"),
    }
)


In [0]:
# Print the schema of `df_clean` to check the column types after casting.
df_clean.printSchema()


root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_timestamp: timestamp (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- discount: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- city: string (nullable = true)



In [0]:
# Re-read the raw order CSVs (header row enabled) into `df`.
df = (
    spark.read
    .option("header", True)
    .csv("/Volumes/workspace/default/ecommerce_raw/orders_part_*.csv")
)


In [0]:
from pyspark.sql.functions import col, to_timestamp

# Replace the timestamp and numeric columns on `df` with typed versions.
df = (
    df
    .withColumn("order_timestamp", to_timestamp(col("order_timestamp")))
    .withColumn("quantity", col("quantity").cast("int"))
    .withColumn("unit_price", col("unit_price").cast("double"))
    .withColumn("discount", col("discount").cast("double"))
)


In [0]:
# Redistribute the rows of `df` across 8 partitions.
df = df.repartition(8)


In [0]:
# NOTE(review): this repeats the repartition(8) call from the previous cell and
# looks redundant — confirm and remove one of the two.
df = df.repartition(8)



In [0]:
from pyspark.sql.functions import col, round

# Net revenue per order line: quantity x unit price x (1 - discount),
# rounded to 2 decimal places. `round` here is pyspark's column function.
df = df.withColumn(
    "revenue",
    round(
        col("quantity") * col("unit_price") * (1 - col("discount")),
        2,
    ),
)


In [0]:
%sql


-- List the volumes that exist in the workspace.default schema.
SHOW VOLUMES IN workspace.default;



database,volume_name
default,ecommerce_processed
default,ecommerce_raw


In [0]:
# Persist the typed, revenue-enriched orders in Delta format, replacing any
# data already at this path.
df.write.save(
    "/Volumes/workspace/default/ecommerce_processed/orders_delta",
    format="delta",
    mode="overwrite",
)


In [0]:
# Read the Delta data back and count its rows to verify the write.
(
    spark.read
    .load(
        "/Volumes/workspace/default/ecommerce_processed/orders_delta",
        format="delta",
    )
    .count()
)


1000000

In [0]:
from pyspark.sql.functions import countDistinct

# Customer dimension: one row per customer_id together with a city.
# NOTE(review): when a customer appears with several cities, which one survives
# the de-duplication is presumably arbitrary — confirm this is acceptable.
dim_customer = df.select(["customer_id", "city"]).drop_duplicates(["customer_id"])

dim_customer.show(n=5)


+-----------+----------------+
|customer_id|            city|
+-----------+----------------+
|    C101180|   Parkerborough|
|    C101668|     Port Robert|
|    C102471|West Markchester|
|    C102640|   Jonathanhaven|
|     C10348|  Port Davidfurt|
+-----------+----------------+
only showing top 5 rows


In [0]:
# Product dimension: one row per product_id together with a unit price.
# NOTE(review): when a product appears with several prices, which one survives
# the de-duplication is presumably arbitrary — confirm this is acceptable.
dim_product = df.select(["product_id", "unit_price"]).drop_duplicates(["product_id"])

dim_product.show(n=5)


+----------+----------+
|product_id|unit_price|
+----------+----------+
|    P10027|    597.68|
|    P10058|    153.69|
|    P10061|   1056.59|
|     P1007|   3651.15|
|    P10116|   2418.38|
+----------+----------+
only showing top 5 rows


In [0]:
# Save the customer dimension in Delta format, replacing any existing data.
dim_customer.write.save(
    "/Volumes/workspace/default/ecommerce_processed/dim_customer",
    format="delta",
    mode="overwrite",
)


In [0]:
# Save the product dimension in Delta format, replacing any existing data.
dim_product.write.save(
    "/Volumes/workspace/default/ecommerce_processed/dim_product",
    format="delta",
    mode="overwrite",
)


In [0]:
# Re-bind both dimensions to the copies stored in Delta format.
dim_customer = spark.read.load(
    "/Volumes/workspace/default/ecommerce_processed/dim_customer",
    format="delta",
)

dim_product = spark.read.load(
    "/Volumes/workspace/default/ecommerce_processed/dim_product",
    format="delta",
)


In [0]:
from pyspark.sql.functions import broadcast

# `df` already has `city` and `unit_price` columns, and so do the dimensions
# loaded from storage. Joining them unchanged would leave the result with two
# columns of each name, which breaks later column lookups. Strip / rename the
# overlapping columns on the dimension side first. Both drop() and
# withColumnRenamed() are no-ops when the named column is absent, so this is
# safe even if the dimensions were already reshaped.
fact_enriched = (
    df.join(broadcast(dim_customer.drop("city")), on="customer_id", how="left")
      .join(
          broadcast(dim_product.withColumnRenamed("unit_price", "product_price")),
          on="product_id",
          how="left",
      )
)


In [0]:
from pyspark.sql.functions import round, col, broadcast

# Recreate the dimensions so they do not repeat column names that already exist
# on `df`. `df` has its own `city` and `unit_price`; a dimension that carries
# either name makes the joined frame hold the column twice, and the
# col("unit_price") lookup in the revenue expression below then becomes
# ambiguous.
dim_customer = (
    df.select("customer_id")
      .dropDuplicates(["customer_id"])
)

# The dimension price is exposed as `product_price` to keep it distinct from
# the order-level `unit_price`.
dim_product = (
    df.select("product_id", col("unit_price").alias("product_price"))
      .dropDuplicates(["product_id"])
)

# Join fact + dimensions (dimensions are broadcast to every executor).
fact_enriched = (
    df.join(broadcast(dim_customer), "customer_id", "left")
      .join(broadcast(dim_product), "product_id", "left")
)

# Create gold fact table: revenue = quantity x unit price x (1 - discount),
# rounded to 2 decimals, computed from the order's own `unit_price`.
fact_gold = fact_enriched.withColumn(
    "revenue",
    round(col("quantity") * col("unit_price") * (1 - col("discount")), 2)
)


In [0]:
from pyspark.sql.functions import col

# Product dimension with the price exposed as `product_price`, so it does not
# share a name with the `unit_price` column on the orders frame.
dim_product = (
    df.select("product_id", "unit_price")
    .withColumnRenamed("unit_price", "product_price")
    .dropDuplicates(["product_id"])
)



In [0]:
from pyspark.sql.functions import broadcast, round

# In a top-to-bottom run `dim_customer` still holds (customer_id, city) at this
# point, while `df` has its own `city`. Joining it unchanged would put two
# `city` columns on the result, so drop the overlapping column from the
# dimension side. drop() is a no-op if `city` is not present.
fact_enriched = (
    df.join(broadcast(dim_customer.drop("city")), "customer_id", "left")
      .join(broadcast(dim_product), "product_id", "left")
)

# Revenue = quantity x unit price x (1 - discount), rounded to 2 decimals.
fact_gold = fact_enriched.withColumn(
    "revenue",
    round(
        col("quantity") * col("unit_price") * (1 - col("discount")),
        2
    )
)


In [0]:
# Customer dimension reduced to the key only: the distinct customer_id values.
dim_customer = df.select("customer_id").distinct()



In [0]:
from pyspark.sql.functions import col

# One row per product_id, with the price column named `product_price`.
product_columns = ["product_id", col("unit_price").alias("product_price")]
dim_product = df.select(*product_columns).drop_duplicates(["product_id"])


In [0]:
from pyspark.sql.functions import broadcast, round, col

# Left-join the orders to both (broadcast) dimensions on their keys.
fact_enriched = df.join(
    broadcast(dim_customer), on="customer_id", how="left"
).join(
    broadcast(dim_product), on="product_id", how="left"
)

# Gold fact table: revenue = quantity x unit price x (1 - discount),
# rounded to 2 decimal places.
fact_gold = fact_enriched.withColumn(
    "revenue",
    round(
        col("quantity") * col("unit_price") * (1 - col("discount")),
        2,
    ),
)


In [0]:
# Save the gold fact table in Delta format, replacing any existing data.
fact_gold.write.save(
    "/Volumes/workspace/default/ecommerce_processed/fact_orders_gold",
    format="delta",
    mode="overwrite",
)



In [0]:
# Read the stored gold table back and print its schema.
(
    spark.read
    .load(
        "/Volumes/workspace/default/ecommerce_processed/fact_orders_gold",
        format="delta",
    )
    .printSchema()
)


root
 |-- product_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_timestamp: timestamp (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- discount: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- city: string (nullable = true)
 |-- revenue: double (nullable = true)
 |-- product_price: double (nullable = true)



In [0]:
# Print the schema of the gold fact table as stored on disk.
gold_path = "/Volumes/workspace/default/ecommerce_processed/fact_orders_gold"
spark.read.load(gold_path, format="delta").printSchema()



root
 |-- product_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_timestamp: timestamp (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- discount: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- city: string (nullable = true)
 |-- revenue: double (nullable = true)
 |-- product_price: double (nullable = true)



In [0]:
from pyspark.sql.functions import col, sum as sum_

# Keep only the two columns this aggregation needs.
fact_gold_clean = fact_gold.select(["city", "revenue"])

# Top 10 cities by summed revenue, highest first.
city_revenue = (
    fact_gold_clean
    .groupby("city")
    .agg(sum_("revenue").alias("total_revenue"))
    .orderBy("total_revenue", ascending=False)
    .limit(10)
)

display(city_revenue)

city,total_revenue
South Michael,5310540.81
North Michael,5299237.489999999
Port Michael,4900986.97
East Michael,4881552.56
Lake Michael,4811088.46
New Michael,4715508.929999999
West Michael,4563973.9
Smithmouth,3861938.789999999
West David,3694895.86
New James,3655193.06


In [0]:
# Re-bind `fact_gold` to the stored gold table.
fact_gold = spark.read.load(
    "/Volumes/workspace/default/ecommerce_processed/fact_orders_gold",
    format="delta",
)




In [0]:
from pyspark.sql.functions import col, sum, round

# Top 10 cities by total revenue (rounded to 2 decimals), highest first.
# `sum` and `round` are the pyspark column functions imported in this cell.
city_chart = (
    fact_gold
    .groupby("city")
    .agg(
        round(sum("revenue"), 2).alias("total_revenue"),
    )
    .orderBy("total_revenue", ascending=False)
    .limit(10)
)

display(city_chart)


city,total_revenue
South Michael,5310540.81
North Michael,5299237.49
Port Michael,4900986.97
East Michael,4881552.56
Lake Michael,4811088.46
New Michael,4715508.93
West Michael,4563973.9
Smithmouth,3861938.79
West David,3694895.86
New James,3655193.06


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import col, sum, count, round

# Order count and total revenue (2 decimals) for each order status.
orders_per_status = count("*").alias("orders")
revenue_per_status = round(sum("revenue"), 2).alias("revenue")

cancel_analytics = fact_gold.groupby("order_status").agg(
    orders_per_status,
    revenue_per_status,
)

cancel_analytics.show()


+------------+------+---------------+
|order_status|orders|        revenue|
+------------+------+---------------+
|   DELIVERED|199489|1.22012689956E9|
|   CANCELLED|200177| 1.2262575888E9|
|      PLACED|199897|1.22312289859E9|
|     SHIPPED|199402| 1.2223215362E9|
|        PAID|201035|1.22916037433E9|
+------------+------+---------------+



In [0]:
# Revenue attached to CANCELLED orders, summed per city, largest first.
cancel_city = (
    fact_gold
    .where(col("order_status") == "CANCELLED")
    .groupby("city")
    .agg(
        round(sum("revenue"), 2).alias("lost_revenue"),
    )
    .orderBy("lost_revenue", ascending=False)
)

cancel_city.show(n=10)


+-------------+------------+
|         city|lost_revenue|
+-------------+------------+
|North Michael|  1050297.43|
|  New Michael|  1029965.43|
| Lake Michael|  1012106.55|
|South Michael|   960363.97|
| West Michael|   939200.53|
| East Michael|   937306.41|
|    Port John|   816694.91|
|   Port James|   812020.14|
| North Robert|   799782.95|
| Port Michael|    799092.6|
+-------------+------------+
only showing top 10 rows


In [0]:
from pyspark.sql.functions import when

# Bucket each order by discount: below 0.1 is "Low", below 0.25 is "Medium",
# everything else is "High". Then report order count, total revenue and average
# revenue per order for every bucket.
bucket_label = (
    when(col("discount") < 0.1, "Low")
    .when(col("discount") < 0.25, "Medium")
    .otherwise("High")
)

discount_analysis = (
    fact_gold
    .withColumn("discount_bucket", bucket_label)
    .groupby("discount_bucket")
    .agg(
        count("*").alias("orders"),
        round(sum("revenue"), 2).alias("total_revenue"),
        round(sum("revenue") / count("*"), 2).alias("avg_revenue_per_order"),
    )
)

discount_analysis.show()


+---------------+------+---------------+---------------------+
|discount_bucket|orders|  total_revenue|avg_revenue_per_order|
+---------------+------+---------------+---------------------+
|         Medium|376080|2.38859833199E9|               6351.3|
|           High|387276|2.00807022622E9|              5185.11|
|            Low|236644|1.72432073927E9|              7286.56|
+---------------+------+---------------+---------------------+



In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank

# Total revenue per customer, rounded to 2 decimals.
customer_rev = fact_gold.groupby("customer_id").agg(
    round(sum("revenue"), 2).alias("customer_revenue")
)

# Rank customers by revenue, highest first; dense_rank gives tied revenues the
# same rank with no gaps.
# NOTE(review): the window has no partitionBy, so the ranking presumably runs
# over all rows in a single partition — confirm that is acceptable at this size.
window = Window.orderBy(col("customer_revenue").desc())

top_customers = customer_rev.select(
    "*",
    dense_rank().over(window).alias("rank"),
)

top_customers.show(n=10)




+-----------+----------------+----+
|customer_id|customer_revenue|rank|
+-----------+----------------+----+
|     C34342|        127433.5|   1|
|    C125741|        99627.63|   2|
|    C313684|        98641.05|   3|
|    C301023|        94445.24|   4|
|    C277953|         92758.2|   5|
|    C367468|        90385.83|   6|
|      C4229|        88419.96|   7|
|    C442561|        87724.83|   8|
|    C154659|        87471.38|   9|
|    C387853|         86554.1|  10|
+-----------+----------------+----+
only showing top 10 rows




In [0]:
# Per-city order count, total revenue and revenue per order, sorted so the
# cities with the highest revenue per order come first.
city_efficiency = (
    fact_gold
    .groupby("city")
    .agg(
        count("*").alias("orders"),
        round(sum("revenue"), 2).alias("revenue"),
        round(sum("revenue") / count("*"), 2).alias("revenue_per_order"),
    )
    .orderBy("revenue_per_order", ascending=False)
)

city_efficiency.show(n=10)


+------------------+------+--------+-----------------+
|              city|orders| revenue|revenue_per_order|
+------------------+------+--------+-----------------+
|    South Rayhaven|     1|24757.05|         24757.05|
|         Malikside|     1| 24700.1|          24700.1|
|   East Arthurport|     1|24425.58|         24425.58|
|    South Rossland|     1|24374.66|         24374.66|
|     East Alectown|     1|24263.23|         24263.23|
|South Ricardoville|     1|24252.28|         24252.28|
|    East Leahhaven|     1|24126.25|         24126.25|
|         Dorisview|     1| 24071.1|          24071.1|
|   West Taylorstad|     1|23999.63|         23999.63|
|      West Donfort|     1|23943.94|         23943.94|
+------------------+------+--------+-----------------+
only showing top 10 rows


In [0]:
# Number of orders in each status, most frequent status first.
status_counts = fact_gold.groupby("order_status").count()
order_funnel = status_counts.orderBy("count", ascending=False)

order_funnel.show()


+------------+------+
|order_status| count|
+------------+------+
|        PAID|201035|
|   CANCELLED|200177|
|      PLACED|199897|
|   DELIVERED|199489|
|     SHIPPED|199402|
+------------+------+



In [0]:
# Per payment type: order count, total revenue and average order value,
# sorted by total revenue, highest first.
payment_analysis = (
    fact_gold
    .groupby("payment_type")
    .agg(
        count("*").alias("orders"),
        round(sum("revenue"), 2).alias("revenue"),
        round(sum("revenue") / count("*"), 2).alias("avg_order_value"),
    )
    .orderBy("revenue", ascending=False)
)

payment_analysis.show()


+------------+------+---------------+---------------+
|payment_type|orders|        revenue|avg_order_value|
+------------+------+---------------+---------------+
|      WALLET|250279| 1.5320176947E9|        6121.24|
|        CARD|249585|1.53104945233E9|        6134.38|
|         COD|250159|1.52899353891E9|        6112.09|
|         UPI|249977|1.52892861154E9|        6116.28|
+------------+------+---------------+---------------+



In [0]:
from pyspark.sql.functions import when

# Same discount bucketing as a chart-friendly frame: below 0.1 is
# "Low Discount", below 0.25 is "Medium Discount", everything else is
# "High Discount".
chart_bucket = (
    when(col("discount") < 0.1, "Low Discount")
    .when(col("discount") < 0.25, "Medium Discount")
    .otherwise("High Discount")
)

discount_chart = (
    fact_gold
    .withColumn("discount_bucket", chart_bucket)
    .groupby("discount_bucket")
    .agg(
        count("*").alias("orders"),
        round(sum("revenue"), 2).alias("total_revenue"),
        round(sum("revenue") / count("*"), 2).alias("avg_revenue_per_order"),
    )
)

display(discount_chart)


discount_bucket,orders,total_revenue,avg_revenue_per_order
Low Discount,236644,1724320739.27,7286.56
High Discount,387276,2008070226.22,5185.11
Medium Discount,376080,2388598331.99,6351.3


Databricks visualization. Run in Databricks to view.

In [0]:
# Top 10 cities by total revenue (2 decimals), highest first, for charting.
revenue_by_city = fact_gold.groupby("city").agg(
    round(sum("revenue"), 2).alias("total_revenue")
)
city_chart = revenue_by_city.orderBy("total_revenue", ascending=False).limit(10)


display(city_chart)


city,total_revenue
South Michael,5310540.81
North Michael,5299237.49
Port Michael,4900986.97
East Michael,4881552.56
Lake Michael,4811088.46
New Michael,4715508.93
West Michael,4563973.9
Smithmouth,3861938.79
West David,3694895.86
New James,3655193.06


Databricks visualization. Run in Databricks to view.

In [0]:
# Per payment type: order count, total revenue and average order value,
# left unsorted for charting.
payment_chart = fact_gold.groupby("payment_type").agg(
    count("*").alias("orders"),
    round(sum("revenue"), 2).alias("total_revenue"),
    round(sum("revenue") / count("*"), 2).alias("avg_order_value"),
)

display(payment_chart)


payment_type,orders,total_revenue,avg_order_value
COD,250159,1528993538.91,6112.09
WALLET,250279,1532017694.7,6121.24
UPI,249977,1528928611.54,6116.28
CARD,249585,1531049452.33,6134.38


Databricks visualization. Run in Databricks to view.