## 00_setup


In [0]:
import random, string
import numpy as np
from datetime import datetime, timedelta
from pyspark.sql import functions as F, types as T

In [0]:
# Pin the session time zone so timestamp rendering does not depend on cluster defaults.
spark.conf.set("spark.sql.session.timeZone", "UTC")

# Notebook parameters (Databricks text widgets) with their default values.
dbutils.widgets.text("catalog", "acme_dev")
dbutils.widgets.text("base_path", "abfss://landing@acmemcd.dfs.core.windows.net/")
dbutils.widgets.text("start_date", "2025-01-01")
dbutils.widgets.text("days", "30")
dbutils.widgets.text("seed", "42")

CATALOG = dbutils.widgets.get("catalog")
# Trailing slashes are stripped so later f"{BASE}/<source>" paths never contain "//".
BASE = dbutils.widgets.get("base_path").rstrip("/")
START_DATE = dbutils.widgets.get("start_date")
DAYS = int(dbutils.widgets.get("days"))
SEED = int(dbutils.widgets.get("seed"))

# Fail fast on bad parameters: the daily write loops iterate range(DAYS) and parse
# START_DATE with "%Y-%m-%d", so an invalid value would otherwise either surface
# deep inside a Spark job or silently write nothing.
datetime.strptime(START_DATE, "%Y-%m-%d")  # raises ValueError on a malformed date
if DAYS <= 0:
    raise ValueError(f"days must be a positive integer, got {DAYS}")

In [0]:
# Seed numpy's global RNG and create the shared `random.Random` used by the helpers below.
np.random.seed(SEED)
rng = random.Random(SEED)

# Helpers
def rand_pick(options):
    """Return one element of `options`, selected by index with the module-level `rng`."""
    idx = rng.randrange(len(options))
    return options[idx]

def misspell(s: str) -> str:
    """Return `s` with one random single-character edit applied.

    Empty/None values and strings shorter than three characters are returned
    unchanged. Otherwise one operation is drawn ("swap", "drop" or "replace")
    and applied at a random position `i` with 1 <= i <= len(s) - 2:
      * swap    - exchange the characters at i-1 and i
      * drop    - remove the character at i
      * replace - overwrite the character at i with a random lowercase letter
    """
    if not s or len(s) < 3:
        return s
    # Draw order matters for reproducibility: operation first, then position.
    op = rand_pick(["swap", "drop", "replace"])
    i = rng.randrange(1, len(s)-1)
    head, tail = s[:i], s[i+1:]
    if op == "drop":
        return head + tail
    if op == "swap":
        return s[:i-1] + s[i] + s[i-1] + tail
    # replace
    return head + rand_pick(list(string.ascii_lowercase)) + tail

def jitter_tz(ts: datetime) -> datetime:
    """Return `ts` shifted by a whole number of hours chosen at random via `rng`."""
    # simulate timezone drift (+/- hours) around DST
    offsets_hours = [-8,-7,-6,-5,-4,-3, 3,4,5]  # PST..EST..weird
    return ts + timedelta(hours=rng.choice(offsets_hours))

# Country codes; "US" appears three times and "XX" is an invalid code.
COUNTRIES = ["US"] * 3 + ["CA", "GB", "DE", "FR", "AU", "IN", "BR", "XX"]
# Region labels in mixed casing.
REGIONS = [
    "north", "SOUTH", "West",
    "EAST", "emEA", "apac",
]
EMOJIS = [
    "🙂", "🚀", "🔥", "💡",
    "😬", "🧪", "✨", "🤖",
]

# Names (light list)
FIRST = [
    "alex", "casey", "jordan", "taylor", "morgan", "jamie", "riley",
    "nico", "sam", "chris", "lee", "devon", "avery",
]
LAST = [
    "smith", "garcia", "patel", "nguyen", "johnson", "brown",
    "lee", "martin", "anderson", "williams", "jackson", "thompson",
]

# Create base customers (10k+) used to derive cross-system data
CUSTOMERS = 12000


def _uniform_choice(values, seed):
    """Build a Column that selects one of `values` per row from a seeded `F.rand` draw."""
    options = F.array([F.lit(v) for v in values])
    # element_at is 1-based, hence the +1 after truncating rand*len to an int.
    return F.element_at(options, (F.rand(seed)*len(values)).cast("int")+1)


base_customers = (
    spark.range(1, CUSTOMERS+1)
    .withColumn("account_id", (F.col("id") + F.lit(100000)).cast("string"))
    .withColumn("first_name", _uniform_choice(FIRST, SEED))
    .withColumn("last_name", _uniform_choice(LAST, SEED+1))
    # email is built in two steps: "<first>.<last>.<account_id>" and then the domain suffix.
    .withColumn("email", F.concat_ws(".", F.col("first_name"), F.col("last_name"), F.col("account_id")).cast("string"))
    .withColumn("domain", _uniform_choice(["@example.com", "@corp.com", "@work.io"], SEED+2))
    .withColumn("email", F.concat_ws("", F.col("email"), F.col("domain")))
    .withColumn("phone", F.concat_ws("", F.lit("+1"), F.lpad((F.col("id")*7%999999999).cast("string"), 10, "0")))
    # NOTE(review): the leading empty-string argument makes every address start with a
    # space (" <id> Main St") - confirm whether that is intended dirty data.
    .withColumn("addr_line1", F.concat_ws(" ", F.lit(""), F.col("id"), F.lit("Main St")))
    .withColumn("city", _uniform_choice(["Austin", "Seattle", "Toronto", "London", "Berlin"], SEED+3))
    .withColumn("postal", F.lpad((F.col("id")*13 % 99999).cast("string"), 5, "0"))
    .withColumn("country_code", _uniform_choice(COUNTRIES, SEED+4))
    .drop("id")
)

# Expose the frame to later cells by name, then report its size.
base_customers.createOrReplaceTempView("base_customers")
base_customer_count = base_customers.count()
print(f"Base customers ready: {base_customer_count:,}")


### 10_crm

In [0]:
from datetime import date
from pyspark.sql import functions as F
from datetime import datetime

In [0]:
# Landing folder for the CRM source, directly under the base path.
crm_root = BASE + "/crm"

In [0]:
# 1. Derive contacts (contacts_per_acct rows per account), with some shared emails
contacts_per_acct = 2

# Explode a 1..contacts_per_acct sequence into one row per contact, then turn the
# sequence number into a "<account_id>-<n>" contact_id.
_contact_numbers = F.sequence(F.lit(1), F.lit(contacts_per_acct))
crm_contacts = (
    spark.table("base_customers")
    .withColumn("contact_seq", _contact_numbers)
    .withColumn("contact_id", F.explode(F.col("contact_seq")))
    .withColumn("contact_id", F.concat_ws("-", F.col("account_id"), F.col("contact_id")))
    .drop("contact_seq")
)

# 10% share the same email across contacts (forces dedupe/lineage logic)
_shared_email = F.concat_ws("", F.col("first_name"), F.lit("."), F.col("last_name"), F.lit("@shared.com"))
crm_contacts = crm_contacts.withColumn(
    "email",
    F.when(F.rand(SEED) < 0.10, _shared_email).otherwise(F.col("email")),
)

# 2. Introduce misspellings in ~7% of names
# misspell() draws from a Python RNG, so the UDF is flagged nondeterministic to stop
# Spark from assuming repeated invocations return the same value.
misspell_udf = F.udf(misspell, T.StringType()).asNondeterministic()
# Column.otherwise() treats a bare Python string as a literal value, so the fallback
# must be an explicit column reference; otherwise every untouched row would contain
# the text "first_name" / "last_name" instead of the actual name.
crm_contacts = (crm_contacts
  .withColumn("first_name", F.when(F.rand(SEED+5) < 0.07, misspell_udf(F.col("first_name"))).otherwise(F.col("first_name")))
  .withColumn("last_name",  F.when(F.rand(SEED+6) < 0.07, misspell_udf(F.col("last_name"))).otherwise(F.col("last_name")))
)

# 3. Timestamps with timezone drift
base_ts = datetime.strptime(START_DATE+" 08:00:00", "%Y-%m-%d %H:%M:%S")

def _drift_ts(_):
    """Return base_ts plus a random minute offset within the DAYS window, hour-shifted by jitter_tz.

    The argument is ignored; it only exists because the UDF is called with a column.
    """
    ts = base_ts + timedelta(minutes=rng.randrange(0, 60*24*DAYS))
    return jitter_tz(ts)

# The function returns a different value on every call, so it must not be registered
# as a (default) deterministic UDF: Spark may otherwise drop or duplicate invocations.
# NOTE(review): `rng` is a driver-side object captured by the UDF; each Python worker
# presumably gets its own copy, so draws are not globally reproducible - confirm this
# is acceptable for the generated data.
drift_ts = F.udf(_drift_ts, "timestamp").asNondeterministic()

crm_contacts = crm_contacts.withColumn("updated_at", drift_ts(F.lit(1)))

# 4. Phone field rename occurs halfway through the period
midpoint = datetime.strptime(START_DATE, "%Y-%m-%d") + timedelta(days=DAYS//2)

def _maybe_null_phone(updated_at):
    """Return a synthetic phone string, None, or a malformed value.

    `updated_at` is not used; it only supplies a per-row input column.
    """
    # 5% nulls to push quarantine; 10% of the remaining rows are malformed
    if rng.random() < 0.05: return None
    if rng.random() < 0.10: return "+1-555-abc-"+str(rng.randrange(1000,9999))
    return "+1" + str(rng.randrange(2000000000, 9999999999))

# Output is random per call, so register the UDF as nondeterministic; by default Spark
# assumes UDFs are deterministic and may eliminate or repeat invocations.
maybe_null_phone = F.udf(_maybe_null_phone, "string").asNondeterministic()

crm_contacts = crm_contacts.withColumn("primary_phone", maybe_null_phone("updated_at"))

# Write daily partitions with late arrivals (previous day files in later folders)
crm_start_day = datetime.strptime(START_DATE, "%Y-%m-%d")  # parsed once, reused for every day

# Day bucket per account. hash() returns a signed int and Spark's % keeps the sign of
# the dividend, so `hash % DAYS` is negative for negative hashes and those rows would
# never match any d in range(DAYS). pmod() always yields a value in [0, DAYS).
crm_day_bucket = F.expr(f"pmod(hash(account_id), {DAYS})")

for d in range(DAYS):
    day = (crm_start_day + timedelta(days=d)).date()
    day_path = f"{crm_root}/dt={day}"

    # Late arrivals: 5% of records from the previous day drop today
    todays = crm_contacts.where(crm_day_bucket == d)
    if d > 0:
        late = crm_contacts.where(crm_day_bucket == (d-1)).sample(0.05, seed=SEED)
        todays = todays.unionByName(late)

    # Before midpoint: "primary_phone"; after midpoint: rename to "phone_primary"
    phone_alias = "primary_phone" if datetime.combine(day, datetime.min.time()) < midpoint else "phone_primary"

    out = todays.selectExpr(
        "account_id",
        f"struct(contact_id, first_name, last_name, email, primary_phone as {phone_alias}) as contact",
        "updated_at",
        "country_code",
        "addr_line1",
        "city",
        "postal"
    )

    # One shuffle straight to 8 output files (replaces repartition(400) followed by coalesce(8)).
    (out.repartition(8)
            .write.mode("overwrite").json(day_path))

print("CRM daily JSON written with rename and late arrivals.")


### 20_marketing

In [0]:
# Landing folder for the marketing source, directly under the base path.
mkt_root = BASE + "/marketing"


In [0]:
# Create monthly header maps for 3 months (extend if DAYS>90)
# Each variant maps the canonical lead field name to the CSV header used for it.
_LEAD_FIELDS = ("lead_id", "email", "first_name", "last_name", "country", "region", "utm_campaign", "created_at")
HEADER_VARIANTS = [
    dict(zip(_LEAD_FIELDS, headers))
    for headers in (
        ("lead_id", "email", "first_name", "last_name", "country_code", "region", "utm_campaign", "created_at"),
        ("lead_key", "Email", "FirstName", "LastName", "Country", "Region", "utm", "CreatedDate"),
        ("lid", "email_address", "fname", "lname", "country", "REGION", "utmCampaign", "created"),
    )
]

# Derive leads from customers (some missing emails)
# Every random decision uses its own seed offset. Two F.rand() calls with the same seed
# over the same rows yield identical values, so reusing SEED would make "email is null"
# imply "first name upper-cased" and "last name lower-cased" (and tie all three to the
# first-name draw in base_customers).
leads = (spark.table("base_customers")
  .withColumn("lead_id", F.concat_ws("-", F.col("account_id"), F.lit("L")))
  .withColumn("email", F.when(F.rand(SEED+7) < 0.25, None).otherwise(F.col("email")))
  .withColumn("country", F.col("country_code"))
  .withColumn("region", F.element_at(F.array([F.lit(x) for x in REGIONS]), (F.rand(SEED+9)*len(REGIONS)).cast("int")+1))
  .withColumn("utm_campaign", F.element_at(F.array(F.lit("spring_launch"),F.lit("brand"),F.lit("retarget")), (F.rand(SEED+10)*3).cast("int")+1))
  .withColumn("created_at", F.current_timestamp())
  .select("lead_id","email","first_name","last_name","country","region","utm_campaign","created_at")
)

# Convert casing inconsistently (independent draws for first and last name)
leads = (leads
  .withColumn("first_name", F.when(F.rand(SEED+8) < 0.3, F.upper("first_name")).otherwise(F.initcap("first_name")))
  .withColumn("last_name",  F.when(F.rand(SEED+18) < 0.3, F.lower("last_name")).otherwise(F.initcap("last_name")))
)

# Write daily files, with header variant based on month index
_mkt_first_day = datetime.strptime(START_DATE, "%Y-%m-%d")
# Columns that are renamed as-is; created_at is handled separately because it is cast to string.
_MKT_PLAIN_COLUMNS = ("lead_id", "email", "first_name", "last_name", "country", "region", "utm_campaign")

for d in range(DAYS):
    day = _mkt_first_day + timedelta(days=d)
    day_path = f"{mkt_root}/dt={day.date()}"
    # The calendar month (1-12) selects the header set, cycling through the variants.
    hv = HEADER_VARIANTS[(day.month-1) % len(HEADER_VARIANTS)]

    select_exprs = [f"{src} as `{hv[src]}`" for src in _MKT_PLAIN_COLUMNS]
    select_exprs.append(f"cast(created_at as string) as `{hv['created_at']}`")
    out = leads.selectExpr(*select_exprs)

    csv_writer = out.coalesce(4).write.mode("overwrite").option("header","true")
    csv_writer.csv(day_path)

print("Marketing CSV with monthly header drift written.")


### 30_billing

In [0]:
# Landing folder for the billing source, directly under the base path.
bill_root = BASE + "/billing"


In [0]:
invoices_per_cust = 5

# One row per invoice: each customer is exploded into invoices_per_cust invoices with
# id "<account_id>-B-<n>", then given a random amount, currency, status and date.
# Each injected defect (address casing, city casing, null email) uses its own seed
# offset: F.rand() calls sharing a seed return identical values per row, so reusing
# SEED would make every null-email invoice also have upper-cased address and city.
billing = (spark.table("base_customers")
  .withColumn("customer_key", F.concat_ws("-", F.col("account_id"), F.lit("B")))
  .withColumn("invoice_seq", F.sequence(F.lit(1), F.lit(invoices_per_cust)))
  .withColumn("invoice_id", F.explode("invoice_seq"))
  .drop("invoice_seq")
  .withColumn("invoice_id", F.concat_ws("-", F.col("customer_key"), F.col("invoice_id")))
  .withColumn("amount", (F.rand(SEED+11)*10000).cast("decimal(12,2)"))
  .withColumn("currency", F.element_at(F.array(F.lit("USD"),F.lit("USD"),F.lit("EUR"),F.lit("GBP")), (F.rand(SEED+12)*4).cast("int")+1))
  .withColumn("status", F.element_at(F.array(F.lit("OPEN"),F.lit("PAID"),F.lit("PAST_DUE")), (F.rand(SEED+13)*3).cast("int")+1))
  .withColumn("invoice_date", F.date_add(F.lit(START_DATE), (F.rand(SEED+14)*DAYS).cast("int")))
  .withColumn("addr_line1", F.when(F.rand(SEED+19) < 0.2, F.upper("addr_line1")).otherwise(F.lower("addr_line1")))
  .withColumn("city", F.when(F.rand(SEED+20) < 0.2, F.upper("city")).otherwise(F.initcap("city")))
  .withColumn("email", F.when(F.rand(SEED+21) < 0.1, None).otherwise(F.col("email")))
)

# Intentionally inject duplicates with different casing (1% duplicates)
dupes = (
    billing
    .sample(0.01, seed=SEED)
    .withColumn("addr_line1", F.upper(F.col("addr_line1")))
)
billing = billing.unionByName(dupes)

# Partition by invoice_date to create many Parquet files
billing_writer = billing.repartition(64).write.mode("overwrite").partitionBy("invoice_date")
billing_writer.parquet(bill_root)

print("Billing Parquet written with higher volume and duplicates.")


### 40_support

In [0]:
import os
# Landing folder for the support source, directly under the base path.
sup_root = BASE + "/support"


In [0]:
import os
sup_root = f"{BASE}/support"
# Base requester emails (some nulls)
# The null-email draw uses its own seed offset: F.rand(SEED) over base_customers rows
# repeats the exact values that picked first_name, which would tie "requester_email is
# null" to specific customers instead of spreading it independently.
req = (spark.table("base_customers")
  .withColumn("requester_email", F.when(F.rand(SEED+22) < 0.15, None).otherwise(F.col("email")))
  .withColumn("priority", F.element_at(F.array(F.lit("low"),F.lit("normal"),F.lit("high")), (F.rand(SEED+15)*3).cast("int")+1))
  .withColumn("csat", F.element_at(F.array(F.lit("Good"),F.lit("Bad"),F.lit("Mixed"),F.lit("1"),F.lit("5")), (F.rand(SEED+16)*5).cast("int")+1))
  .withColumn("created_at", F.current_timestamp())
  .select("account_id","requester_email","priority","csat","created_at"))

# Create 1-3 tickets per requester: explode a 1..k sequence (k drawn per row) and
# turn the sequence number into a "<account_id>-<n>" ticket_id.
req = req.withColumn("tseq", F.sequence(F.lit(1), (F.rand(SEED+17)*3+1).cast("int")))
req = req.withColumn("ticket_id", F.explode("tseq")).drop("tseq")
req = req.withColumn("ticket_id", F.concat_ws("-", F.col("account_id"), F.col("ticket_id")))

# Add text with emojis and occasional merge references
def _ticket_text(account_id):
    """Return "Issue for acct <account_id>: <topic>", followed by a random emoji half of the time."""
    base = f"Issue for acct {account_id}: " + rand_pick(["login failure","invoice dispute","feature request","bug report","data export"])
    if rng.random() < 0.5:
        base += " " + rand_pick(EMOJIS)
    return base

# The text is random per call, so register the UDF as nondeterministic; Spark assumes
# UDFs are deterministic by default and may eliminate or repeat invocations.
ticket_text = F.udf(_ticket_text, "string").asNondeterministic()

def _maybe_merged(ticket_id):
    """Return "merged-into:<last '-' segment of ticket_id>" for ~8% of calls, else None."""
    # 8% of tickets reference a merge to a prior ticket
    if rng.random() < 0.08:
        return "merged-into:" + ticket_id.split("-")[-1]  # pseudo link
    return None

# The result is random per call, so register the UDF as nondeterministic; Spark assumes
# UDFs are deterministic by default and may eliminate or repeat invocations.
maybe_merged = F.udf(_maybe_merged, "string").asNondeterministic()

# Attach the generated ticket text and the optional merge reference to every ticket.
_ticket_body = ticket_text(F.col("account_id"))
_merge_reference = maybe_merged(F.col("ticket_id"))
sup = req.withColumn("text", _ticket_body).withColumn("merge_ref", _merge_reference)

# Write daily JSONL files; alternate encodings between UTF-8 and UTF-16
# (Python's "utf-16" codec, which prepends a byte-order mark).
import json
import os

sup_start_day = datetime.strptime(START_DATE, "%Y-%m-%d")  # parsed once, reused for every day

# Day bucket per account. hash() returns a signed int and Spark's % keeps the sign of
# the dividend, so `hash % DAYS` is negative for negative hashes and those tickets
# would never be written. pmod() always yields a value in [0, DAYS).
sup_day_bucket = F.expr(f"pmod(hash(account_id), {DAYS})")

for d in range(DAYS):
    day = (sup_start_day + timedelta(days=d)).date()
    day_dir = f"{sup_root}/dt={day}"
    dbutils.fs.mkdirs(day_dir)

    # created_at is cast to string in Spark: collected timestamps arrive as Python
    # datetime objects, which json.dumps() cannot serialize (TypeError).
    day_df = sup.where(sup_day_bucket == d).select(
        "ticket_id","account_id","requester_email","priority","csat",
        F.col("created_at").cast("string").alias("created_at"),
        "text","merge_ref"
    )

    # Collect to driver and write JSONL with chosen encoding (small per-day sample)
    rows = [r.asDict(recursive=True) for r in day_df.limit(30000).collect()]  # cap to keep file sizes moderate
    enc = "utf-8" if d % 2 == 0 else "utf-16"
    local_path = f"/tmp/support_{day}.jsonl"
    with open(local_path, "w", encoding=enc, newline="\n") as f:
        for r in rows:
            # ensure_ascii=False keeps emojis as real characters in the chosen encoding
            f.write(json.dumps(r, ensure_ascii=False) + "\n")

    try:
        dbutils.fs.cp(f"file:{local_path}", f"{day_dir}/support_{day}_{enc}.jsonl")
    finally:
        # Do not leave one temp file per day behind on the driver.
        os.remove(local_path)

print("Support JSONL written with mixed encodings and merges.")




### 50_checks

In [0]:
# Root folder of every landed source, in the order the sections above wrote them.
paths = [f"{BASE}/{source}" for source in ("crm", "marketing", "billing", "support")]


In [0]:
# Show the top-level listing of each source folder under a "=== <path>" banner.
for p in paths:
    print("\n===", p)
    listing = dbutils.fs.ls(p)
    display(listing)


In [0]:
from pyspark.sql import functions as F

# Row counts per landed source; each read globs the folders directly under the source root.
crm_ct, mkt_ct, bill_ct = (
    spark.read.json(f"{BASE}/crm/*").count(),
    spark.read.option("header","true").csv(f"{BASE}/marketing/*").count(),
    spark.read.parquet(f"{BASE}/billing/*").count(),
)
# Support count is disabled in the original notebook:
# sup_ct = spark.read.json(f"{BASE}/support/*/*").count()

for label, row_count in (("CRM", crm_ct), ("Marketing", mkt_ct), ("Billing", bill_ct)):
    print(f"{label} rows: {row_count:,}")
# print(f"Support rows: {sup_ct:,}")
