In [1]:
import pandas as pd
import numpy as np

np.random.seed(42)

n = 1_000_000  # 1 million transactions

order_id = np.arange(1, n+1)
customer_id = np.random.randint(1000, 50000, n)
product_id = np.random.randint(1, 500, n)

categories = ["Electronics", "Clothing", "Groceries", "Furniture"]
category = np.random.choice(categories, n)

# Assign price ranges by category
def get_price(cat):
    if cat == "Electronics": return np.random.uniform(200, 2000)
    if cat == "Clothing": return np.random.uniform(20, 200)
    if cat == "Groceries": return np.random.uniform(5, 100)
    if cat == "Furniture": return np.random.uniform(100, 1500)

price = [get_price(c) for c in category]
quantity = np.random.randint(1, 5, n)

# Generate random dates in 2023
dates = pd.date_range("2023-01-01", "2023-12-31", freq="min")
order_date = np.random.choice(dates, n)

regions = ["North", "South", "East", "West"]
region = np.random.choice(regions, n)

df = pd.DataFrame({
    "order_id": order_id,
    "customer_id": customer_id,
    "product_id": product_id,
    "category": category,
    "quantity": quantity,
    "price": price,
    "order_date": order_date,
    "region": region
})

df.to_csv("retail_sales.csv", index=False)
print("Dataset saved: retail_sales.csv (1M rows)")


Dataset saved: retail_sales.csv (1M rows)


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("RetailSalesAnalysis").getOrCreate()

df = spark.read.csv("retail_sales.csv", header=True, inferSchema=True)
df.printSchema()


root
 |-- order_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- region: string (nullable = true)



In [3]:
df = df.withColumn("revenue", F.col("quantity") * F.col("price"))
df = df.withColumn("order_month", F.month("order_date"))
df = df.withColumn("order_weekday", F.date_format("order_date", "E"))


In [4]:
df.groupBy("product_id").agg(F.sum("revenue").alias("total_rev")) \
  .orderBy(F.desc("total_rev")).show(10)


+----------+------------------+
|product_id|         total_rev|
+----------+------------------+
|       205| 2833954.582678805|
|       274|2826634.9653211506|
|        95|2824330.4189136056|
|        76|2799791.0453834813|
|       464|2799489.7498408942|
|        81| 2797065.920191269|
|        19| 2792988.497202345|
|        45| 2778836.708012622|
|       352|2776001.4635309894|
|       269| 2773642.455493124|
+----------+------------------+
only showing top 10 rows



In [5]:
df.groupBy("order_month").agg(F.sum("revenue").alias("monthly_rev")) \
  .orderBy("order_month").show()


+-----------+--------------------+
|order_month|         monthly_rev|
+-----------+--------------------+
|          1|1.1029762673571712E8|
|          2|   9.9842835170913E7|
|          3| 1.102683967617676E8|
|          4|1.0574521968316887E8|
|          5|1.0891324676277018E8|
|          6|1.0621190297750077E8|
|          7|1.1048403695400062E8|
|          8|1.0935017197532964E8|
|          9|1.0655383499653395E8|
|         10| 1.096744400807445E8|
|         11| 1.060096657663945E8|
|         12|1.0544170212355822E8|
+-----------+--------------------+



In [6]:
df.groupBy("region").agg(F.sum("revenue").alias("regional_rev")) \
  .orderBy(F.desc("regional_rev")).show()


+------+--------------------+
|region|        regional_rev|
+------+--------------------+
|  West| 3.240451326587347E8|
|  East|3.2232213555656636E8|
| South| 3.216136370937505E8|
| North| 3.208121746793491E8|
+------+--------------------+

