In [0]:
pip install faker

In [0]:
import random
import uuid
from faker import Faker
from pyspark.sql.types import *
from datetime import datetime, timedelta
from decimal import Decimal, ROUND_HALF_UP
import decimal

# --- SETUP INICIAL ---
fake = Faker('pt_BR')

# --- 1. DEFINIÇÃO DOS SCHEMAS ---

schema_cliente = StructType([
    StructField("id_cli", IntegerType(), False),
    StructField("nm_cli", StringType(), True),
    StructField("dt_nsct_cli", DateType(), True),
    StructField("email_cli", StringType(), True),
    StructField("tel_cli", StringType(), True),
    StructField("end_cli", StringType(), True),
    StructField("munic_cli", StringType(), True),
    StructField("uf_cli", StringType(), True),
    StructField("cep_cli", StringType(), True),
    StructField("rend_cli", DecimalType(10, 2), True),
    StructField("profi_cli", StringType(), True)
])

schema_conta = StructType([
    StructField("id_conta", StringType(), False),      # UUID ou Hash
    StructField("id_cli", IntegerType(), False),       # FK para tb_cliente
    StructField("num_conta", StringType(), True),      # Número da conta
    StructField("cod_agencia", StringType(), True),    # Código da agência
    StructField("vlr_saldo_atual", DecimalType(18, 2), True),
    StructField("vlr_limite_esp", DecimalType(18, 2), True),
    StructField("status_conta", StringType(), True),   # Ativa, Bloqueada, Encerrada
    StructField("dt_abertura", DateType(), True),
    StructField("ind_cartao_deb", BooleanType(), True) # True/False
])

schema_cartao = StructType([
    StructField("id_cartao", StringType(), False),
    StructField("id_conta", StringType(), False),
    StructField("bandeira_cartao", StringType(), True),
    StructField("categoria_cartao", StringType(), True),
    StructField("num_cartao_final", StringType(), True),
    StructField("dt_validade", StringType(), True),
    StructField("vlr_limite_tot", DecimalType(18, 2), True),
    StructField("vlr_limite_disp", DecimalType(18, 2), True),
    StructField("dia_vencimento", IntegerType(), True),
    StructField("status_cartao", StringType(), True)
])

schema_fatura = StructType([
    StructField("id_fatura", StringType(), False),
    StructField("id_cartao", StringType(), False),
    StructField("mes_ref", StringType(), True),
    StructField("dt_fechamento", DateType(), True),
    StructField("dt_vencimento", DateType(), True),
    StructField("vlr_compras_mes", DecimalType(18, 2), True),
    StructField("vlr_saldo_anterior", DecimalType(18, 2), True),
    StructField("vlr_encargos_totais", DecimalType(18, 2), True),
    StructField("vlr_ajustes_creditos", DecimalType(18, 2), True),
    StructField("vlr_total_fatura", DecimalType(18, 2), True),
    StructField("vlr_minimo", DecimalType(18, 2), True),
    StructField("status_fatura", StringType(), True)
])


schema_extrato = StructType([
    StructField("id_movimentacao", StringType(), False),
    StructField("id_conta", StringType(), False),
    StructField("ts_movimentacao", TimestampType(), True),
    StructField("dt_contabil", DateType(), True),
    StructField("vlr_transacao", DecimalType(18, 2), True),
    StructField("tp_operacao", StringType(), True),
    StructField("ds_historico", StringType(), True),
    StructField("vlr_saldo_pos_mov", DecimalType(18, 2), True),
    StructField("tp_instrumento", StringType(), True),
    StructField("id_transacao_ref", StringType(), True)
])

schema_pagamento = StructType([
    StructField("id_pagamento", StringType(), False),
    StructField("id_fatura", StringType(), False),
    StructField("ts_pagamento", TimestampType(), True),
    StructField("dt_liquidacao", DateType(), True),
    StructField("vlr_pago", DoubleType(), True),
    StructField("meio_pagamento", StringType(), True),
    StructField("tp_pagamento", StringType(), True),
    StructField("ind_atraso", BooleanType(), True),
    StructField("id_autenticacao", StringType(), True)
])

schema_transacoes_cartao = StructType([
    StructField("id_transacao", StringType(), False),
    StructField("id_cartao", StringType(), False),
    StructField("ts_transacao", TimestampType(), True),
    StructField("vlr_transacao", DecimalType(18, 2), True),
    StructField("nm_estabelecimento", StringType(), True),
    StructField("categoria_trans", StringType(), True),
    StructField("qtde_parcelas", IntegerType(), True),
    StructField("num_parcela", IntegerType(), True),
    StructField("ind_contato", BooleanType(), True),
    StructField("status_transacao", StringType(), True)
])

#2 --- Funcoes 
def gerar_dados_clientes(qtd_registros=10):

    dados = []
    
    for i in range(1, qtd_registros + 1):
        registro = (
            i,                                      # id_cli
            fake.name(),                            # nm_cli
            fake.date_of_birth(minimum_age=18),     # dt_nsct_cli
            fake.email(),                           # email_cli
            fake.phone_number(),                    # tel_cli
            fake.street_address(),                  # end_cli
            fake.city(),                            # munic_cli
            fake.state_abbr(),                      # uf_cli
            fake.postcode().replace("-", ""),       # cep_cli
            decimal.Decimal(f"{random.uniform(1500, 30000):.2f}"),
            fake.job()                              # profi_cli
        )
        dados.append(registro)
    
    return spark.createDataFrame(dados, schema=schema_cliente)

def gerar_dados_contas(lista_id_clientes, qtd_por_cliente=1):

    dados_contas = []
    
    status_opcoes = ['ATIVA', 'BLOQUEADA', 'INATIVA']
    
    for id_cliente in lista_id_clientes:
        for _ in range(qtd_por_cliente):
                    
            saldo = decimal.Decimal(f"{random.uniform(-500, 50000):.2f}")
            limite = decimal.Decimal(random.choice([1000, 2000, 5000, 10000]))
            
            registro = (
                str(uuid.uuid4()),                  # id_conta (Gerando um UUID único)
                id_cliente,                         # id_cli (FK)
                str(fake.random_number(digits=8)),  # num_conta
                str(fake.random_number(digits=4)),  # cod_agencia
                saldo,                              # vlr_saldo_atual
                limite,                             # vlr_limite_esp
                random.choice(status_opcoes),       # status_conta
                fake.date_between(start_date='-10y', end_date='today'), # dt_abertura
                random.choice([True, False])        # ind_cartao_deb
            )
            dados_contas.append(registro)
    
    return spark.createDataFrame(dados_contas, schema=schema_conta)


def gerar_dados_cartoes(lista_id_contas, qtd_por_conta=1):

    dados_cartoes = []
    
    bandeiras = ['VISA', 'MASTERCARD', 'ELO', 'AMEX']
    categorias = ['BASIC', 'GOLD', 'PLATINUM', 'BLACK', 'INFINITE']
    status_list = ['ATIVO', 'BLOQUEADO', 'CANCELADO', 'AGUARDANDO ATIVAÇÃO']
    # Definimos a precisão de 2 casas decimais (financeiro)
    TWO_PLACES = Decimal("0.01")

    for id_conta in lista_id_contas:
        for _ in range(qtd_por_conta):
            # 1. Gerar e arredondar o limite total
            limite_total = Decimal(str(random.uniform(1000, 50000))).quantize(TWO_PLACES, rounding=ROUND_HALF_UP)
            
            # 2. Gerar o multiplicador
            multiplicador = Decimal(str(random.uniform(0.1, 1.0)))
            
            # 3. Calcular o disponível e ARREDONDAR IMEDIATAMENTE
            limite_disp = (limite_total * multiplicador).quantize(TWO_PLACES, rounding=ROUND_HALF_UP)
            
            registro = (
                str(uuid.uuid4()),
                id_conta,
                random.choice(bandeiras),
                random.choice(categorias),
                str(random.randint(1000, 9999)),
                fake.credit_card_expire(),
                limite_total,      # Agora garantido com 2 casas
                limite_disp,       # Agora garantido com 2 casas
                random.randint(1, 28),
                random.choice(status_list)
            )
            dados_cartoes.append(registro)
            
    return spark.createDataFrame(dados_cartoes, schema=schema_cartao)

def gerar_dados_faturas(lista_id_cartoes):
    dados_faturas = []
    status_opcoes = ['ABERTA', 'FECHADA', 'PAGA', 'PAGAMENTO_PARCIAL', 'ATRASADA']
    
    # Constantes para precisão
    PRECISAO = Decimal("0.01")
    TAXA_ENCARGOS = Decimal("0.15")
    TAXA_MINIMO = Decimal("0.15")
    
    for id_cartao in lista_id_cartoes:
        # 1. Converter inputs iniciais e arredondar
        compras = Decimal(str(random.uniform(100, 8000))).quantize(PRECISAO, rounding=ROUND_HALF_UP)
        anterior = Decimal(str(random.choice([0, 0, 0, 500, 1200]))).quantize(PRECISAO, rounding=ROUND_HALF_UP)
        
        # 2. Cálculos (agora apenas entre Decimals)
        encargos = (anterior * TAXA_ENCARGOS).quantize(PRECISAO, rounding=ROUND_HALF_UP) if anterior > 0 else Decimal("0.00")
        ajustes = Decimal(str(random.choice([0, 0, 50, 100]))).quantize(PRECISAO, rounding=ROUND_HALF_UP)
        
        total = (compras + anterior + encargos) - ajustes
        minimo = (total * TAXA_MINIMO).quantize(PRECISAO, rounding=ROUND_HALF_UP)
        
        vencimento = fake.date_this_year()
        fechamento = vencimento - timedelta(days=10)
        
        registro = (
            str(uuid.uuid4()),
            id_cartao,
            vencimento.strftime("%Y-%m"),
            fechamento,
            vencimento,
            compras,
            anterior,
            encargos,
            ajustes,
            total,
            minimo,
            random.choice(status_opcoes)
        )
        dados_faturas.append(registro)
            
    return spark.createDataFrame(dados_faturas, schema=schema_fatura)

def gerar_dados_extrato(lista_id_contas, qtd_mov_por_conta=5):
    dados_extrato = []
    
    # Define a escala financeira (2 casas decimais)
    PRECISAO = Decimal("0.01")
    
    operacoes = [
        ('CREDITO', 'PIX Recebido', 'PIX'),
        ('DEBITO', 'Compra no Cartão', 'CARTAO'),
        ('DEBITO', 'Pagamento de Boleto', 'BOLETO'),
        ('CREDITO', 'Transferência Recebida', 'TED'),
        ('DEBITO', 'Saque Eletrônico', 'SAQUE')
    ]
    
    for id_conta in lista_id_contas:
        # 1. Já inicia o saldo com 2 casas decimais
        saldo_simulado = Decimal(str(random.uniform(1000, 5000))).quantize(PRECISAO, rounding=ROUND_HALF_UP)
        for _ in range(qtd_mov_por_conta):
            tp_op, ds, inst = random.choice(operacoes)
            # 2. Garante que o valor da movimentação tenha 2 casas decimais
            valor = Decimal(str(random.uniform(10, 500))).quantize(PRECISAO, rounding=ROUND_HALF_UP)
            if tp_op == 'DEBITO':
                saldo_simulado -= valor
            else:
                saldo_simulado += valor    
            ts = fake.date_time_between(start_date='-30d', end_date='now')
            registro = (
                str(uuid.uuid4()),
                id_conta,
                ts,
                ts.date(),
                valor,              # Agora seguro para o Spark
                tp_op,
                ds,
                saldo_simulado,     # Agora seguro para o Spark
                inst,
                str(uuid.uuid4())
            )
            dados_extrato.append(registro)
    return spark.createDataFrame(dados_extrato, schema=schema_extrato)

def gerar_dados_pagamentos(df_faturas):
    dados_pagamentos = []
    
    # Coletamos os dados necessários das faturas para gerar pagamentos realistas
    faturas = df_faturas.select("id_fatura", "vlr_total_fatura", "dt_vencimento").collect()
    
    meios = ['PIX', 'BOLETO', 'DEBITO_EM_CONTA', 'LOTERICA']
    tipos = ['TOTAL', 'PARCIAL', 'MINIMO']
    
    for fatura in faturas:
        # Simula se o pagamento foi feito no dia ou com variação
        dias_variacao = random.randint(-5, 5)
        data_pagamento_ts = fake.date_time_between_dates(
            datetime_start=fatura.dt_vencimento - timedelta(days=10),
            datetime_end=fatura.dt_vencimento + timedelta(days=5)
        )
        
        vlr_pago = float(fatura.vlr_total_fatura)
        tp_pg = 'TOTAL'
        
        # Simulação ocasional de pagamento parcial
        if random.random() < 0.2:
            vlr_pago = vlr_pago * random.uniform(0.2, 0.9)
            tp_pg = 'PARCIAL'

        atraso = data_pagamento_ts.date() > fatura.dt_vencimento
        
        registro = (
            str(uuid.uuid4()),
            fatura.id_fatura,
            data_pagamento_ts,
            data_pagamento_ts.date() + timedelta(days=1), # Liquidação D+1
            vlr_pago,
            random.choice(meios),
            tp_pg,
            atraso,
            str(uuid.uuid4()).upper().replace("-", "")[:20] # ID Autenticação
        )
        dados_pagamentos.append(registro)
            
    return spark.createDataFrame(dados_pagamentos, schema=schema_pagamento)

def gerar_dados_transacoes_cartao(lista_id_cartoes, qtd_trans_por_cartao=10):
    dados_transacoes = []
    
    categorias = ['ALIMENTAÇÃO', 'LAZER', 'TRANSPORTE', 'SAÚDE', 'EDUCAÇÃO', 'SERVIÇOS', 'VESTUÁRIO']
    estabelecimentos = ['Mercado Central', 'Posto Ipiranga', 'Netflix', 'Amazon', 'Farmácia Preço Baixo', 'Restaurante Sabor', 'Uber']
    status_opcoes = ['APROVADA', 'APROVADA', 'APROVADA', 'NEGADA', 'ESTORNADA']
    
    for id_cartao in lista_id_cartoes:
        for _ in range(qtd_trans_por_cartao):
            parcelas = random.choice([1, 1, 1, 2, 3, 10, 12])
            vlr_total = random.uniform(5.0, 2500.0)
            
            # Cálculo do valor da parcela convertido para Decimal
            vlr_parcela = decimal.Decimal(vlr_total / parcelas).quantize(decimal.Decimal('0.00'))
            
            num_parc = 1 if parcelas == 1 else random.randint(1, parcelas)
            
            registro = (
                str(uuid.uuid4()),
                id_cartao,
                fake.date_time_between(start_date='-60d', end_date='now'),
                vlr_parcela, # Agora é um objeto decimal.Decimal
                random.choice(estabelecimentos),
                random.choice(categorias),
                parcelas,
                num_parc,
                random.choice([True, False]),
                random.choice(status_opcoes)
            )
            dados_transacoes.append(registro)
            
    return spark.createDataFrame(dados_transacoes, schema=schema_transacoes_cartao)


# 3 - Insert de dados
def inserir_dados_delta_table(df, workspace, schema, nome_tabela, modo="append"):
    try:
        df.write.format("delta").mode("append").saveAsTable(f"{workspace}.{schema}.{nome_tabela}")
        print(f"Sucesso: Dados inseridos na tabela '{nome_tabela}'.")
    except Exception as e:
        print(f"Erro ao inserir dados: {e}")

In [0]:
# 1. Geramos 10 clientes
df_clientes = gerar_dados_clientes(10)

# 2. Extraímos a lista de IDs de clientes para usar como FK
ids_clientes = [row.id_cli for row in df_clientes.select("id_cli").collect()]

# 3. Geramos as contas vinculadas a esses IDs
df_contas = gerar_dados_contas(ids_clientes, qtd_por_cliente=1)

#4. Extrai ID da conta
ids_contas = [r.id_conta for r in df_contas.select("id_conta").collect()]

#5. Geramos os cartoes
df_cartoes = gerar_dados_cartoes(ids_contas)

#6. geramos os ids dos cartoes
ids_cartoes = [r.id_cartao for r in df_cartoes.select("id_cartao").collect()]

#7. geramos as faturas dos cartoes
df_faturas = gerar_dados_faturas(ids_cartoes)

#8. geramos as pagamentos de faturas do cartao
df_faturas_pagamento = gerar_dados_pagamentos(df_faturas)

#9. De posse dos ids das contas, geramos o extrato do mesmo
df_extrato = gerar_dados_extrato(ids_contas)

#10. Geramos a base de transacoes de cartao
df_transacoes = gerar_dados_transacoes_cartao(ids_cartoes)


In [0]:
inserir_dados_delta_table(df_clientes, 'capstonebricks', 'bronze', 'tb_cliente')

In [0]:
inserir_dados_delta_table(df_clientes, 'capstonebricks', 'bronze', 'tb_cliente', 'overwrite')


In [0]:
inserir_dados_delta_table(df_contas, 'capstonebricks', 'bronze', 'tb_conta')
inserir_dados_delta_table(df_cartoes, 'capstonebricks', 'bronze', 'tb_cartao')
inserir_dados_delta_table(df_faturas, 'capstonebricks', 'bronze', 'tb_fatura_cartao')
inserir_dados_delta_table(df_faturas_pagamento, 'capstonebricks', 'bronze', 'tb_pagamento_fatura')
inserir_dados_delta_table(df_extrato, 'capstonebricks', 'bronze', 'tb_extrato')
inserir_dados_delta_table(df_transacoes, 'capstonebricks', 'bronze', 'tb_transacoes_cartao')