## Importing the libraries

In [3]:

!pip install pyspark Faker
from pyspark.sql import SparkSession
from faker import Faker
import random
from datetime import datetime

# Shared Spark session and Faker instance; the generator functions below read these globals.
spark = (
    SparkSession.builder
    .appName("ecommerce_fake")
    .getOrCreate()
)
fake = Faker()

# Fix the seed of Python's `random` module and of Faker.
random.seed(42)
Faker.seed(42)



In [4]:
def generate_customers_df(n_customers=1000):
    """Build a Spark DataFrame of synthetic customers.

    Each row gets a sequential ``customer_id`` starting at 1. Names, email,
    phone and city come from separate Faker calls, while country and gender
    are picked with ``random.choice`` from fixed lists, so the fields of one
    row are generated independently of each other.

    Args:
        n_customers: Number of customer rows to generate.

    Returns:
        DataFrame with columns customer_id, first_name, last_name, email,
        phone, country, city, gender, birth_date, signup_date.
    """
    country_codes = ["US", "UK", "BR", "CA", "DE", "FR", "IN", "AU"]
    gender_codes = ["M", "F", "O"]

    # Tuple elements are evaluated left to right, so the generators are
    # invoked in column order for every customer.
    rows = [
        (
            customer_id,
            fake.first_name(),
            fake.last_name(),
            fake.email(),
            fake.phone_number(),
            random.choice(country_codes),
            fake.city(),
            random.choice(gender_codes),
            fake.date_of_birth(minimum_age=18, maximum_age=80),
            fake.date_between(start_date="-2y", end_date="today"),
        )
        for customer_id in range(1, n_customers + 1)
    ]

    column_names = [
        "customer_id",   # PK
        "first_name",
        "last_name",
        "email",
        "phone",
        "country",
        "city",
        "gender",
        "birth_date",
        "signup_date",
    ]

    return spark.createDataFrame(rows, schema=column_names)

# Build the customers table (1000 rows) and preview its first five rows.
df_customers = generate_customers_df(n_customers=1000)
df_customers.show(n=5)


+-----------+----------+---------+--------------------+--------------------+-------+---------------+------+----------+-----------+
|customer_id|first_name|last_name|               email|               phone|country|           city|gender|birth_date|signup_date|
+-----------+----------+---------+--------------------+--------------------+-------+---------------+------+----------+-----------+
|          1|  Danielle|  Johnson|  john21@example.net|001-581-896-0013x...|     UK|  South Bridget|     M|1995-12-12| 2023-12-19|
|          2|     Linda|    Wolfe|joshua35@example.org|        361-855-9407|     DE|  New Kellystad|     M|1981-04-29| 2025-05-13|
|          3|   Brandon|    Perez|tracie31@example.com|    575-425-5341x928|     CA|       Grayside|     M|1958-10-16| 2024-08-08|
|          4|     Kayla|    Brown|trujillorichard@e...|        495.537.6724|     UK|   Juliechester|     O|2001-07-07| 2024-10-01|
|          5|   Jeffrey|   Bright|  ddavis@example.org|       (626)291-6697|     UK

In [5]:
def generate_products_df(n_products=300):
    """Build a Spark DataFrame of synthetic products.

    Every product gets a sequential ``product_id`` starting at 1, a random
    category, a product name drawn from that category's list, and a Faker
    company name as brand. ``unit_price`` is the cost multiplied by a random
    factor between 1.2 and 3.0; about 90% of products are flagged active.

    Args:
        n_products: Number of product rows to generate.

    Returns:
        DataFrame with columns product_id, product_name, brand, category,
        cost, unit_price, is_active, created_at.
    """
    catalog = {
        "Electronics": ["Smartphone", "Laptop", "Headphones", "Tablet", "Smartwatch"],
        "Clothing": ["T-Shirt", "Jeans", "Jacket", "Sneakers", "Hat"],
        "Home": ["Chair", "Table", "Lamp", "Sofa", "Curtains"],
        "Beauty": ["Shampoo", "Perfume", "Cream", "Makeup Kit", "Lotion"],
        "Sports": ["Football", "Tennis Racket", "Yoga Mat", "Dumbbells", "Bicycle"],
    }
    category_names = list(catalog.keys())

    def build_row(product_id):
        # The category is chosen first because the product name depends on it.
        category = random.choice(category_names)
        product_name = random.choice(catalog[category])
        brand = fake.company()
        cost = round(random.uniform(5, 300), 2)
        unit_price = round(cost * random.uniform(1.2, 3.0), 2)
        is_active = random.choices([True, False], weights=[0.9, 0.1], k=1)[0]
        created_at = fake.date_between(start_date="-2y", end_date="today")
        return (
            product_id,
            product_name,
            brand,
            category,
            cost,
            unit_price,
            is_active,
            created_at,
        )

    rows = [build_row(product_id) for product_id in range(1, n_products + 1)]

    column_names = [
        "product_id",   # PK
        "product_name",
        "brand",
        "category",
        "cost",
        "unit_price",
        "is_active",
        "created_at",
    ]

    return spark.createDataFrame(rows, schema=column_names)

# Build the products table (300 rows) and preview its first five rows.
df_products = generate_products_df(n_products=300)
df_products.show(n=5)


+----------+------------+--------------------+--------+------+----------+---------+----------+
|product_id|product_name|               brand|category|  cost|unit_price|is_active|created_at|
+----------+------------+--------------------+--------+------+----------+---------+----------+
|         1|     Perfume|Sullivan, Ballard...|  Beauty| 68.47|    125.66|     true|2024-08-31|
|         2|      Lotion|           Ware-Pope|  Beauty|265.29|    630.77|     true|2025-11-05|
|         3|    Football|Jones, Berry and ...|  Sports|254.92|    503.64|     true|2025-02-21|
|         4|        Sofa|Willis, Garcia an...|    Home|112.95|    216.98|     true|2025-12-01|
|         5|      Jacket|Carrillo, Shelton...|Clothing| 168.2|     224.1|     true|2025-08-19|
+----------+------------+--------------------+--------+------+----------+---------+----------+
only showing top 5 rows


In [6]:
def generate_orders_df(n_orders, df_customers):
    """Build a Spark DataFrame of synthetic orders referencing existing customers.

    Each order gets a sequential ``order_id`` starting at 1 and a
    ``customer_id`` picked at random from ``df_customers``. Status and payment
    method are drawn with fixed weights. ``order_total`` is a random items
    amount between 20 and 1000 plus the shipping cost; it is not computed from
    any line items.

    Args:
        n_orders: Number of order rows to generate.
        df_customers: DataFrame with a ``customer_id`` column.

    Returns:
        DataFrame with columns order_id, customer_id, order_date, channel,
        status, payment_method, shipping_cost, order_total.
    """
    # Pull the valid foreign-key values onto the driver once.
    customer_rows = df_customers.select("customer_id").collect()
    customer_ids = [row.customer_id for row in customer_rows]

    channels = ["web", "mobile_app", "marketplace", "physical_store"]
    statuses = ["completed", "pending", "canceled", "refunded"]
    status_weights = [0.7, 0.15, 0.1, 0.05]
    payment_methods = ["credit_card", "debit_card", "pix", "paypal", "cash"]
    payment_weights = [0.6, 0.15, 0.15, 0.05, 0.05]

    def build_row(order_id):
        customer_id = random.choice(customer_ids)  # FK -> customers
        order_date = fake.date_time_between(start_date="-1y", end_date="now")
        channel = random.choice(channels)
        status = random.choices(statuses, weights=status_weights, k=1)[0]
        payment_method = random.choices(
            payment_methods, weights=payment_weights, k=1
        )[0]
        shipping_cost = round(random.uniform(0, 40), 2)
        items_amount = round(random.uniform(20, 1000), 2)
        order_total = round(items_amount + shipping_cost, 2)
        return (
            order_id,
            customer_id,
            order_date,
            channel,
            status,
            payment_method,
            shipping_cost,
            order_total,
        )

    rows = [build_row(order_id) for order_id in range(1, n_orders + 1)]

    column_names = [
        "order_id",        # PK
        "customer_id",     # FK -> dim_customers
        "order_date",
        "channel",
        "status",
        "payment_method",
        "shipping_cost",
        "order_total",
    ]

    return spark.createDataFrame(rows, schema=column_names)

# Build the orders table (5000 rows) on top of the customers table and preview five rows.
df_orders = generate_orders_df(n_orders=5000, df_customers=df_customers)
df_orders.show(n=5)


+--------+-----------+--------------------+--------------+---------+--------------+-------------+-----------+
|order_id|customer_id|          order_date|       channel|   status|payment_method|shipping_cost|order_total|
+--------+-----------+--------------------+--------------+---------+--------------+-------------+-----------+
|       1|        299|2025-06-03 17:11:...|           web|completed|   credit_card|         31.4|     512.93|
|       2|        917|2025-07-01 13:39:...|   marketplace|completed|   credit_card|          7.4|     895.18|
|       3|        379|2025-08-29 17:33:...|    mobile_app|completed|   credit_card|         18.6|     260.32|
|       4|        276|2025-03-26 02:11:...|           web|completed|    debit_card|         3.87|     678.65|
|       5|        828|2025-06-25 07:50:...|physical_store| refunded|   credit_card|        22.14|     866.12|
+--------+-----------+--------------------+--------------+---------+--------------+-------------+-----------+
only showi

In [7]:
def generate_order_items_df(df_orders, df_products, max_items_per_order=5):
    """Build a Spark DataFrame of order line items, one row per (order, product).

    For every order a random number of distinct products is drawn: at least 1
    and at most ``max_items_per_order``, but never more than the number of
    available products, because ``random.sample`` cannot return more elements
    than its population holds. Each line gets a quantity between 1 and 5 and a
    unit price equal to the product's list price scaled by a random factor
    between 0.8 and 1.2.

    Args:
        df_orders: DataFrame with an ``order_id`` column.
        df_products: DataFrame with ``product_id`` and ``unit_price`` columns.
        max_items_per_order: Upper bound on the number of lines per order.

    Returns:
        DataFrame with columns order_item_id, order_id, product_id, quantity,
        unit_price, line_total.

    Raises:
        ValueError: If ``df_products`` has no rows or ``max_items_per_order``
            is smaller than 1.
    """
    # collect valid FKs
    order_ids = [row.order_id for row in df_orders.select("order_id").collect()]
    products = df_products.select("product_id", "unit_price").collect()
    product_ids = [p.product_id for p in products]
    price_lookup = {p.product_id: float(p.unit_price) for p in products}

    # Cap the per-order item count at the catalogue size so random.sample
    # never asks for more distinct products than exist.
    max_items = min(max_items_per_order, len(product_ids))
    if max_items < 1:
        raise ValueError(
            "need at least one product and max_items_per_order >= 1 "
            f"(got {len(product_ids)} product(s), "
            f"max_items_per_order={max_items_per_order})"
        )

    data = []
    order_item_id = 1

    for order_id in order_ids:
        n_items = random.randint(1, max_items)
        # avoid duplicate products in same order
        chosen_products = random.sample(product_ids, n_items)

        for product_id in chosen_products:
            quantity = random.randint(1, 5)
            # promo variation around the list price
            unit_price = round(
                price_lookup[product_id] * random.uniform(0.8, 1.2), 2
            )
            line_total = round(unit_price * quantity, 2)

            data.append(
                (
                    order_item_id,
                    order_id,      # FK -> orders
                    product_id,    # FK -> products
                    quantity,
                    unit_price,
                    line_total,
                )
            )
            order_item_id += 1

    columns = [
        "order_item_id",  # PK
        "order_id",       # FK -> fact_orders
        "product_id",     # FK -> dim_products
        "quantity",
        "unit_price",
        "line_total",
    ]

    df_order_items = spark.createDataFrame(data, schema=columns)
    return df_order_items

# Build the order-items table from the orders and products tables and preview five rows.
df_order_items = generate_order_items_df(df_orders=df_orders, df_products=df_products)
df_order_items.show(n=5)


+-------------+--------+----------+--------+----------+----------+
|order_item_id|order_id|product_id|quantity|unit_price|line_total|
+-------------+--------+----------+--------+----------+----------+
|            1|       1|       106|       4|     72.99|    291.96|
|            2|       1|       104|       4|    511.08|   2044.32|
|            3|       1|       150|       4|    284.97|   1139.88|
|            4|       1|       203|       1|    461.42|    461.42|
|            5|       2|       249|       5|     47.89|    239.45|
+-------------+--------+----------+--------+----------+----------+
only showing top 5 rows


In [8]:
# Local CSV export: one output directory per table, no writer options set,
# and an existing directory of the same name is overwritten.
df_customers.write.mode("overwrite").csv("dCustomers")
df_orders.write.mode("overwrite").csv("fOrders")
df_products.write.mode("overwrite").csv("dProducts")
df_order_items.write.mode("overwrite").csv("fOrderItems")

In [9]:
# Mount Google Drive at /content/drive so the export cell below can write under it.
# This cell needs the `google.colab` package, so the import stays next to its only use.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [10]:
# Export every table to Google Drive as semicolon-separated CSV with a header row.
# coalesce(1) collapses each DataFrame to a single partition before writing.
DRIVE_EXPORT_DIR = "/content/drive/MyDrive/PESSOAL/PROJETOS/Studying-Data-Engineering/fake-dataset/files"

# Directory names match the local export cell: d* for dimension tables, f* for fact tables.
for table_name, table_df in [
    ("dCustomers", df_customers),
    ("fOrders", df_orders),             # previously misspelled "fOrdes"
    ("dProducts", df_products),
    ("fOrderItems", df_order_items),    # previously "dOrders_items"
]:
    (
        table_df.coalesce(1)
        .write.option("header", "True")
        .option("sep", ";")
        .csv(f"{DRIVE_EXPORT_DIR}/{table_name}", mode="overwrite")
    )