Import Libraries

In [None]:
from pyspark.sql import functions as F


BASIC CONFIG

In [None]:
from datetime import date

# Create the target database if it is missing and make it the session default.
# NOTE: IF NOT EXISTS reuses an existing `retail_raw` database (and any tables in
# it); it does NOT start from an empty database. Each table is rewritten with
# mode("overwrite") in its own cell below.
spark.sql("CREATE DATABASE IF NOT EXISTS retail_raw")
spark.sql("USE retail_raw")

# Sizes / knobs you can tweak
num_products   = 500      # number of products
num_customers  = 10000    # number of customers
num_stores     = 25       # number of stores
num_staff      = 300      # number of staff
num_pos_sales  = 300_000  # POS rows
num_online     = 120_000  # Online rows

# Date window for generated data (ISO yyyy-mm-dd strings, inclusive bounds).
start_date = "2022-01-01"
end_date   = "2025-10-31"

# Fail fast on an inverted window instead of generating nonsensical dates later.
if date.fromisoformat(start_date) > date.fromisoformat(end_date):
    raise ValueError(
        f"start_date ({start_date}) must not be after end_date ({end_date})"
    )

1. PRODUCT MASTER (pim_product_raw)

In [None]:
# Synthetic product master: one row per product_id in 1..num_products with a
# random category / brand / unit of measure, a list price in [5, 500) and a
# cost price at 60-80% of the list price.
products_df = (
    spark.range(1, num_products + 1)
    .withColumn("product_id", F.col("id").cast("int"))
    .withColumn(
        "product_sku",
        F.concat(F.lit("SKU"), F.lpad(F.col("id").cast("string"), 6, "0"))
    )
    # Explicit cast so concat does not rely on implicit numeric->string coercion.
    .withColumn(
        "product_name",
        F.concat(F.lit("Product "), F.col("id").cast("string"))
    )
    .withColumn(
        "category",
        # rand() is in [0, 1), so cast(rand()*6 + 1 as int) is in 1..6,
        # matching element_at's 1-based indexing over the 6-element array.
        F.expr("""
            element_at(
                array('AUTOMOTIVE','FASHION','HOME_FURNISHING',
                      'ELECTRONICS','GROCERY','HEALTHCARE'),
                cast(rand()*6 + 1 as int)
            )
        """)
    )
    .withColumn(
        "brand",
        F.expr("""
            element_at(
                array('BRAND_A','BRAND_B','BRAND_C',
                      'BRAND_D','BRAND_E','BRAND_F'),
                cast(rand()*6 + 1 as int)
            )
        """)
    )
    .withColumn(
        "regular_price",
        # Uniform in [5, 500).
        (F.rand() * (500 - 5) + 5).cast("decimal(10,2)")
    )
    .withColumn(
        "cost_price",
        # 60%-80% of the list price, so margin is always positive.
        (F.col("regular_price") * (F.lit(0.6) + F.rand() * 0.2)).cast("decimal(10,2)")
    )
    .withColumn(
        "uom",
        F.expr("element_at(array('EA','BOX','PACK'), cast(rand()*3 + 1 as int))")
    )
    .withColumn(
        "is_active",
        (F.rand() > 0.05)  # roughly 95% of products active
    )
    .withColumn("created_at_utc", F.current_timestamp())
    .drop("id")
)

(
    products_df.write
    .mode("overwrite")
    .format("delta")
    # Also replace the table schema, so re-running after a column change does
    # not fail with a Delta schema-mismatch error.
    .option("overwriteSchema", "true")
    .saveAsTable("pim_product_raw")
)

2. CUSTOMER MASTER (customer_crm_raw)

In [None]:
# Synthetic customer master: one row per customer_id in 1..num_customers with
# random names, contact details, location, segment and flags.
customers_df = (
    spark.range(1, num_customers + 1)
    .withColumn("customer_id", F.col("id").cast("int"))
    .withColumn(
        "crm_customer_code",
        F.concat(F.lit("CU"), F.lpad(F.col("id").cast("string"), 6, "0"))
    )
    .withColumn(
        "first_name",
        # cast(rand()*8 + 1 as int) is in 1..8 (element_at is 1-based).
        F.expr("""
            element_at(
                array('Alex','Mary','John','Fatma','Ahmed','Grace','Peter','Lin'),
                cast(rand()*8 + 1 as int)
            )
        """)
    )
    .withColumn(
        "last_name",
        F.expr("""
            element_at(
                array('Mogengo','Smith','Omar','Chen','Khan','Mwangi','Patel','Ali'),
                cast(rand()*8 + 1 as int)
            )
        """)
    )
    .withColumn(
        "gender",
        F.expr("element_at(array('M','F'), cast(rand()*2 + 1 as int))")
    )
    .withColumn(
        "email",
        # The "+<customer_id>" suffix keeps emails unique despite repeated names.
        F.concat(
            F.lower(F.col("first_name")),
            F.lit("."),
            F.lower(F.col("last_name")),
            F.lit("+"),
            F.col("customer_id"),
            F.lit("@example.com")
        )
    )
    .withColumn(
        "phone_number",
        # "+9715" followed by 7 zero-padded random digits.
        # NOTE(review): UAE mobile numbers usually carry 8 digits after +9715 -
        # confirm the intended format before relying on this column.
        F.concat(F.lit("+9715"), F.lpad((F.rand() * 10_000_000).cast("int").cast("string"), 7, "0"))
    )
    # Draw ONE location index per row and use it for both city and country, so
    # the pair is always consistent (e.g. Nairobi -> Kenya, never Nairobi -> UAE).
    # The two arrays below are parallel: position i of each describes the same place.
    .withColumn("_loc_idx", F.expr("cast(rand()*6 + 1 as int)"))
    .withColumn(
        "city",
        F.expr("""
            element_at(
                array('Dubai','Abu Dhabi','Sharjah','Riyadh','Doha','Nairobi'),
                _loc_idx
            )
        """)
    )
    .withColumn(
        "country",
        F.expr("""
            element_at(
                array('UAE','UAE','UAE','Saudi Arabia','Qatar','Kenya'),
                _loc_idx
            )
        """)
    )
    .withColumn(
        "customer_segment",
        F.expr("""
            element_at(
                array('Value','Premium','VIP'),
                cast(rand()*3 + 1 as int)
            )
        """)
    )
    .withColumn("is_loyalty_member", (F.rand() > 0.6))  # roughly 40% members
    .withColumn("is_active", (F.rand() > 0.1))          # roughly 90% active
    .withColumn("created_at_utc", F.current_timestamp())
    .drop("id", "_loc_idx")
)

(
    customers_df.write
    .mode("overwrite")
    .format("delta")
    # Also replace the table schema, so re-running after a column change does
    # not fail with a Delta schema-mismatch error.
    .option("overwriteSchema", "true")
    .saveAsTable("customer_crm_raw")
)

3. STORE MASTER (store_master_raw)

In [None]:
# Synthetic store master: one row per store_id in 1..num_stores with a random
# format, a consistent city/country pair and an opening date within the first
# 365 days after start_date.
stores_df = (
    spark.range(1, num_stores + 1)
    .withColumn("store_id", F.col("id").cast("int"))
    .withColumn(
        "store_code",
        F.concat(F.lit("ST"), F.lpad(F.col("id").cast("string"), 4, "0"))
    )
    .withColumn(
        "store_name",
        # Explicit cast so concat does not rely on implicit numeric->string coercion.
        F.concat(F.lit("Store "), F.col("id").cast("string"))
    )
    .withColumn(
        "store_type",
        # cast(rand()*4 + 1 as int) is in 1..4 (element_at is 1-based).
        F.expr("""
            element_at(
                array('MALL','HIGH_STREET','HYPERMARKET','SUPERMARKET'),
                cast(rand()*4 + 1 as int)
            )
        """)
    )
    # Draw ONE location index per row and use it for both city and country, so
    # the pair is always consistent (e.g. Riyadh -> Saudi Arabia, never Riyadh -> Qatar).
    # The two arrays below are parallel: position i of each describes the same place.
    .withColumn("_loc_idx", F.expr("cast(rand()*5 + 1 as int)"))
    .withColumn(
        "city",
        F.expr("""
            element_at(
                array('Dubai','Abu Dhabi','Sharjah','Riyadh','Doha'),
                _loc_idx
            )
        """)
    )
    .withColumn(
        "country",
        F.expr("""
            element_at(
                array('UAE','UAE','UAE','Saudi Arabia','Qatar'),
                _loc_idx
            )
        """)
    )
    .withColumn(
        "opening_date",
        # start_date plus 0..364 days.
        F.expr(f"date_add(to_date('{start_date}'), cast(rand()*365 as int))")
    )
    .withColumn("is_active", (F.rand() > 0.05))  # roughly 95% active
    .withColumn("created_at_utc", F.current_timestamp())
    .drop("id", "_loc_idx")
)

(
    stores_df.write
    .mode("overwrite")
    .format("delta")
    # Also replace the table schema, so re-running after a column change does
    # not fail with a Delta schema-mismatch error.
    .option("overwriteSchema", "true")
    .saveAsTable("store_master_raw")
)