In [0]:
pip install dbldatagen faker

Python interpreter will be restarted.
Collecting dbldatagen
  Downloading dbldatagen-0.4.0.post1-py3-none-any.whl (122 kB)
Collecting faker
  Downloading faker-37.4.0-py3-none-any.whl (1.9 MB)
Collecting tzdata
  Downloading tzdata-2025.2-py2.py3-none-any.whl (347 kB)
Installing collected packages: tzdata, faker, dbldatagen
Successfully installed dbldatagen-0.4.0.post1 faker-37.4.0 tzdata-2025.2
Python interpreter will be restarted.


In [0]:
import dbldatagen as dg
from faker import Faker
from datetime import datetime, date
import hashlib
import random
from pyspark.sql.types import StringType, TimestampType, BooleanType
from pyspark.sql.functions import col, udf, when, expr, rand

# Initialize Faker
fake = Faker('en_IN')

# Configuration
num_customers = 1000
devices = ['Android', 'iOS', 'Desktop']
signup_sources = ['organic', 'paid', 'referral', 'affiliate']
countries = [
    "India", "Singapore", "Japan", "Indonesia", "Malaysia", "Thailand",
    "Philippines", "Vietnam", "Bangladesh", "Sri Lanka", "Nepal"
]
cities_map = {
    "India": ["Mumbai", "Delhi", "Bangalore", "Hyderabad", "Chennai", "Kolkata"],
    "Singapore": ["Singapore"],
    "Japan": ["Tokyo", "Osaka", "Kyoto", "Nagoya"],
    "Indonesia": ["Jakarta", "Bandung", "Surabaya"],
    "Malaysia": ["Kuala Lumpur", "Penang", "Johor Bahru"],
    "Thailand": ["Bangkok", "Chiang Mai", "Phuket"],
    "Philippines": ["Manila", "Cebu", "Davao"],
    "Vietnam": ["Hanoi", "Ho Chi Minh City", "Da Nang"],
    "Bangladesh": ["Dhaka", "Chittagong", "Khulna"],
    "Sri Lanka": ["Colombo", "Kandy", "Galle"],
    "Nepal": ["Kathmandu", "Pokhara", "Lalitpur"]
}
batch_id_val = str(fake.uuid4())
ingest_ts_val = datetime.utcnow().isoformat()
source_val = "faker"

# Generate name + gender
def generate_name_and_gender():
    gender = random.choice(['Male', 'Female'])
    name = fake.name_male() if gender == 'Male' else fake.name_female()
    return name, gender

name_gender_pairs = [generate_name_and_gender() for _ in range(1000)]
user_names = [ng[0] for ng in name_gender_pairs]
user_genders = [ng[1] for ng in name_gender_pairs]
emails = [fake.email() for _ in range(1000)]
signup_dates = [fake.date_between(start_date='-1y', end_date=date(2025, 2, 26)) for _ in range(1000)]

# Record hash function
def record_hash_udf(*cols):
    combined = "||".join([str(c) if c is not None else "NULL" for c in cols])
    return hashlib.md5(combined.encode("utf-8")).hexdigest()
record_hash = udf(record_hash_udf, StringType())

@udf(StringType())
def assign_city(country):
    return random.choice(cities_map.get(country, ["Unknown"]))

# Generate DataFrame
customers = (
    dg.DataGenerator(spark, name="customers", rows=num_customers, partitions=4)
    .withColumn("customer_id", "long", minValue=1000000, uniqueValues=num_customers)
    .withColumn("name", StringType(), values=user_names)
    .withColumn("email", StringType(), values=emails)
    .withColumn("gender", StringType(), values=user_genders)
    .withColumn("country", StringType(), values=countries)
    .withColumn("signup_date", TimestampType(), values=signup_dates)
    .withColumn("device", StringType(), values=devices)
    .withColumn("signup_source", StringType(), values=signup_sources)
    .withColumn("activation_date", TimestampType(), expr("signup_date + interval 1 day"))
    .withColumn("batch_id", StringType(), values=[batch_id_val])
    .withColumn("ingest_ts", StringType(), values=[ingest_ts_val])
    .withColumn("source", StringType(), values=[source_val])
)

df_customers = customers.build()

# Add city from country
df_customers = df_customers.withColumn("city", assign_city(col("country")))

# Assign lifecycle stage using a single bucket
df_customers = df_customers.withColumn("lifecycle_bucket", rand())
df_customers = df_customers.withColumn(
    "lifecycle_stage",
    when(col("lifecycle_bucket") < 0.05, "Visitor")
    .when(col("lifecycle_bucket") < 0.15, "Lead")
    .when(col("lifecycle_bucket") < 0.27, "Activated")
    .when(col("lifecycle_bucket") < 0.37, "Engaged")
    .when(col("lifecycle_bucket") < 0.72, "Paying")
    .when(col("lifecycle_bucket") < 0.90, "Retained")
    .otherwise("Lost")
)

# Set churn flag for "Lost" only
df_customers = df_customers.withColumn(
    "churn_flag",
    when(col("lifecycle_stage") == "Lost", True).otherwise(False)
)

# Add churn date if churned
df_customers = df_customers.withColumn("churn_offset", (col("customer_id") % 31 + 60).cast("int"))
df_customers = df_customers.withColumn(
    "churn_date",
    when(col("churn_flag") == True, expr("date_add(activation_date, churn_offset)"))
)

# Add record hash
df_customers = df_customers.withColumn(
    "record_hash",
    record_hash(
        "customer_id", "name", "email", "signup_date", "device", "signup_source",
        "gender", "activation_date", "churn_flag", "churn_date",
        "country", "city", "lifecycle_stage"
    )
)

# Final column order
df_customers = df_customers.drop("lifecycle_bucket")
df_customers = df_customers.select(
    "customer_id", "name", "email", "gender", "country", "city",
    "signup_date", "activation_date", "churn_flag", "churn_date",
    "device", "signup_source", "lifecycle_stage",
    "batch_id", "ingest_ts", "source", "record_hash"
)

# Create view and preview
df_customers.createOrReplaceTempView("customers")

In [0]:
display(df_customers.limit(50))

customer_id,name,email,gender,country,city,signup_date,activation_date,churn_flag,churn_date,device,signup_source,lifecycle_stage,batch_id,ingest_ts,source,record_hash
1000000,Mohammed Kalita,jarya@example.net,Male,India,Mumbai,2025-01-12T00:00:00.000+0000,2024-01-01T00:00:00.000+0000,False,,Android,organic,Visitor,568940cd-f785-472d-b9bd-41f2c506eef6,2025-06-16T17:40:00.517639,faker,7b6856c7f37dbd1641ec66d2d4d2c93e
1000001,Adya Mishra,ravaladvik@example.com,Female,Singapore,Singapore,2024-06-26T00:00:00.000+0000,2024-01-02T00:00:00.000+0000,False,,iOS,paid,Retained,568940cd-f785-472d-b9bd-41f2c506eef6,2025-06-16T17:40:00.517639,faker,c5638a19ab075a2e2f06e8d29fe0412a
1000002,Nitesh Khosla,lallajagat@example.org,Male,Japan,Tokyo,2025-01-15T00:00:00.000+0000,2024-01-03T00:00:00.000+0000,False,,Desktop,referral,Engaged,568940cd-f785-472d-b9bd-41f2c506eef6,2025-06-16T17:40:00.517639,faker,aa268998bf329d1c03738599615af72f
1000003,Krisha Sodhi,warda27@example.org,Female,Indonesia,Jakarta,2024-07-12T00:00:00.000+0000,2024-01-04T00:00:00.000+0000,True,2024-03-09,Android,affiliate,Lost,568940cd-f785-472d-b9bd-41f2c506eef6,2025-06-16T17:40:00.517639,faker,cbbfc1cd71adab759fbc596bce1e1c97
1000004,Anita Walla,zsathe@example.org,Female,Malaysia,Johor Bahru,2024-10-20T00:00:00.000+0000,2024-01-05T00:00:00.000+0000,False,,iOS,organic,Activated,568940cd-f785-472d-b9bd-41f2c506eef6,2025-06-16T17:40:00.517639,faker,a03ec14fccd5ff1c3116afbd3521adde
1000005,Ansh Lall,birgirish@example.net,Male,Thailand,Phuket,2024-11-03T00:00:00.000+0000,2024-01-06T00:00:00.000+0000,False,,Desktop,paid,Paying,568940cd-f785-472d-b9bd-41f2c506eef6,2025-06-16T17:40:00.517639,faker,d9a28f0a4d9fd2a232bbea60b4d51a96
1000006,Hritik Thaker,qrao@example.net,Male,Philippines,Manila,2025-01-08T00:00:00.000+0000,2024-01-07T00:00:00.000+0000,False,,Android,referral,Engaged,568940cd-f785-472d-b9bd-41f2c506eef6,2025-06-16T17:40:00.517639,faker,c1b4c6ecea9fbcc67b5dd1b0833514ae
1000007,Chanchal Mannan,daksheshpillai@example.com,Female,Vietnam,Da Nang,2024-06-27T00:00:00.000+0000,2024-01-08T00:00:00.000+0000,False,,iOS,affiliate,Visitor,568940cd-f785-472d-b9bd-41f2c506eef6,2025-06-16T17:40:00.517639,faker,9a5cc92f73a20069c57972d3581b4f90
1000008,Viraj Saran,hraval@example.com,Male,Bangladesh,Khulna,2024-12-10T00:00:00.000+0000,2024-01-09T00:00:00.000+0000,False,,Desktop,organic,Activated,568940cd-f785-472d-b9bd-41f2c506eef6,2025-06-16T17:40:00.517639,faker,ff661db6e2b8b47b954261737f4479ae
1000009,Nicholas Chacko,poojaram@example.com,Male,Sri Lanka,Kandy,2024-11-23T00:00:00.000+0000,2024-01-10T00:00:00.000+0000,False,,Android,paid,Paying,568940cd-f785-472d-b9bd-41f2c506eef6,2025-06-16T17:40:00.517639,faker,99136f8dd9a0114f90adf1933452813d


In [0]:
from datetime import timedelta
from pyspark.sql.types import StringType, FloatType, BooleanType, DateType
from pyspark.sql.functions import expr
import dbldatagen as dg
import random
from faker import Faker

fake = Faker()
spark.conf.set("spark.sql.shuffle.partitions", "4")

# Step 1: Filter relevant customers
customer_df = spark.sql("""
    SELECT customer_id, name, activation_date, churn_date, lifecycle_stage
    FROM customers
    WHERE lifecycle_stage IN ('Paying', 'Retained', 'Lost')
""").cache()

customer_ids = [row['customer_id'] for row in customer_df.collect()]
customer_names = {row['customer_id']: row['name'] for row in customer_df.collect()}
customer_activation = {row['customer_id']: row['activation_date'] for row in customer_df.collect()}
customer_churn = {row['customer_id']: row['churn_date'] for row in customer_df.collect()}

# Step 2: Sampling strategy
vip_customers = random.sample(customer_ids, int(0.1 * len(customer_ids)))
weighted_customer_ids = customer_ids + vip_customers * 4

num_transactions = 5000
customer_id_samples = random.choices(weighted_customer_ids, k=num_transactions)
customer_name_samples = [customer_names[cid] for cid in customer_id_samples]

# Step 3: Payment dates between activation and churn
payment_dates = []
for cid in customer_id_samples:
    start = customer_activation[cid]
    end = customer_churn[cid] if customer_churn[cid] else start + timedelta(days=90)

    # Ensure both are datetime
    if isinstance(start, date) and not isinstance(start, datetime):
        start = datetime.combine(start, datetime.min.time())
    if isinstance(end, date) and not isinstance(end, datetime):
        end = datetime.combine(end, datetime.min.time())

    days_range = max((end - start).days, 1)
    offset = random.randint(1, days_range)
    payment_dates.append(start + timedelta(days=offset))

# Step 4: Full product list with prices
product_names = [
    "Milk", "Cheese", "Yogurt", "Apples", "Bananas", "Lettuce", "Tomatoes", "Tomato sauce",
    "Canned beans", "Canned soup", "Spaghetti", "Rice", "Quinoa", "Oatmeal", "Potato chips",
    "Pretzels", "Nuts", "Soda", "Bottled water", "Coffee", "Tea", "Frozen pizza",
    "Frozen vegetables", "Ice cream", "Frozen entrees", "Sliced bread", "Bagels",
    "Hamburger and hot dog buns", "Pastries", "Ketchup", "Mustard", "Mayonnaise", "BBQ sauce",
    "Soy sauce", "Salad dressing", "Toilet paper", "Paper towels", "Laundry detergent",
    "Dish soap", "Trash bags"
]

product_categories = {
    "Milk": "Dairy", "Cheese": "Dairy", "Yogurt": "Dairy",
    "Apples": "Produce", "Bananas": "Produce", "Lettuce": "Produce", "Tomatoes": "Produce",
    "Tomato sauce": "Canned", "Canned beans": "Canned", "Canned soup": "Canned",
    "Spaghetti": "Grains", "Rice": "Grains", "Quinoa": "Grains", "Oatmeal": "Grains",
    "Potato chips": "Snacks", "Pretzels": "Snacks", "Nuts": "Snacks",
    "Soda": "Beverage", "Bottled water": "Beverage", "Coffee": "Beverage", "Tea": "Beverage",
    "Frozen pizza": "Frozen", "Frozen vegetables": "Frozen", "Ice cream": "Frozen", "Frozen entrees": "Frozen",
    "Sliced bread": "Bakery", "Bagels": "Bakery", "Hamburger and hot dog buns": "Bakery", "Pastries": "Bakery",
    "Ketchup": "Condiments", "Mustard": "Condiments", "Mayonnaise": "Condiments", "BBQ sauce": "Condiments",
    "Soy sauce": "Condiments", "Salad dressing": "Condiments",
    "Toilet paper": "Household", "Paper towels": "Household", "Laundry detergent": "Household",
    "Dish soap": "Household", "Trash bags": "Household"
}

product_prices = {
    "Milk": 45, "Cheese": 90, "Yogurt": 30, "Apples": 50, "Bananas": 40, "Lettuce": 35, "Tomatoes": 48,
    "Tomato sauce": 55, "Canned beans": 65, "Canned soup": 70, "Spaghetti": 72, "Rice": 60, "Quinoa": 80,
    "Oatmeal": 50, "Potato chips": 35, "Pretzels": 38, "Nuts": 120, "Soda": 45, "Bottled water": 20,
    "Coffee": 150, "Tea": 70, "Frozen pizza": 130, "Frozen vegetables": 60, "Ice cream": 90,
    "Frozen entrees": 140, "Sliced bread": 35, "Bagels": 40, "Hamburger and hot dog buns": 42,
    "Pastries": 55, "Ketchup": 40, "Mustard": 42, "Mayonnaise": 60, "BBQ sauce": 65, "Soy sauce": 50,
    "Salad dressing": 70, "Toilet paper": 80, "Paper towels": 75, "Laundry detergent": 120,
    "Dish soap": 60, "Trash bags": 85
}

product_samples = random.choices(product_names, k=num_transactions)
product_category_samples = [product_categories[p] for p in product_samples]
product_price_samples = [float(product_prices[p]) for p in product_samples]

# Other configurations
plan_types = ["Free", "Basic", "Premium"]
payment_methods = ["UPI", "Wallet", "Card", "NetBanking"]

# Step 5: Generate transaction table
df_txn = (
    dg.DataGenerator(spark, name="transactions", rows=num_transactions, partitions=4)
    .withColumn("transaction_id", "long", minValue=200000, uniqueValues=num_transactions * 2)
    .withColumn("customer_id", "long", values=customer_id_samples)
    .withColumn("customer_name", StringType(), values=customer_name_samples)
    .withColumn("product_name", StringType(), values=product_samples)
    .withColumn("product_category", StringType(), values=product_category_samples)
    .withColumn("payment_date", DateType(), values=payment_dates)
    .withColumn("amount", FloatType(), values=product_price_samples)
    .withColumn("refund_flag", BooleanType(), values=[True, False])
    .withColumn("plan_type", StringType(), values=plan_types + [None])
    .withColumn("payment_method", StringType(), values=payment_methods)
    .withColumn("batch_id", "long", minValue=5000, maxValue=6000)
    .withColumn("ingest_ts", "timestamp", expr="current_timestamp()")
    .withColumn("source", StringType(), values=["app", "web", "api"])
    .withColumn("record_hash", StringType(), expr="sha2(concat(transaction_id, customer_id), 256)")
)

df_transactions = df_txn.build()
df_transactions.createOrReplaceTempView("transactions")
display(df_transactions.limit(10))

transaction_id,customer_id,customer_name,product_name,product_category,payment_date,amount,refund_flag,plan_type,payment_method,batch_id,ingest_ts,source,record_hash
200000,1000615,Yagnesh Dora,Quinoa,Grains,2024-11-29,80.0,True,Free,UPI,5000,2025-06-16T17:45:16.952+0000,app,ee81c04491da08f668c84208758843639c3320c7f1ef64a53dc418d89b9c5ee3
200001,1000379,Krishna Wable,Frozen entrees,Frozen,2024-03-14,140.0,False,Basic,Wallet,5001,2025-06-16T17:45:16.952+0000,web,962a8dae862ef6a60a6bed2e03de321cef464e1260ed880c1399f012a5b39ec8
200002,1000485,Vrinda Som,Mayonnaise,Condiments,2024-06-02,60.0,True,Premium,Card,5002,2025-06-16T17:45:16.952+0000,api,aac30b156607b4631c2b365da1900299a0f032559e7edd9283fd334615a2cb87
200003,1000869,Nathan Ghosh,Yogurt,Dairy,2024-07-27,30.0,False,,NetBanking,5003,2025-06-16T17:45:16.952+0000,app,f07876fe032ff06b510ebfd99ee841192a6dad0f705748cc25f18d1a77c00ebe
200004,1000458,Jhalak Salvi,Frozen pizza,Frozen,2024-05-21,130.0,True,Free,UPI,5004,2025-06-16T17:45:16.952+0000,web,0a84a68d67095ce5b8ef08aa5d35da78c8b0f4e6f6a100e65fb83c5b08dac778
200005,1000698,Anamika Prabhakar,Bananas,Produce,2024-12-17,40.0,False,Basic,Wallet,5005,2025-06-16T17:45:16.952+0000,api,459f3c8aa22ca6d77a9cc6a4c2d1c7bc358a7c7f064a57bf5c4abbd37bf66465
200006,1000052,Akshay Singh,Soda,Beverage,2024-03-21,45.0,True,Premium,Card,5006,2025-06-16T17:45:16.952+0000,app,c1a5a07b8739e383eaa277a51cde4fc1bf942993692940419313777d765a781a
200007,1000791,Isaiah Kuruvilla,Cheese,Dairy,2024-03-03,90.0,False,,NetBanking,5007,2025-06-16T17:45:16.952+0000,web,e8143db93e3c07d920dcda5c5b1c903afaa83b24e1a4e8e0939686cf4452ef06
200008,1000802,Damyanti Lal,Soda,Beverage,2024-05-16,45.0,True,Free,UPI,5008,2025-06-16T17:45:16.952+0000,api,d692b2d1568075a8a920e6f5a22a0fc352ee36b06c9a73d700e1e0cbd1bbc621
200009,1000966,Robert Aurora,Toilet paper,Household,2024-09-10,80.0,False,Basic,Wallet,5009,2025-06-16T17:45:16.952+0000,app,f0b5a081ba5a3314bbbb0bd0b133fdfcc3b425838b342a6b9087069a33bea4f7


In [0]:
display(df_transactions.limit(50))

transaction_id,customer_id,customer_name,product_name,product_category,payment_date,amount,refund_flag,plan_type,payment_method,batch_id,ingest_ts,source,record_hash
200000,1000615,Yagnesh Dora,Quinoa,Grains,2024-11-29,80.0,True,Free,UPI,5000,2025-06-16T17:45:18.471+0000,app,ee81c04491da08f668c84208758843639c3320c7f1ef64a53dc418d89b9c5ee3
200001,1000379,Krishna Wable,Frozen entrees,Frozen,2024-03-14,140.0,False,Basic,Wallet,5001,2025-06-16T17:45:18.471+0000,web,962a8dae862ef6a60a6bed2e03de321cef464e1260ed880c1399f012a5b39ec8
200002,1000485,Vrinda Som,Mayonnaise,Condiments,2024-06-02,60.0,True,Premium,Card,5002,2025-06-16T17:45:18.471+0000,api,aac30b156607b4631c2b365da1900299a0f032559e7edd9283fd334615a2cb87
200003,1000869,Nathan Ghosh,Yogurt,Dairy,2024-07-27,30.0,False,,NetBanking,5003,2025-06-16T17:45:18.471+0000,app,f07876fe032ff06b510ebfd99ee841192a6dad0f705748cc25f18d1a77c00ebe
200004,1000458,Jhalak Salvi,Frozen pizza,Frozen,2024-05-21,130.0,True,Free,UPI,5004,2025-06-16T17:45:18.471+0000,web,0a84a68d67095ce5b8ef08aa5d35da78c8b0f4e6f6a100e65fb83c5b08dac778
200005,1000698,Anamika Prabhakar,Bananas,Produce,2024-12-17,40.0,False,Basic,Wallet,5005,2025-06-16T17:45:18.471+0000,api,459f3c8aa22ca6d77a9cc6a4c2d1c7bc358a7c7f064a57bf5c4abbd37bf66465
200006,1000052,Akshay Singh,Soda,Beverage,2024-03-21,45.0,True,Premium,Card,5006,2025-06-16T17:45:18.471+0000,app,c1a5a07b8739e383eaa277a51cde4fc1bf942993692940419313777d765a781a
200007,1000791,Isaiah Kuruvilla,Cheese,Dairy,2024-03-03,90.0,False,,NetBanking,5007,2025-06-16T17:45:18.471+0000,web,e8143db93e3c07d920dcda5c5b1c903afaa83b24e1a4e8e0939686cf4452ef06
200008,1000802,Damyanti Lal,Soda,Beverage,2024-05-16,45.0,True,Free,UPI,5008,2025-06-16T17:45:18.471+0000,api,d692b2d1568075a8a920e6f5a22a0fc352ee36b06c9a73d700e1e0cbd1bbc621
200009,1000966,Robert Aurora,Toilet paper,Household,2024-09-10,80.0,False,Basic,Wallet,5009,2025-06-16T17:45:18.471+0000,app,f0b5a081ba5a3314bbbb0bd0b133fdfcc3b425838b342a6b9087069a33bea4f7


In [0]:
# %sql
# SELECT 
#     DISTINCT c.lifecycle_stage
# FROM transactions t
# JOIN customers c 
#     ON t.customer_id = c.customer_id
# WHERE c.lifecycle_stage NOT IN ('Paying', 'Retained', 'Lost');

In [0]:
# %sql
# SELECT 
#     customer_id,
#     COUNT(*) AS num_transactions
# FROM transactions
# GROUP BY customer_id
# ORDER BY num_transactions DESC
# LIMIT 10;


In [0]:
# %sql
# SELECT 
#     t.transaction_id,
#     t.customer_id,
#     t.payment_date,
#     c.activation_date,
#     c.churn_date
# FROM transactions t
# JOIN customers c ON t.customer_id = c.customer_id
# WHERE 
#     t.payment_date < c.activation_date
#     OR (c.churn_date IS NOT NULL AND t.payment_date > c.churn_date);


In [0]:
# %sql
# SELECT 
#     product_name, 
#     ROUND(AVG(amount), 2) AS avg_price,
#     COUNT(*) AS num_sales
# FROM transactions
# GROUP BY product_name
# ORDER BY avg_price DESC;


In [0]:
%sql
    SELECT 
    payment_method, 
    COUNT(*) AS method_count
FROM transactions
GROUP BY payment_method;


payment_method,method_count
Wallet,1250
Card,1250
NetBanking,1250
UPI,1250


In [0]:
import dbldatagen as dg
from pyspark.sql.types import StringType, LongType, FloatType, BooleanType, TimestampType
from faker import Faker
import random
from datetime import timedelta, datetime

fake = Faker()
spark.conf.set("spark.sql.shuffle.partitions", "4")

# Load customers from valid lifecycle stages
customer_df = spark.sql("""
    SELECT customer_id, name, activation_date, churn_date
    FROM customers
    WHERE lifecycle_stage IN ('Activated', 'Engaged', 'Paying', 'Retained', 'Lost')
""").cache()

customer_ids = [row['customer_id'] for row in customer_df.collect()]
activation_dates = {row['customer_id']: row['activation_date'] for row in customer_df.collect()}
churn_dates = {row['customer_id']: row['churn_date'] for row in customer_df.collect()}

# Weighted VIP sampling
vip_customers = random.sample(customer_ids, int(0.1 * len(customer_ids)))
weighted_customer_ids = customer_ids + vip_customers * 5

# Config
num_events = 1000
event_customer_samples = random.choices(weighted_customer_ids, k=num_events)

# Event dates after activation and before churn
def random_event_time(cust_id):
    activation = activation_dates[cust_id]
    churn = churn_dates.get(cust_id)
    activation_dt = datetime.combine(activation, datetime.min.time())
    if churn:
        churn_dt = datetime.combine(churn, datetime.min.time())
        max_days = (churn_dt - activation_dt).days
        delta_days = random.randint(0, max_days if max_days > 0 else 1)
    else:
        delta_days = random.randint(0, 90)
    delta_seconds = random.randint(0, 86400)
    return activation_dt + timedelta(days=delta_days, seconds=delta_seconds)

event_times = [random_event_time(cid) for cid in event_customer_samples]

# Static pool values
event_types = ["login", "logout", "purchase", "cart_add", "cart_remove", "wishlist_add", "review", "support_chat"]
platforms = ["web", "mobile_app", "tablet_app"]
features = ["search", "product_view", "checkout", "payment", "profile_update", "notification", "chat"]
sources = ["app", "web", "api"]
session_ids = [fake.uuid4() for _ in range(num_events)]
session_durations = [random.randint(30, 900) for _ in range(num_events)]
event_values = [round(random.uniform(1, 100), 2) for _ in range(num_events)]
event_success_flags = [random.choices([True, False], weights=[0.85, 0.15])[0] for _ in range(num_events)]
repeat_use_counts = [random.randint(1, 5) for _ in range(num_events)]

# Generate dataframe
df_events_gen = (
    dg.DataGenerator(spark, name="events", rows=num_events, partitions=4)
    .withColumn("event_id", LongType(), minValue=700000, uniqueValues=num_events * 2)
    .withColumn("customer_id", LongType(), values=event_customer_samples)
    .withColumn("event_type", StringType(), values=event_types)
    .withColumn("event_date", TimestampType(), values=event_times)
    .withColumn("platform", StringType(), values=platforms)
    .withColumn("feature_used", StringType(), values=features)
    .withColumn("session_id", StringType(), values=session_ids)
    .withColumn("session_duration", LongType(), values=session_durations)
    .withColumn("event_value", FloatType(), values=event_values)
    .withColumn("event_success_flag", BooleanType(), values=event_success_flags)
    .withColumn("repeat_feature_use_count", LongType(), values=repeat_use_counts)
    .withColumn("batch_id", LongType(), minValue=8000, maxValue=9000)
    .withColumn("ingest_ts", "timestamp", expr="current_timestamp()")
    .withColumn("source", StringType(), values=sources)
    .withColumn("record_hash", StringType(), expr="sha2(concat(event_id, customer_id), 256)")
)

df_events = df_events_gen.build()
df_events.createOrReplaceTempView("events")
display(df_events.limit(10))


event_id,customer_id,event_type,event_date,platform,feature_used,session_id,session_duration,event_value,event_success_flag,repeat_feature_use_count,batch_id,ingest_ts,source,record_hash
700000,1000859,login,2024-06-15T00:06:53.000+0000,web,search,b27ab346-e5f0-4429-a828-f0b09474e0e4,506,93.43,False,4,8000,2025-06-16T17:46:21.448+0000,app,9e0f8c8ef79abbd6ce1ce1242cdec5b7f08c5d5faf8a9e839016a4e75d6dffa0
700001,1000171,logout,2024-09-15T01:33:42.000+0000,mobile_app,product_view,6821cc3f-1bee-4912-aec1-2d569b4e1fce,578,10.67,True,5,8001,2025-06-16T17:46:21.448+0000,web,5d82097dcc0897edcdd8343b0953f4fa5238b5730b0df0afc39ad0aaed746226
700002,1000856,purchase,2024-06-16T04:01:21.000+0000,tablet_app,checkout,06914720-8ca4-4da1-a828-e30935436c67,419,19.73,True,5,8002,2025-06-16T17:46:21.448+0000,api,ed170ea8314bbd15815da990521ca19f2115167c6e6d0a352c3585ac4a9d6924
700003,1000797,cart_add,2024-03-17T08:59:04.000+0000,web,payment,9d000aec-f34e-4749-869c-f06cc4a1455e,696,78.52,True,3,8003,2025-06-16T17:46:21.448+0000,app,91ac0abf2d3e5319b12637f1b38311210a32105f96afe40621809b39833598b0
700004,1000650,cart_remove,2024-11-17T12:55:46.000+0000,mobile_app,profile_update,6e26543e-19f0-4a28-a467-e3b5707d8baa,73,64.34,True,4,8004,2025-06-16T17:46:21.448+0000,web,9f696da02a397fa914f13311758ec4dad3a1a9b4eb821587776647bd1ecf042b
700005,1000579,wishlist_add,2024-10-20T16:38:41.000+0000,tablet_app,notification,38658ce7-88e3-488c-ad06-c843f436ac99,46,75.23,True,4,8005,2025-06-16T17:46:21.448+0000,api,a7bbb8f4cf6cf238592a1ab55a7449cd45eb3c78cd0f19e832a4121b0fab91a1
700006,1000227,review,2024-09-01T00:35:23.000+0000,web,chat,42143556-9220-4883-87ca-1e413e9ba953,574,6.9,True,1,8006,2025-06-16T17:46:21.448+0000,app,9f063ee6e801943b172fc78fe5833c7f71ede8d1f926b4d286d06a623bb04065
700007,1000999,support_chat,2024-12-04T17:15:34.000+0000,mobile_app,search,a4d94af4-d431-4c37-9f0f-ec8de63aa844,199,14.69,False,5,8007,2025-06-16T17:46:21.448+0000,web,93d3643a794baab5545467b5952c759a487e2a214bd7e4961f4dc22159fc3061
700008,1000547,login,2024-07-16T05:42:29.000+0000,tablet_app,product_view,05ec52b6-a465-4603-9118-d2d6aa8b82e8,90,68.75,True,3,8008,2025-06-16T17:46:21.448+0000,api,fc536455d55062f8399113a47cb989387b34566d7a502eba7bcbd1abe8065f44
700009,1000834,logout,2024-05-17T18:45:31.000+0000,web,checkout,f027a15c-11cf-4d7b-a973-e416f32347e2,426,66.9,True,2,8009,2025-06-16T17:46:21.448+0000,app,8f70b0ec9c77c265b51db1e077f0e64307cc0ebfa0fb06e8697180c324e3be99


In [0]:
# %sql
# -- 2.1 Validate customer_id exists in customers table
# SELECT e.customer_id
# FROM events e
# LEFT JOIN customers c ON e.customer_id = c.customer_id
# WHERE c.customer_id IS NULL;

In [0]:
# %sql
# SELECT e.*
# FROM events e
# JOIN customers c ON e.customer_id = c.customer_id
# WHERE e.event_date < c.activation_date
#    OR (c.churn_date IS NOT NULL AND e.event_date > c.churn_date);

In [0]:
# # %sql
# # -- 6.1 Customers with no events at all (should be minimal for engaged/retained)
# # SELECT c.customer_id, c.lifecycle_stage
# FROM customers c
# # LEFT JOIN events e ON c.customer_id = e.customer_id
# # WHERE e.customer_id IS NULL AND c.lifecycle_stage IN ('Activated', 'Engaged', 'Paying', 'Retained', 'Lost'); 

In [0]:
import dbldatagen as dg
from pyspark.sql.types import StringType, LongType, FloatType, DateType, IntegerType
from faker import Faker
from pyspark.sql.functions import expr
import random
from datetime import timedelta
from collections import defaultdict

fake = Faker()
spark.conf.set("spark.sql.shuffle.partitions", "4")

# Step 1: Pull valid country-city pairs from customer table
customer_geo_df = spark.sql("""
    SELECT DISTINCT country, city
    FROM customers
    WHERE country IS NOT NULL AND city IS NOT NULL
""").collect()

# Create mapping: country → list of cities
country_city_map = defaultdict(set)
for row in customer_geo_df:
    country_city_map[row["country"]].add(row["city"])

# Convert to list format
for key in country_city_map:
    country_city_map[key] = list(country_city_map[key])

# Step 2: Sample (country, city) pairs per campaign
num_campaigns = 1000
target_country_samples = []
target_city_samples = []

for _ in range(num_campaigns):
    country = random.choice(list(country_city_map.keys()))
    city = random.choice(country_city_map[country])
    target_country_samples.append(country)
    target_city_samples.append(city)

# Step 3: Other campaign attributes
channels = ["email", "social_media", "tv", "radio", "print", "web", "mobile_push"]
signup_sources = ["organic", "paid", "referral", "affiliate"]
campaign_names = [f"{fake.catch_phrase()} Campaign" for _ in range(num_campaigns)]

# Randomized campaign timelines and metrics
campaign_start_dates = [fake.date_between(start_date='-1y', end_date='today') for _ in range(num_campaigns)]
campaign_end_dates = [start + timedelta(days=random.randint(7, 60)) for start in campaign_start_dates]
costs = [round(random.uniform(1000, 50000), 2) for _ in range(num_campaigns)]
impressions = [random.randint(10000, 1000000) for _ in range(num_campaigns)]
clicks = [random.randint(1000, impressions[i]) for i in range(num_campaigns)]
conversions = [random.randint(100, clicks[i]) for i in range(num_campaigns)]

# Step 4: Build the Campaigns table
df_campaigns_gen = (
    dg.DataGenerator(spark, name="campaigns", rows=num_campaigns, partitions=2)
    .withColumn("campaign_id", LongType(), minValue=900000, uniqueValues=num_campaigns * 2)
    .withColumn("campaign_name", StringType(), values=campaign_names)
    .withColumn("channel", StringType(), values=channels)
    .withColumn("start_date", DateType(), values=campaign_start_dates)
    .withColumn("end_date", DateType(), values=campaign_end_dates)
    .withColumn("cost", FloatType(), values=costs)
    .withColumn("impressions", IntegerType(), values=impressions)
    .withColumn("clicks", IntegerType(), values=clicks)
    .withColumn("conversions", IntegerType(), values=conversions)
    .withColumn("signup_source", StringType(), values=signup_sources)
    .withColumn("target_country", StringType(), values=target_country_samples)
    .withColumn("target_city", StringType(), values=target_city_samples)
    .withColumn("batch_id", LongType(), minValue=9000, maxValue=10000)
    .withColumn("ingest_ts", "timestamp", expr("current_timestamp()"))
    .withColumn("source", StringType(), values=["internal", "external"])
    .withColumn("record_hash", StringType(), expr="sha2(cast(campaign_id as string), 256)"))

df_campaigns = df_campaigns_gen.build()
df_campaigns.createOrReplaceTempView("campaigns")

# Optional: Preview
display(df_campaigns.limit(10))


campaign_id,campaign_name,channel,start_date,end_date,cost,impressions,clicks,conversions,signup_source,target_country,target_city,batch_id,ingest_ts,source,record_hash
900000,Exclusive transitional intranet Campaign,email,2024-11-11,2024-12-24,6574.29,820219,772850,205090,organic,India,Delhi,9000,2024-01-01T00:00:00.000+0000,internal,d1d282f04815e254882e9fee0144155003a6c2383f1614c3c3a8f575b6f2db2f
900001,Seamless exuding encryption Campaign,social_media,2024-06-26,2024-07-12,41429.43,677674,548056,190204,paid,Singapore,Singapore,9001,2024-01-02T00:00:00.000+0000,external,68bc4088507d3b4859f20bd52d28745726f5cced789e374bebd8f0fef4e91e26
900002,Extended human-resource database Campaign,tv,2024-08-20,2024-09-04,41748.41,327654,238272,54304,referral,Singapore,Singapore,9002,2024-01-03T00:00:00.000+0000,internal,d4c81d964dc5457b065dac9c046026f4d3c769c6e7c2d6df7c8ba4f291e0fbfd
900003,Virtual impactful capability Campaign,radio,2024-07-23,2024-09-02,33374.13,555165,141805,135663,affiliate,India,Bangalore,9003,2024-01-04T00:00:00.000+0000,external,cf9c918c6f1963bc717ed02671f8040e1fcf85b714c0dfe0145234f8b0099c7c
900004,Configurable zero-defect attitude Campaign,print,2025-01-23,2025-03-07,39911.33,387963,91909,46639,organic,Philippines,Cebu,9004,2024-01-05T00:00:00.000+0000,internal,5e4e3c5e2c4e91391e5128ee164b345f7732e3d6e968846cc37f2f659b3ffa6b
900005,Front-line user-facing secured line Campaign,web,2024-11-20,2024-12-30,16857.36,961666,228706,38215,paid,Sri Lanka,Kandy,9005,2024-01-06T00:00:00.000+0000,external,8c60e46987dec610c92362063d9bc20b9e1d1d3659f9398093fd4286103208cb
900006,User-friendly tertiary Graphical User Interface Campaign,mobile_push,2024-11-24,2024-12-31,25382.31,100495,5259,788,referral,Singapore,Singapore,9006,2024-01-07T00:00:00.000+0000,internal,51342ec2680e38ebc2664425c0783671dd5aa3690fc1c6cadd5f991c43b8ea1f
900007,Virtual 6thgeneration strategy Campaign,email,2025-01-26,2025-03-03,36811.87,566076,540173,426686,affiliate,Singapore,Singapore,9007,2024-01-08T00:00:00.000+0000,external,ce56a2df49fb989de29ad24f48913d860abbeaaa81fe6cd50483c59da676edfa
900008,Exclusive next generation productivity Campaign,social_media,2025-04-10,2025-05-24,30971.16,114609,90210,63029,organic,Indonesia,Jakarta,9008,2024-01-09T00:00:00.000+0000,internal,0f9106768c175c1cffafae8700c1aa7e04f6c94c7dd46104b534e7f77b0f554c
900009,Down-sized composite service-desk Campaign,tv,2024-08-25,2024-09-09,42470.04,892720,712183,698903,paid,Singapore,Singapore,9009,2024-01-10T00:00:00.000+0000,external,b415b193128ffb38ac4cc26c6a6a4b1e596881adaf454e84617e814a409fa279


In [0]:
import dbldatagen as dg
from pyspark.sql.types import StringType, LongType, TimestampType, IntegerType
from faker import Faker
import random
from datetime import timedelta, datetime

fake = Faker()
spark.conf.set("spark.sql.shuffle.partitions", "4")

# Step 1: Load only valid customers (from lifecycle stages that can have issues)
customer_df = spark.sql("""
    SELECT customer_id, activation_date, churn_date, country, city
    FROM customers
    WHERE lifecycle_stage IN ('Activated', 'Engaged', 'Paying', 'Retained', 'Lost')
""").cache()

# Prepare customer lookups
customer_data = customer_df.collect()
customer_ids = [row['customer_id'] for row in customer_data]
activation_dates = {row['customer_id']: row['activation_date'] for row in customer_data}
churn_dates = {row['customer_id']: row['churn_date'] for row in customer_data}
countries = {row['customer_id']: row['country'] for row in customer_data}
cities = {row['customer_id']: row['city'] for row in customer_data}

# Sample tickets
num_tickets = 1000
ticket_customer_samples = random.choices(customer_ids, k=num_tickets)

# Issue and resolution values
issue_types = ["billing", "technical", "account", "shipping", "refund", "general inquiry"]
resolution_statuses = ["resolved", "pending", "escalated", "closed_without_resolution"]
agent_names = [fake.name() for _ in range(40)]
sources = ["email", "chat", "phone"]

# Generate ticket times and supporting data
ticket_opened_times = []
ticket_closed_times = []
resolution_times = []
linked_transaction_ids = []
sampled_agent_names = []
feedback_scores = []
ticket_countries = []
ticket_cities = []

for cid in ticket_customer_samples:
    activation = activation_dates[cid]
    churn = churn_dates.get(cid)
    activation_dt = datetime.combine(activation, datetime.min.time())

    # Ticket opened after activation but before churn (if exists)
    max_days = (datetime.combine(churn, datetime.min.time()) - activation_dt).days if churn else 90
    delta_days = random.randint(1, max_days if max_days > 1 else 2)
    opened = activation_dt + timedelta(days=delta_days, seconds=random.randint(0, 86400))
    closed = opened + timedelta(days=random.randint(0, 15), seconds=random.randint(0, 3600)) if random.random() > 0.15 else None
    resolution_time = (closed - opened).total_seconds() / 3600 if closed else None

    # Populate per-ticket values
    ticket_opened_times.append(opened)
    ticket_closed_times.append(closed)
    resolution_times.append(int(resolution_time) if resolution_time else None)
    linked_transaction_ids.append(random.randint(200000, 210000))  # Simulated range
    sampled_agent_names.append(random.choice(agent_names))
    feedback_scores.append(random.randint(1, 5) if random.random() > 0.3 else None)
    ticket_countries.append(countries[cid])
    ticket_cities.append(cities[cid])

# Generate table
df_support_gen = (
    dg.DataGenerator(spark, name="support", rows=num_tickets, partitions=4)
    .withColumn("ticket_id", LongType(), minValue=1000000, uniqueValues=num_tickets * 2)
    .withColumn("customer_id", LongType(), values=ticket_customer_samples)
    .withColumn("issue_type", StringType(), values=issue_types)
    .withColumn("ticket_opened_date", TimestampType(), values=ticket_opened_times)
    .withColumn("ticket_closed_date", TimestampType(), values=ticket_closed_times)
    .withColumn("resolution_status", StringType(), values=resolution_statuses)
    .withColumn("agent_name", StringType(), values=sampled_agent_names)
    .withColumn("feedback_score", IntegerType(), values=feedback_scores)
    .withColumn("resolution_time", IntegerType(), values=resolution_times)
    .withColumn("linked_transaction_id", LongType(), values=linked_transaction_ids)
    .withColumn("country", StringType(), values=ticket_countries)
    .withColumn("city", StringType(), values=ticket_cities)
    .withColumn("batch_id", LongType(), minValue=10000, maxValue=11000)
    .withColumn("ingest_ts", "timestamp", expr="current_timestamp()")
    .withColumn("source", StringType(), values=sources)
    .withColumn("record_hash", StringType(), expr="sha2(concat(ticket_id, customer_id), 256)")
)

# Build & register
df_support = df_support_gen.build()
df_support.createOrReplaceTempView("support")

# Optional: View sample
display(df_support.limit(10))

ticket_id,customer_id,issue_type,ticket_opened_date,ticket_closed_date,resolution_status,agent_name,feedback_score,resolution_time,linked_transaction_id,country,city,batch_id,ingest_ts,source,record_hash
1000000,1000859,billing,2024-05-14T11:15:06.000+0000,2024-05-17T11:33:24.000+0000,resolved,Kimberly Koch,,72.0,204903,Singapore,Singapore,10000,2025-06-16T17:48:50.940+0000,email,792703ad9091867e79971d45fa0104021fefe7105a8637613fa0008479cfdecf
1000001,1000420,technical,2024-04-07T05:47:55.000+0000,,pending,Gina Hunter,3.0,,200908,Japan,Kyoto,10001,2025-06-16T17:48:50.940+0000,chat,1ccfd6e80576acd873d0426a70ac4b588648753c5c413d57596c57969dd2277a
1000002,1000553,account,2024-09-23T21:42:17.000+0000,,escalated,Donald Johnson,1.0,,203484,Indonesia,Bandung,10002,2025-06-16T17:48:50.940+0000,phone,7343f751da9d11337d87d1b8d9f1bdb1e30e55a3fe730d9ded31d4144bcac534
1000003,1000412,shipping,2024-03-09T15:14:41.000+0000,2024-03-23T15:33:38.000+0000,closed_without_resolution,Jason Stevenson,4.0,336.0,209014,Thailand,Chiang Mai,10003,2025-06-16T17:48:50.940+0000,email,1100db1260fa24310f0fc337b2e7cea42b6ecab71911e751c705ab5fbe8234cd
1000004,1000114,refund,2024-04-28T01:40:52.000+0000,2024-05-02T02:23:54.000+0000,resolved,Heather Graham,4.0,96.0,201915,Malaysia,Kuala Lumpur,10004,2025-06-16T17:48:50.940+0000,chat,5e568ed3207dfa126e21db59c18fec0c33d77d3ea898d9df0cde546934aa4ec9
1000005,1000938,general inquiry,2024-10-17T04:45:19.000+0000,2024-10-31T05:26:24.000+0000,pending,Amanda Jennings,,336.0,203363,Indonesia,Jakarta,10005,2025-06-16T17:48:50.940+0000,phone,77cb73109657db9ac5d60a4cc2971edaa8e84991dd6384d2640b07c85428b355
1000006,1000727,billing,2025-01-10T22:28:28.000+0000,2025-01-25T23:02:52.000+0000,escalated,Mary Jones,,360.0,200058,Singapore,Singapore,10006,2025-06-16T17:48:50.940+0000,email,2808be81f70aa7be57590e4175d26ff49ded6aa0be3e37b27a649896b5b57138
1000007,1000978,technical,2024-10-08T08:19:27.000+0000,,closed_without_resolution,Valerie Campbell,4.0,,202933,Nepal,Kathmandu,10007,2025-06-16T17:48:50.940+0000,chat,21dd48542b47b227045f01d278c3197dc9ebb549c48b94718175532eeb52d24d
1000008,1000050,account,2024-03-01T12:20:04.000+0000,2024-03-08T12:42:34.000+0000,resolved,Wendy Woods,5.0,168.0,206442,Philippines,Cebu,10008,2025-06-16T17:48:50.940+0000,phone,eb48be3b2e2131d47754829591b43268c8f604050cd0e82abc301aa5e529c4ab
1000009,1000390,shipping,2024-03-14T12:11:17.000+0000,,pending,Crystal Neal,,,202386,Thailand,Chiang Mai,10009,2025-06-16T17:48:50.940+0000,email,e2aa0c8836094a59249dc913edc198a2a8bf706f8408449a507e8c042833ce36


In [0]:
%sql
SELECT DISTINCT c.lifecycle_stage
FROM support s
LEFT JOIN customers c ON s.customer_id = c.customer_id
WHERE c.lifecycle_stage NOT IN ('Activated', 'Engaged', 'Paying', 'Retained', 'Lost');


lifecycle_stage


In [0]:
%sql
SELECT s.customer_id, c.activation_date, s.ticket_opened_date
FROM support s
JOIN customers c ON s.customer_id = c.customer_id
WHERE s.ticket_opened_date < c.activation_date;


customer_id,activation_date,ticket_opened_date


In [0]:
%sql
SELECT ticket_id, ticket_opened_date, ticket_closed_date
FROM support
WHERE ticket_closed_date IS NOT NULL
  AND ticket_closed_date < ticket_opened_date;


ticket_id,ticket_opened_date,ticket_closed_date


In [0]:
%sql
SELECT ticket_id, ticket_closed_date, resolution_time
FROM support
WHERE (ticket_closed_date IS NULL AND resolution_time IS NOT NULL)
   OR (ticket_closed_date IS NOT NULL AND resolution_time IS NULL);

ticket_id,ticket_closed_date,resolution_time


In [0]:
%sql
CREATE SCHEMA dev

In [0]:
%sql
CREATE OR REPLACE TABLE dev.stg_customers AS
SELECT * FROM customers

num_affected_rows,num_inserted_rows


In [0]:
%sql
CREATE OR REPLACE TABLE dev.stg_transactions AS
SELECT * FROM transactions

num_affected_rows,num_inserted_rows


In [0]:
%sql
CREATE OR REPLACE TABLE dev.stg_events AS
SELECT * FROM events

num_affected_rows,num_inserted_rows


In [0]:
%sql
CREATE OR REPLACE TABLE dev.stg_campaigns AS
SELECT * FROM campaigns

num_affected_rows,num_inserted_rows


In [0]:
%sql
CREATE OR REPLACE TABLE dev.stg_support AS
SELECT * FROM support

num_affected_rows,num_inserted_rows
