## Parâmetros e helpers

In [0]:
import random
import string
from datetime import datetime, timedelta, timezone

from faker import Faker
from pyspark.sql import Row
from pyspark.sql import functions as F, types as T

# ===== Parameters =====
# Target location of the generated tables. CATALOG may be set to None when
# Unity Catalog is not used (e.g. "workshop_catalog" or None).
CATALOG = "workshop_modelagem"
SCHEMA = "bronze"  # schema/database that receives the tables
SEED = 42          # seed applied to both `random` and Faker below

# ----- Volumes (rows generated per table) -----
N_CUSTOMERS = 1_000
N_PRODUCTS = 500
N_ORDERS = 5_000
N_ORDER_ITEMS = 12_000

# ----- Rates of injected data-quality "problems" (probabilities) -----
# orders
P_DUP_ORDERS = 0.025                # ~2.5% duplicated order_id
P_NULL_CUSTOMER_IN_ORDERS = 0.05    # ~5% null customer_id
P_STATUS_CASE_VARIATION = 0.25      # status re-cased (upper/capitalized)
P_STRING_DATE_IN_ORDERS = 0.50      # half as messy strings, half as coherent dates

# shared
P_STRING_NUMERIC_IN_FIELDS = 0.15   # share of numeric values emitted as strings

# products
P_DUP_PRODUCT_ID = 0.03
P_INCONSISTENT_IS_ACTIVE = 0.40
P_NULL_BRAND_SUBCATEGORY = 0.10

# customers
P_CUSTOMER_INCONSISTENCY = 0.20     # states such as "SP", "sp", "São Paulo"
P_CUSTOMER_DUP_DIFF_UPDATE = 0.10   # duplicate customer_id with a different last_update_date
P_EMPTY_FIELDS_CUSTOMER = 0.05

# order_items
P_DUP_ORDERITEM_SAME_OP = 0.05      # duplicate (order_id, product_id) with a different updated_at
P_NULLS_DISCOUNT_PROMO = 0.15

# ----- Reproducibility: seed the stdlib RNG and Faker with the same value -----
random.seed(SEED)
fake = Faker("pt_BR")
Faker.seed(SEED)

# ===== Nome totalmente qualificado de tabela =====
def fqtn(table):
    """Build the backtick-quoted, fully qualified name of ``table``.

    Produces ``catalog.schema.table`` when the module-level ``CATALOG`` is
    truthy, otherwise the two-level ``schema.table`` form. Every part is
    wrapped in backticks.
    """
    parts = [CATALOG, SCHEMA, table] if CATALOG else [SCHEMA, table]
    return ".".join(f"`{part}`" for part in parts)

# ===== Create the target namespace =====
if not CATALOG:
    # No catalog configured: a plain two-level database is enough.
    spark.sql(f"CREATE DATABASE IF NOT EXISTS `{SCHEMA}`")
else:
    # The catalog must exist before the schema inside it can be created.
    for ddl in (
        f"CREATE CATALOG IF NOT EXISTS `{CATALOG}`",
        f"CREATE SCHEMA  IF NOT EXISTS `{CATALOG}`.`{SCHEMA}`",
    ):
        spark.sql(ddl)

# ===== Utilidades =====
STATUSES = ["delivered", "shipped", "processing", "cancelled", "returned"]

def random_status_inconsistent():
    """Return a random order status, sometimes with non-canonical casing.

    The status is drawn uniformly from ``STATUSES`` (all lowercase). With
    probability ``P_STATUS_CASE_VARIATION`` it is re-cased as either
    UPPERCASE or Capitalized.
    """
    s = random.choice(STATUSES)
    if random.random() < P_STATUS_CASE_VARIATION:
        # ``s.lower()`` is deliberately not offered: the statuses are already
        # lowercase, so it would be a no-op that silently reduces the
        # effective variation rate to two thirds of the configured value.
        s = random.choice((s.upper(), s.capitalize()))
    return s

def random_date_between(days_back=365):
    """Return a random naive-UTC datetime in the recent past.

    The offset subtracted from "now" is a whole number of days in
    ``[0, days_back]`` plus a number of seconds in ``[0, 86399]``. The result
    can therefore be up to ``days_back + 1`` days in the past.

    Args:
        days_back: Maximum number of whole days to go back. A negative value
            makes ``random.randint`` raise ``ValueError``.

    Returns:
        A timezone-naive ``datetime`` expressed in UTC, the same convention the
        previous ``datetime.utcnow()`` call produced.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Build the same naive
    # UTC value from an aware "now" so downstream strftime output is unchanged.
    base = datetime.now(timezone.utc).replace(tzinfo=None)
    delta = timedelta(days=random.randint(0, days_back), seconds=random.randint(0, 86399))
    return base - delta

# Candidate layouts: "-" vs "/" separators, year-first vs day-first, with and
# without a time component.
_MIXED_DATE_FORMATS = (
    "%Y-%m-%d",
    "%Y/%m/%d",
    "%Y-%m-%d %H:%M:%S",
    "%d/%m/%Y",
    "%d-%m-%Y %H:%M:%S",
)


def random_date_mixed_formats(dt):
    """Render ``dt`` as a string using one randomly chosen date layout.

    Examples of possible outputs: "2025-10-25", "2025/10/25",
    "2025-10-25 14:33:20", "25/10/2025", "25-10-2025 14:33:20".
    """
    chosen_format = random.choice(_MIXED_DATE_FORMATS)
    return dt.strftime(chosen_format)

def maybe_stringify_number(x):
    """Return ``x`` formatted as a string with probability
    ``P_STRING_NUMERIC_IN_FIELDS``; otherwise return ``x`` unchanged."""
    as_text = random.random() < P_STRING_NUMERIC_IN_FIELDS
    return f"{x}" if as_text else x

def maybe_null(val, p=0.1):
    """Return ``None`` with probability ``p``, otherwise ``val`` itself."""
    if random.random() < p:
        return None
    return val

# Full state names per Brazilian UF code, used as the "spelled-out" dirty variant.
_UF_FULL_NAMES = {
    "AC": "Acre", "AL": "Alagoas", "AP": "Amapá", "AM": "Amazonas",
    "BA": "Bahia", "CE": "Ceará", "DF": "Distrito Federal",
    "ES": "Espírito Santo", "GO": "Goiás", "MA": "Maranhão",
    "MT": "Mato Grosso", "MS": "Mato Grosso do Sul", "MG": "Minas Gerais",
    "PA": "Pará", "PB": "Paraíba", "PR": "Paraná", "PE": "Pernambuco",
    "PI": "Piauí", "RJ": "Rio de Janeiro", "RN": "Rio Grande do Norte",
    "RS": "Rio Grande do Sul", "RO": "Rondônia", "RR": "Roraima",
    "SC": "Santa Catarina", "SP": "São Paulo", "SE": "Sergipe",
    "TO": "Tocantins",
}

def dirty_state(uf):
    """Return ``uf`` or, with probability ``P_CUSTOMER_INCONSISTENCY``, a dirty variant.

    Dirty variants are chosen uniformly among the code as given, its lowercase
    form, and the full state name (e.g. "SP", "sp", "São Paulo"). Previously
    only SP had a full-name variant. Codes missing from ``_UF_FULL_NAMES``
    fall back to ``uf`` for the third option.
    """
    if random.random() < P_CUSTOMER_INCONSISTENCY:
        variants = [uf, uf.lower(), _UF_FULL_NAMES.get(uf.upper(), uf)]
        return random.choice(variants)
    return uf

def random_bool_inconsistent():
    """Return ``True``, or with probability ``P_INCONSISTENT_IS_ACTIVE`` a
    randomly picked boolean spelling.

    The spellings mix strings and real bools: "true", "1", "yes", "false",
    "0", "no", True, False.
    """
    if not random.random() < P_INCONSISTENT_IS_ACTIVE:
        return True
    spellings = ["true", "1", "yes", "false", "0", "no", True, False]
    return random.choice(spellings)

def alnum(n=8):
    """Return ``n`` random characters drawn (with replacement) from A-Z and 0-9."""
    alphabet = string.ascii_uppercase + string.digits
    picked = random.choices(alphabet, k=n)
    return "".join(picked)

## Geração da Bronze – products

In [0]:
# Bronze "products": every column is STRING to simulate a heterogeneous raw source.
prod_schema = T.StructType([
    T.StructField("product_id",    T.StringType(), True),
    T.StructField("product_name",  T.StringType(), True),
    T.StructField("category",      T.StringType(), True),
    T.StructField("subcategory",   T.StringType(), True),
    T.StructField("brand",         T.StringType(), True),
    T.StructField("cost_price",    T.StringType(), True),
    T.StructField("list_price",    T.StringType(), True),
    T.StructField("is_active",     T.StringType(), True),  # inconsistent spellings
    T.StructField("last_update",   T.StringType(), True),
])

rows = []
categories = [
    ("Eletrônicos", ["Smartphones", "Notebooks", "TVs", "Acessórios"]),
    ("Casa", ["Cozinha", "Cama/Mesa/Banho", "Decoração"]),
    ("Vestuário", ["Camisetas", "Calçados", "Acessórios"])
]

for i in range(N_PRODUCTS):
    cat, subs = random.choice(categories)
    sub = random.choice(subs)
    pid = f"P{100000+i}"
    name = f"{random.choice(['Alpha','Pro','Max','Lite','Neo'])} {alnum(4)}"
    # brand and subcategory are nulled independently, each with P_NULL_BRAND_SUBCATEGORY.
    brand = None if random.random() < P_NULL_BRAND_SUBCATEGORY else random.choice(["Acme","Globex","Initech","Umbrella"])
    subc  = None if random.random() < P_NULL_BRAND_SUBCATEGORY else sub

    cost  = round(random.uniform(10, 2000), 2)
    price = round(cost * random.uniform(1.1, 2.5), 2)

    # Use the shared helper so P_INCONSISTENT_IS_ACTIVE actually drives the
    # dirtiness (it was previously ignored). The helper can return real bools;
    # render them as "true"/"false" explicitly rather than relying on Spark's
    # implicit coercion into the STRING column.
    is_active = random_bool_inconsistent()
    if isinstance(is_active, bool):
        is_active = str(is_active).lower()

    base = {
        "product_id": pid,
        "product_name": name,
        "category": cat,
        "subcategory": subc,
        "brand": brand,
        "cost_price": str(cost) if random.random() < 0.5 else None,
        "list_price": str(price),
        "is_active": is_active,
        "last_update": random_date_mixed_formats(random_date_between(400))
    }
    rows.append(Row(**base))

# Apply P_DUP_PRODUCT_ID (previously declared but unused). Re-emit ~3% of the
# products with the same product_id and a different last_update. Half of them
# also get a slightly different list_price, simulating several versions of one
# product arriving in the raw feed.
dup_rows = []
for _ in range(int(N_PRODUCTS * P_DUP_PRODUCT_ID)):
    r = random.choice(rows).asDict()
    r["last_update"] = random_date_mixed_formats(random_date_between(200))
    if random.random() < 0.5:
        r["list_price"] = str(round(float(r["list_price"]) * random.uniform(0.9, 1.1), 2))
    dup_rows.append(Row(**r))
rows.extend(dup_rows)

df_products = spark.createDataFrame(rows, schema=prod_schema)
df_products.write.mode("overwrite").format("delta").saveAsTable(fqtn("products"))
display(df_products.limit(5))

## Geração da Bronze – customers


In [0]:
# Bronze "customers": builds N_CUSTOMERS dirty customer rows plus duplicated
# customer_ids with a different last_update_date, then overwrites the Delta
# table fqtn("customers"). Every column is STRING.
# NOTE: the generated data depends on the exact order of the random/Faker calls
# below; reordering statements changes the output for the same SEED.
cust_schema = T.StructType([
    T.StructField("customer_id",       T.StringType(), True),
    T.StructField("customer_name",     T.StringType(), True),
    T.StructField("email",             T.StringType(), True),
    T.StructField("city",              T.StringType(), True),
    T.StructField("state",             T.StringType(), True),
    T.StructField("last_update_date",  T.StringType(), True),
])

ufs = ["SP","RJ","MG","PR","RS","SC","BA","PE","CE","DF"]
base_rows = []
for i in range(N_CUSTOMERS):
    cid = f"C{100000+i}"
    nome = fake.name()
    # e-mail derived from the lowercased name with spaces removed (kept as-is otherwise).
    email = f"{nome.lower().replace(' ','')}@{random.choice(['mail.com','corp.com','example.org'])}"
    city  = fake.city()
    # dirty_state may return e.g. "sp" or "São Paulo" instead of "SP".
    uf    = dirty_state(random.choice(ufs))

    # Sometimes blank out a field: with P_EMPTY_FIELDS_CUSTOMER, either city
    # or email (50/50) becomes an empty string (not None).
    if random.random() < P_EMPTY_FIELDS_CUSTOMER:
        if random.random() < 0.5:
            city = ""
        else:
            email = ""

    # last_update_date: ~70% in one of the mixed string formats; otherwise the
    # default str(datetime) rendering (e.g. "YYYY-MM-DD HH:MM:SS.ffffff").
    row = Row(
        customer_id      = cid,
        customer_name    = nome,
        email            = email,
        city             = city,
        state            = uf,
        last_update_date = random_date_mixed_formats(random_date_between(500))
                           if random.random() < 0.7 else str(random_date_between(500))
    )
    base_rows.append(row)

# Duplicate some customer_ids with a different last_update_date (within the
# last ~200 days). 60% of the duplicates also get a new city and (dirty) state,
# mimicking a customer who moved. The same base row may be picked more than once.
extra = []
for _ in range(int(N_CUSTOMERS * P_CUSTOMER_DUP_DIFF_UPDATE)):
    r = random.choice(base_rows).asDict()
    r["last_update_date"] = random_date_mixed_formats(random_date_between(200))
    if random.random() < 0.6:
        r["city"]  = fake.city()
        r["state"] = dirty_state(random.choice(ufs))
    extra.append(Row(**r))

df_customers = spark.createDataFrame(
    base_rows + extra,
    schema = cust_schema
)
df_customers.write.mode("overwrite").format("delta").saveAsTable(fqtn("customers"))
display(df_customers.limit(5))


## Geração da Bronze – orders

In [0]:
# Bronze "orders": N_ORDERS dirty order rows plus ~P_DUP_ORDERS duplicated
# order_ids, written as the Delta table fqtn("orders"). Every column is STRING.

# Kept for code that reads P_NULL_CUSTOMER. It now follows the shared parameter
# instead of a duplicated literal (editing P_NULL_CUSTOMER_IN_ORDERS in the
# parameters cell previously had no effect here).
P_NULL_CUSTOMER = P_NULL_CUSTOMER_IN_ORDERS  # Probability of customer_id being None

orders_schema = T.StructType([
    T.StructField("order_id",     T.StringType(), True),
    T.StructField("customer_id",  T.StringType(), True),
    T.StructField("order_date",   T.StringType(), True),
    T.StructField("order_status", T.StringType(), True),
    T.StructField("total_amount", T.StringType(), True),
])

# Distinct ids, because the customers table intentionally contains duplicates.
customer_ids = [
    r["customer_id"]
    for r in df_customers.select("customer_id").distinct().collect()
]

rows = []
for i in range(N_ORDERS):
    oid = f"O{100000+i}"
    cust = None if random.random() < P_NULL_CUSTOMER else random.choice(customer_ids)
    order_dt = random_date_between(365)
    # Apply P_STRING_DATE_IN_ORDERS (previously ignored): that share of the
    # dates uses the messy mixed formats; the rest are coherent ISO dates.
    if random.random() < P_STRING_DATE_IN_ORDERS:
        order_date = random_date_mixed_formats(order_dt)
    else:
        order_date = order_dt.strftime("%Y-%m-%d")
    order_status = random_status_inconsistent()
    total_amount = str(round(random.uniform(50, 5000), 2))
    rows.append(Row(
        order_id=oid,
        customer_id=cust,
        order_date=order_date,
        order_status=order_status,
        total_amount=total_amount,
    ))

# Apply P_DUP_ORDERS (previously ignored). Re-emit ~2.5% of the orders with
# the same order_id; half of them carry a re-drawn status, so the duplicates
# are not always byte-identical.
dup_rows = []
for _ in range(int(N_ORDERS * P_DUP_ORDERS)):
    r = random.choice(rows).asDict()
    if random.random() < 0.5:
        r["order_status"] = random_status_inconsistent()
    dup_rows.append(Row(**r))
rows.extend(dup_rows)

df_orders = spark.createDataFrame(rows, schema=orders_schema)
df_orders.write.mode("overwrite").format("delta").saveAsTable(fqtn("orders"))
display(df_orders.limit(5))

## Geração da Bronze – order_items

In [0]:
# Bronze "order_items": N_ORDER_ITEMS dirty item rows plus duplicated
# (order_id, product_id) rows with a different updated_at, written as the Delta
# table fqtn("order_items"). Every column is STRING.
# Requires from earlier cells: N_ORDER_ITEMS, P_DUP_ORDERITEM_SAME_OP,
# P_NULLS_DISCOUNT_PROMO, df_orders, df_products, random, Row, T, maybe_null,
# random_date_between, random_date_mixed_formats (and normally fqtn).

# Define the fallback only when fqtn does not exist yet. Redefining it
# unconditionally (as before) ignored CATALOG/SCHEMA and the identifier quoting
# used for the other bronze tables.
try:
    fqtn
except NameError:
    def fqtn(table_name):
        return f"workshop_modelagem.bronze.{table_name}"

items_schema = T.StructType([
    T.StructField("order_item_id",  T.StringType(), True),
    T.StructField("order_id",       T.StringType(), True),
    T.StructField("product_id",     T.StringType(), True),
    T.StructField("quantity",       T.StringType(), True),
    T.StructField("unit_price",     T.StringType(), True),
    T.StructField("discount_amount",T.StringType(), True),
    T.StructField("promotion_id",   T.StringType(), True),
    T.StructField("updated_at",     T.StringType(), True),
])

# Distinct ids, because orders/products intentionally contain duplicates.
order_ids = [r["order_id"] for r in df_orders.select("order_id").distinct().collect()]
product_ids = [r["product_id"] for r in df_products.select("product_id").distinct().collect()]

rows = []
for i in range(N_ORDER_ITEMS):
    oid = random.choice(order_ids)
    pid = random.choice(product_ids)
    qty = random.randint(1, 5)
    unit = round(random.uniform(5, 1500), 2)
    disc = round(random.uniform(0, unit*0.3), 2) if random.random() > 0.3 else 0.0
    # Only discounted items carry a promotion code.
    promo = f"PROMO{random.randint(1, 50):03d}" if disc > 0 else None

    rows.append(Row(
        order_item_id   = f"OI{100000+i}",
        order_id        = oid,
        product_id      = pid,
        # The column is STRING, so emit text explicitly instead of mixing in
        # Python ints that Spark would have to coerce implicitly.
        quantity        = str(qty),
        unit_price      = str(unit),
        # Apply P_NULLS_DISCOUNT_PROMO (previously ignored) to both fields.
        discount_amount = maybe_null(str(disc), P_NULLS_DISCOUNT_PROMO),
        promotion_id    = maybe_null(promo, P_NULLS_DISCOUNT_PROMO),
        updated_at      = random_date_mixed_formats(random_date_between(365)),
    ))

# Apply P_DUP_ORDERITEM_SAME_OP (previously ignored). Re-emit ~5% of the items
# with the same order_item_id/(order_id, product_id) but a different updated_at.
# Half of them also get a new quantity, like a later version of the same record.
dup_rows = []
for _ in range(int(N_ORDER_ITEMS * P_DUP_ORDERITEM_SAME_OP)):
    r = random.choice(rows).asDict()
    r["updated_at"] = random_date_mixed_formats(random_date_between(30))
    if random.random() < 0.5:
        r["quantity"] = str(random.randint(1, 5))
    dup_rows.append(Row(**r))
rows.extend(dup_rows)

df_order_items = spark.createDataFrame(rows, schema=items_schema)
df_order_items.write.mode("overwrite").format("delta").saveAsTable(fqtn("order_items"))
display(df_order_items.limit(5))