## Configurações

In [0]:
%pip install faker

## Inicialização do Ambiente e Esquemas

In [0]:
# --- Imports Padrão ---
import pandas as pd
import numpy as np
import uuid
import random
import calendar
import functools
import sys
from datetime import datetime, timedelta
from typing import Iterator, List
from faker import Faker

from pyspark.sql import DataFrame, Window, SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType, DoubleType, DateType, TimestampType, BooleanType

spark = SparkSession.builder.getOrCreate()

GLOBAL_SEED = 42
Faker.seed(GLOBAL_SEED) # Seed para a biblioteca Faker
print(f"INFO: Sessão Spark importada. Seed global definida: {GLOBAL_SEED}")


ANO_ESTATISTICA = 2024
LIMITE_MUNICIPIOS_PROCESSADOS = 30
FATOR_ESCALA_VOLUME = 0.0005
TX_POR_CLIENTE_ESPERADO = 150 
PROBABILIDADE_TRANSACAO_INTERMUNICIPAL = 0.20

# Dicionários de configuração 
config_geracao = {
    'PROB_CONTA_ALTO_RISCO': 0.05,
    'MAX_DIAS_CADASTRO_CHAVE_RISCO': 7,
    'DIAS_CHAVE_CONSIDERADA_RECENTE': 15,
    'PROBABILIDADE_FRAUDE_CONTA_DESTINO_RISCO': 0.60,
    'PROBABILIDADE_FRAUDE_CHAVE_RECENTE': 0.40,
    'PROBABILIDADE_FRAUDE_BASE': 0.35, 
    'IDADE_MINIMA_ALVO_ENG_SOCIAL': 55, 
    'PROBABILIDADE_FRAUDE_ENG_SOCIAL_ALVO': 0.80,
    'MULTIPLICADOR_MAGNITUDE_FRAUDE': 30.0,
    'PROBABILIDADE_ABAIXO_RADAR': 0.4,
    'VALORES_LIMITE_RADAR': [999.90, 499.90, 1999.90],
    'PROBABILIDADES_TIPO_FRAUDE': {
        "valor_atipico": 0.80,
        "engenharia_social": 0.0,
        "triangulacao_conta_laranja": 0.10,
        "consolidacao_fundos": 0.10
    },
    'MIN_FONTES_CONSOLIDACAO': 10,  
    'MAX_FONTES_CONSOLIDACAO': 30,  
    'JANELA_SEG_CONSOLIDACAO': 600, 
    'FANOUT_MIN_PROFUNDIDADE': 2,
    'FANOUT_MAX_PROFUNDIDADE': 3,
    'FANOUT_MIN_LARGURA': 2,
    'FANOUT_MAX_LARGURA': 4,
    'PROBABILIDADE_TESTE_CONTA': 0.3,
    'PROBABILIDADE_ATAQUE_MADRUGADA': 0.70,
}

perfis_de_uso_dict = {} 
LISTA_TIPOS_CONTA_LOCAL = [1, 2, 4] 
LISTA_ISPBS_LOCAL = ["12345678", "87654321"] 

SCHEMA_CLIENTES_UDF = StructType([
    StructField("nome", StringType(), True),
    StructField("registro_nacional", StringType(), True),
    StructField("nascido_em", DateType(), True)
])
SCHEMA_CLIENTES_FINAL = StructType([
    StructField("id", StringType(), False), 
    StructField("nome", StringType(), True),
    StructField("id_natureza", IntegerType(), True), StructField("registro_nacional", StringType(), True),
    StructField("nascido_em", DateType(), True), StructField("estado_ibge", IntegerType(), True),
    StructField("municipio_ibge", IntegerType(), True)
])
SCHEMA_CONTAS_UDF = StructType([
    StructField("id", StringType(), False), 
    StructField("saldo", DoubleType(), True),
    StructField("aberta_em", DateType(), True), StructField("agencia", StringType(), True),
    StructField("numero", StringType(), True), StructField("id_tipo_conta", IntegerType(), True),
    StructField("ispb_instituicao", StringType(), True), StructField("id_cliente", StringType(), True),
    StructField("is_high_risk", IntegerType(), True)
])
SCHEMA_CONTAS_FINAL = StructType([
    StructField("id", StringType(), False), 
    StructField("saldo", DoubleType(), True),
    StructField("aberta_em", DateType(), True), StructField("agencia", StringType(), True),
    StructField("numero", StringType(), True), StructField("id_tipo_conta", IntegerType(), True),
    StructField("ispb_instituicao", StringType(), True), StructField("id_cliente", StringType(), False), 
    StructField("is_high_risk", IntegerType(), True), StructField("estado_ibge", IntegerType(), True),
    StructField("municipio_ibge", IntegerType(), True)
])
SCHEMA_CHAVES_UDF = StructType([
    StructField("id", StringType(), False), 
    StructField("chave", StringType(), True),
    StructField("id_tipo_chave", IntegerType(), True), StructField("cadastrada_em", DateType(), True),
    StructField("id_conta", StringType(), True)
])
SCHEMA_CHAVES_PIX_FINAL = StructType([
    StructField("id", StringType(), False), 
    StructField("chave", StringType(), True),
    StructField("id_tipo_chave", IntegerType(), True), StructField("cadastrada_em", DateType(), True),
    StructField("id_conta", StringType(), False), 
    StructField("estado_ibge", IntegerType(), True),
    StructField("municipio_ibge", IntegerType(), True)
])
SCHEMA_PARES = StructType([
    StructField("id_conta_origem", StringType(), True),
    StructField("id_conta_destino", StringType(), True)
])
SCHEMA_TRANSACOES_UDF = StructType([
    StructField("id", StringType(), False), 
    StructField("valor", DoubleType(), True),
    StructField("data", TimestampType(), True), StructField("mensagem", StringType(), True),
    StructField("id_conta_origem", StringType(), True), StructField("id_conta_destino", StringType(), True),
    StructField("id_tipo_iniciacao_pix", IntegerType(), True), StructField("id_finalidade_pix", IntegerType(), True),
    StructField("is_fraud", IntegerType(), True), StructField("fraud_type", StringType(), True),
    StructField("id_transacao_cadeia_pai", StringType(), True)
])
SCHEMA_TRANSACOES_FINAL = StructType([
    StructField("id", StringType(), False), 
    StructField("valor", DoubleType(), True),
    StructField("data", TimestampType(), True), StructField("mensagem", StringType(), True),
    StructField("id_conta_origem", StringType(), True), StructField("id_conta_destino", StringType(), True),
    StructField("id_tipo_iniciacao_pix", IntegerType(), True), StructField("id_finalidade_pix", IntegerType(), True),
    StructField("is_fraud", IntegerType(), True), StructField("fraud_type", StringType(), True),
    StructField("id_transacao_cadeia_pai", StringType(), True),
    StructField("estado_ibge", IntegerType(), True)
])

print("INFO: CÉLULA 1 (Refatorada) - Configurações, Seed e Schemas carregados.")

# Funções para geração de Agentes (CLientes, Contas e Chaves PIX)

In [0]:

@F.pandas_udf(SCHEMA_CLIENTES_UDF)
def _gerar_detalhes_cliente_udf(id_natureza: pd.Series) -> pd.DataFrame:
    # Faker.seed() já foi chamado na Célula 1, garantindo determinismo aqui
    local_fake = Faker('pt_BR')
    nomes, registros, nascimentos = [], [], []
    for nat in id_natureza:
        if nat == 1:
            nomes.append(local_fake.name())
            registros.append(local_fake.cpf())
            nascimentos.append(local_fake.date_of_birth(minimum_age=18, maximum_age=80))
        else:
            nomes.append(local_fake.company())
            registros.append(local_fake.cnpj())
            nascimentos.append(local_fake.date_between(start_date='-20y', end_date='-1y'))
    return pd.DataFrame({"nome": nomes, "registro_nacional": registros, "nascido_em": nascimentos})

def _gerar_clientes(num_pf: int, num_pj: int, estado_ibge: int, municipio_ibge: int) -> DataFrame:
    df_base_pf = spark.range(num_pf).withColumn("id_natureza", F.lit(1))
    df_base_pj = spark.range(num_pj).withColumn("id_natureza", F.lit(2))
    df_base_clientes = df_base_pf.union(df_base_pj)
    return (df_base_clientes.withColumn("id", F.expr("uuid()"))
        .withColumn("detalhes", _gerar_detalhes_cliente_udf(F.col("id_natureza")))
        .select("id", F.col("detalhes.nome").alias("nome"), "id_natureza",
                F.col("detalhes.registro_nacional").alias("registro_nacional"),
                F.col("detalhes.nascido_em").alias("nascido_em"),
                F.lit(estado_ibge).alias("estado_ibge"), F.lit(municipio_ibge).alias("municipio_ibge")))

# UDF Python que aceita 'seed'
def _gerar_detalhes_conta_python(iterator: Iterator[pd.DataFrame], config: dict, tipos_conta: list, ispbs: list, ano_estatistica: int, seed: int) -> Iterator[pd.DataFrame]:
    # Garantir determinismo dentro da UDF
    random.seed(seed)
    np.random.seed(seed)
    local_fake = Faker('pt_BR')
    Faker.seed(seed) # Semeia a instância local do Faker
    
    data_limite_abertura = datetime(ano_estatistica, 1, 1).date()
    for lote in iterator:
        resultados = []
        for row in lote.itertuples(index=False):
            is_high_risk = 1 if random.random() < config['PROB_CONTA_ALTO_RISCO'] else 0
            if is_high_risk == 1:
                start_date_relativa = timedelta(days=180)
                data_inicio_recente = data_limite_abertura - start_date_relativa
                aberta_em = local_fake.date_between(start_date=data_inicio_recente, end_date=data_limite_abertura)
            else:
                aberta_em = local_fake.date_between(start_date='-10y', end_date=data_limite_abertura)
            if row.id_natureza == 1:
                saldo = round(np.random.lognormal(mean=6, sigma=1.5), 2)
                tipo_conta = random.choice(tipos_conta)
            else:
                saldo = round(np.random.lognormal(mean=9, sigma=1.8), 2)
                tipo_conta = random.choice([c for c in tipos_conta if c in [1, 3]])
            resultados.append({"id": str(uuid.uuid4()), "saldo": saldo, "aberta_em": aberta_em, "agencia": local_fake.numerify('####'), 
                               "numero": local_fake.numerify('#####-#'), "id_tipo_conta": tipo_conta, "ispb_instituicao": random.choice(ispbs),
                               "id_cliente": row.id_cliente, "is_high_risk": is_high_risk})
        yield pd.DataFrame(resultados)

def _gerar_contas(df_clientes: DataFrame, estado_ibge: int, municipio_ibge: int, seed: int) -> DataFrame:
    # Adicionada seed ao F.rand()
    df_clientes_com_num_contas = df_clientes.withColumn("num_contas", 
        F.when(F.col("id_natureza") == 1, F.floor(F.rand(seed=seed) * 2) + 1)
         .otherwise(F.floor(F.rand(seed=seed + 1) * 5) + 1))
         
    df_contas_base = df_clientes_com_num_contas.select(F.col("id").alias("id_cliente"), "id_natureza", F.explode(F.sequence(F.lit(1), F.col("num_contas"))))
    

    gerador_com_contexto = functools.partial(_gerar_detalhes_conta_python, 
                                             config=config_geracao, 
                                             tipos_conta=LISTA_TIPOS_CONTA_LOCAL, 
                                             ispbs=LISTA_ISPBS_LOCAL, 
                                             ano_estatistica=ANO_ESTATISTICA,
                                             seed=seed) # <-- Seed injetada
                                             
    return (df_contas_base.mapInPandas(gerador_com_contexto, schema=SCHEMA_CONTAS_UDF)
            .withColumn("estado_ibge", F.lit(estado_ibge)).withColumn("municipio_ibge", F.lit(municipio_ibge)))

def _gerar_detalhes_chave_udf(iterator: Iterator[pd.DataFrame], config: dict, seed: int) -> Iterator[pd.DataFrame]:

    random.seed(seed)
    np.random.seed(seed)
    local_fake = Faker('pt_BR')
    Faker.seed(seed) 

    for lote in iterator:
        resultados = []
        for row in lote.itertuples(index=False):
            try:
                data_abertura_obj = pd.to_datetime(row.aberta_em).date()
                dias_para_cadastrar = random.randint(1, config['MAX_DIAS_CADASTRO_CHAVE_RISCO']) if hasattr(row, 'is_high_risk') and row.is_high_risk == 1 else random.randint(1, 90)
                cadastrada_em = data_abertura_obj + timedelta(days=dias_para_cadastrar)
                if row.id_natureza == 1:
                    tipos_possiveis = {1: row.registro_nacional, 2: local_fake.email(), 3: local_fake.phone_number(), 4: str(uuid.uuid4())}
                else:
                    tipos_possiveis = {5: row.registro_nacional, 2: local_fake.company_email(), 4: str(uuid.uuid4())}
                tipo_chave = random.choice(list(tipos_possiveis.keys()))
                resultados.append({"id": str(uuid.uuid4()), "chave": tipos_possiveis[tipo_chave], "id_tipo_chave": tipo_chave, "cadastrada_em": cadastrada_em, "id_conta": row.id_conta})
            except Exception as e: 
                sys.stderr.write(f"ERRO NA GERAÇÃO DE CHAVES: {e}, DADOS: {row}\n")
                raise e
        if resultados: yield pd.DataFrame(resultados)

def _gerar_chaves_pix(df_contas_completo: DataFrame, df_clientes_completo: DataFrame, estado_ibge: int, municipio_ibge: int, seed: int) -> DataFrame:
    df_contas_com_cliente = df_contas_completo.join(df_clientes_completo, df_contas_completo.id_cliente == df_clientes_completo.id, "inner").select(df_contas_completo.id.alias("id_conta"), "id_natureza", "registro_nacional", "aberta_em", df_contas_completo.is_high_risk)
    
    # Passando a seed para a UDF
    gerador_com_contexto = functools.partial(_gerar_detalhes_chave_udf, 
                                             config=config_geracao,
                                             seed=seed) # <-- Seed injetada
                                             
    return (df_contas_com_cliente.mapInPandas(gerador_com_contexto, schema=SCHEMA_CHAVES_UDF)
            .withColumn("estado_ibge", F.lit(estado_ibge)).withColumn("municipio_ibge", F.lit(municipio_ibge)))



# Funções de Geração de Eventos (Transações) e sazonalidade

In [0]:
def _obter_params_tempo(ano: int, mes: int) -> tuple:
    num_dias = calendar.monthrange(ano, mes)[1]
    primeiro_dia = datetime(ano, mes, 1)
    ultimo_dia = datetime(ano, mes, num_dias, 23, 59, 59)
    return primeiro_dia, (ultimo_dia - primeiro_dia).total_seconds()

# Função interna agora usa 'random_instance'
def _aplicar_horario_suspeito(data_transacao, config, random_instance):
    if random_instance.random() < config.get('PROBABILIDADE_ATAQUE_MADRUGADA', 0.70):
        return data_transacao.replace(hour=random_instance.randint(1, 4), minute=random_instance.randint(0, 59))
    return data_transacao

def _obter_perfil_sazonal_mes(ano: int, mes: int) -> dict:
    pesos_dia_semana = { 0: 1.2, 1: 1.1, 2: 1.1, 3: 1.1, 4: 1.3, 5: 0.8, 6: 0.7 }
    pesos_comemorativos = { (1, 1): 0.5, (5, 10): 2.0, (6, 12): 1.8, (8, 11): 1.8, (10, 12): 1.5, (11, 25): 3.0, (12, 20): 2.0, (12, 23): 2.5, (12, 24): 3.0, (12, 25): 1.0, (12, 31): 1.5 }
    num_dias = calendar.monthrange(ano, mes)[1]
    perfil_final = {}
    for dia in range(1, num_dias + 1):
        data = datetime(ano, mes, dia)
        dia_semana = data.weekday()
        peso = pesos_dia_semana.get(dia_semana, 1.0)
        if (mes, dia) in pesos_comemorativos: peso = pesos_comemorativos[(mes, dia)]
        perfil_final[dia] = peso
    return perfil_final

def _gerar_detalhes_transacao_python_vetorizado(
    iterator: Iterator[pd.DataFrame], ano: int, mes: int, 
    config: dict, perfis_uso: list, 
    perfil_sazonal: dict, seed: int 
) -> Iterator[pd.DataFrame]:
    

    import random
    import uuid
    from datetime import timedelta
    from itertools import chain
    primeiro_dia, _ = _obter_params_tempo(ano, mes) 
    colunas_finais_transacoes = ["id", "valor", "data", "mensagem", "id_conta_origem", "id_conta_destino", "id_tipo_iniciacao_pix", "id_finalidade_pix", "is_fraud", "fraud_type", "id_transacao_cadeia_pai"]
    PROB_RUIDO = 0.25; MENSAGENS_RUIDO = ["Pagamento de Boleto", "Lanchonete", "Uber", "Recarga de Celular"]
    

    try:
        primeiro_lote = next(iterator)
    except StopIteration:

        return
        
    partition_id = int(primeiro_lote['pid'].iloc[0])
    lote_seed = int(seed) + partition_id 
    

    local_random = random.Random(lote_seed)
    local_np_random = np.random.RandomState(lote_seed)
    
   
    
    for lote in chain([primeiro_lote], iterator):
        n = len(lote)
        if n == 0: continue
        
        
        pool_de_contas_reais = pd.concat([
            lote['id_conta_origem'], 
            lote['id_conta_destino']
        ]).unique().tolist()
        
        if not pool_de_contas_reais:
            pool_de_contas_reais = [str(uuid.uuid4()) for _ in range(10)]
        
        
        num_dias = calendar.monthrange(ano, mes)[1]; dias_do_mes = list(range(1, num_dias + 1))
        pesos = [perfil_sazonal.get(dia, 1.0) for dia in dias_do_mes]
        probabilidades = np.array(pesos) / np.sum(pesos)
        
       
        dias_sorteados = local_np_random.choice(dias_do_mes, size=n, p=probabilidades)
        segundos_no_dia = local_np_random.uniform(0, 86399, n)
        
        deltas_dias = pd.to_timedelta(dias_sorteados - 1, unit='D')
        deltas_segundos = pd.to_timedelta(segundos_no_dia, unit='s')
        lote['data'] = primeiro_dia + deltas_dias + deltas_segundos
        
    
        lote['pagador_nascido_em'] = pd.to_datetime(lote['pagador_nascido_em'], errors='coerce')
        lote['pagador_idade'] = (lote['data'] - lote['pagador_nascido_em']).dt.days / 365.25
        lote['pagador_idade'] = lote['pagador_idade'].fillna(-1) 
        lote['chave_destino_cadastrada_em'] = pd.to_datetime(lote['chave_destino_cadastrada_em'])
        delta_dias = (lote['data'].dt.date - lote['chave_destino_cadastrada_em'].dt.date).dt.days 
        lote['is_high_risk'] = lote['is_high_risk'].fillna(0).astype(int)
        
        idade_alvo = config.get('IDADE_MINIMA_ALVO_ENG_SOCIAL', 65)
        prob_fraude_alvo = config.get('PROBABILIDADE_FRAUDE_ENG_SOCIAL_ALVO', 0.75)
        
        cond_idade_alvo = (lote['pagador_idade'] >= idade_alvo)
        cond_conta_risco = (lote['is_high_risk'] == 1)
        cond_chave_recente = ((delta_dias >= 0) & (delta_dias <= config['DIAS_CHAVE_CONSIDERADA_RECENTE']))
        
        prob_fraude_dinamica = np.select(
            [cond_idade_alvo, cond_conta_risco, cond_chave_recente], 
            [prob_fraude_alvo, config['PROBABILIDADE_FRAUDE_CONTA_DESTINO_RISCO'], config['PROBABILIDADE_FRAUDE_CHAVE_RECENTE']], 
            default=config['PROBABILIDADE_FRAUDE_BASE']
        )
        
       
        lote['is_fraud'] = (local_np_random.rand(n) < prob_fraude_dinamica).astype(int)
        
       
        multiplicadores = np.select([lote['is_fraud'] == 1, (~(lote['is_fraud'] == 1)) & (local_np_random.rand(n) < 0.04)], [config['MULTIPLICADOR_MAGNITUDE_FRAUDE'], 2.5], default=1.0)
        valores_calculados = np.maximum(0.01, local_np_random.lognormal(mean=np.log(150), sigma=0.8, size=n) * multiplicadores).round(2)
        condicao_abaixo_radar = (lote['is_fraud'] == 1) & (local_np_random.rand(n) < config.get('PROBABILIDADE_ABAIXO_RADAR', 0.4))
        
        
        lote['valor'] = np.where(condicao_abaixo_radar, local_np_random.choice(config.get('VALORES_LIMITE_RADAR', [999.90]), n), valores_calculados)
        
        
        probs_tipo_fraude = config.get('PROBABILIDADES_TIPO_FRAUDE'); 
        tipos_normais = {k: v for k, v in probs_tipo_fraude.items() if k != 'engenharia_social'}
        if sum(tipos_normais.values()) == 0: tipos_normais = {'valor_atipico': 1.0}
        total_prob_normal = sum(tipos_normais.values())
        tipos_normais_norm = {k: v / total_prob_normal for k, v in tipos_normais.items()}
        
        # Usa o gerador semeado da partição
        tipos_fraude_aleatorios = local_np_random.choice(list(tipos_normais_norm.keys()), n, p=list(tipos_normais_norm.values()))
        
        lote['fraud_type'] = np.where(
            (lote['is_fraud'] == 1) & (cond_idade_alvo), 
            "engenharia_social", 
            np.where(lote['is_fraud'] == 1, tipos_fraude_aleatorios, None) 
        )
        
        # --- PREPARAÇÃO FINAL DO LOTE (Determinístico)
        lote['id'] = [str(uuid.uuid4()) for _ in range(n)]; lote['mensagem'] = "Pagamento via Pix"; 
        lote['id_tipo_iniciacao_pix'] = local_np_random.randint(1, 4, n); 
        lote['id_finalidade_pix'] = local_np_random.randint(1, 5, n); 
        lote['id_transacao_cadeia_pai'] = None
        
        # Remove a coluna 'pid' antes de finalizar
        colunas_para_dropar = ['chave_destino_cadastrada_em', 'is_high_risk', 'id_tipo_conta_origem', 'id_tipo_conta_destino', 'pagador_nascido_em', 'pagador_idade', 'pid']
        lote_final = lote.drop(columns=[col for col in colunas_para_dropar if col in lote.columns])
        
        # --- LÓGICA DE MICRO-LOTE ---
        resultados_micro_lote = []
        TAMANHO_MICRO_LOTE = 2000
        
        # --- LÓGICA DE GERAÇÃO DE CADEIA (FAN-OUT E FAN-IN) ---
        # Usa o gerador semeado da partição (local_random)
        for row in lote_final.itertuples(index=False):
            
            # --- LÓGICA DE FAN-OUT (Dispersão) ---
            if hasattr(row, 'is_fraud') and row.is_fraud and row.fraud_type == "triangulacao_conta_laranja":
                if local_random.random() < config.get('PROBABILIDADE_TESTE_CONTA', 0.3):
                    resultados_micro_lote.append({"id": str(uuid.uuid4()), "valor": round(local_random.uniform(0.01, 1.00), 2), "data": row.data - timedelta(minutes=local_random.randint(1, 5)), "mensagem": "Teste", "id_conta_origem": row.id_conta_origem, "id_conta_destino": row.id_conta_destino, "id_tipo_iniciacao_pix": 1, "id_finalidade_pix": 1, "is_fraud": 0, "fraud_type": None, "id_transacao_cadeia_pai": None})
                
                row_dict = row._asdict(); 
                row_dict['data'] = _aplicar_horario_suspeito(row_dict['data'], config, local_random) # <-- Instância passada
                
                min_prof = config.get('FANOUT_MIN_PROFUNDIDADE', 2); max_prof = config.get('FANOUT_MAX_PROFUNDIDADE', 4)
                profundidade_alvo = local_random.randint(min_prof, max_prof)
                id_fraude_raiz = row_dict['id']; resultados_micro_lote.append(row_dict)
                contas_nivel_anterior = {row_dict['id_conta_destino']: row_dict['valor']}; id_pai_nivel_anterior = {row_dict['id_conta_destino']: id_fraude_raiz}; data_nivel_anterior = {row_dict['id_conta_destino']: row_dict['data']}
                
                for nivel_atual in range(2, profundidade_alvo + 1):
                    contas_proximo_nivel = {}; id_pai_proximo_nivel = {}; data_proximo_nivel = {}
                    if not contas_nivel_anterior: break
                    for conta_origem, valor_origem in contas_nivel_anterior.items():
                        min_larg = config.get('FANOUT_MIN_LARGURA', 2); max_larg = config.get('FANOUT_MAX_LARGURA', 5)
                        num_subs = local_random.randint(min_larg, max_larg)
                        
                        # Usa o gerador semeado da partição
                        valores_divididos = local_np_random.dirichlet(np.ones(num_subs)) * valor_origem
                        
                        for k in range(num_subs):
                            id_transacao_filha = str(uuid.uuid4())
                            id_conta_destino_filha = local_random.choice(pool_de_contas_reais) 
                            segundos_offset = local_random.uniform(60 * (nivel_atual-1), 3600 * (nivel_atual-1))
                            data_transacao_filha = data_nivel_anterior[conta_origem] + timedelta(seconds=segundos_offset); 
                            data_transacao_filha = _aplicar_horario_suspeito(data_transacao_filha, config, local_random) # <-- Instância passada
                            
                            transacao_filha = {"id": id_transacao_filha, "valor": round(max(0.01, valores_divididos[k]), 2), "data": data_transacao_filha, "mensagem": f"Dispersão N{nivel_atual} (Parte {k+1}/{num_subs})", "id_conta_origem": conta_origem, "id_conta_destino": id_conta_destino_filha, "id_tipo_iniciacao_pix": local_random.randint(1, 3), "id_finalidade_pix": local_random.randint(1, 4), "is_fraud": 1, "fraud_type": row.fraud_type, "id_transacao_cadeia_pai": id_pai_nivel_anterior[conta_origem]}
                            resultados_micro_lote.append(transacao_filha)
                            contas_proximo_nivel[id_conta_destino_filha] = transacao_filha["valor"]; id_pai_proximo_nivel[id_conta_destino_filha] = id_transacao_filha; data_proximo_nivel[id_conta_destino_filha] = data_transacao_filha
                            
                            if local_random.random() < PROB_RUIDO:
                                for _ in range(local_random.randint(1, 3)):
                                    segundos_offset_ruido = local_random.uniform(10, 3600)
                                    id_conta_destino_ruido = local_random.choice(pool_de_contas_reais) 
                                    resultados_micro_lote.append({"id": str(uuid.uuid4()), "valor": round(local_random.uniform(7.5, 75.0), 2), "data": data_transacao_filha + timedelta(seconds=segundos_offset_ruido), "mensagem": random.choice(MENSAGENS_RUIDO), "id_conta_origem": id_conta_destino_filha, "id_conta_destino": id_conta_destino_ruido, "id_tipo_iniciacao_pix": 1, "id_finalidade_pix": 1, "is_fraud": 0, "fraud_type": None, "id_transacao_cadeia_pai": None})
                    contas_nivel_anterior = contas_proximo_nivel; id_pai_nivel_anterior = id_pai_proximo_nivel; data_nivel_anterior = data_proximo_nivel
            
            # --- LÓGICA DE FAN-IN (Consolidação) ---
            elif hasattr(row, 'is_fraud') and row.is_fraud and row.fraud_type == "consolidacao_fundos":
                row_dict = row._asdict()
                row_dict['data'] = _aplicar_horario_suspeito(row_dict['data'], config, local_random) # <-- Instância passada
                resultados_micro_lote.append(row_dict)
                id_fraude_raiz = row_dict['id']; data_fraude_raiz = row_dict['data']
                id_conta_destino_final = row_dict['id_conta_destino'] 
                
                min_fontes = config.get('MIN_FONTES_CONSOLIDACAO', 3); max_fontes = config.get('MAX_FONTES_CONSOLIDACAO', 10); janela_seg = config.get('JANELA_SEG_CONSOLIDACAO', 3600)
                num_fontes_extras = local_random.randint(min_fontes, max_fontes)
                
                for k in range(num_fontes_extras):
                    id_transacao_filha = str(uuid.uuid4())
                    id_conta_origem_filha = local_random.choice(pool_de_contas_reais) 
                    segundos_offset = local_random.uniform(1, janela_seg)
                    data_transacao_filha = data_fraude_raiz + timedelta(seconds=segundos_offset)
                    data_transacao_filha = _aplicar_horario_suspeito(data_transacao_filha, config, local_random) # <-- Instância passada
                    
                    # Usa o gerador semeado da partição
                    valor_filha = np.maximum(0.01, local_np_random.lognormal(mean=np.log(150), sigma=0.8) * config.get('MULTIPLICADOR_MAGNITUDE_FRAUDE', 30)).round(2)
                    
                    transacao_filha = {"id": id_transacao_filha, "valor": valor_filha, "data": data_transacao_filha, "mensagem": f"Consolidação Nível 1 (Fonte {k+1}/{num_fontes_extras})", "id_conta_origem": id_conta_origem_filha, "id_conta_destino": id_conta_destino_final, "id_tipo_iniciacao_pix": local_random.randint(1, 3), "id_finalidade_pix": local_random.randint(1, 4), "is_fraud": 1, "fraud_type": row.fraud_type, "id_transacao_cadeia_pai": id_fraude_raiz}
                    resultados_micro_lote.append(transacao_filha)
            
            else:
                resultados_micro_lote.append(row._asdict())
            
            if len(resultados_micro_lote) >= TAMANHO_MICRO_LOTE:
                df_final = pd.DataFrame(resultados_micro_lote)
                yield df_final[colunas_finais_transacoes]
                resultados_micro_lote = []
                
    if resultados_micro_lote: 
        df_final = pd.DataFrame(resultados_micro_lote)
        yield df_final[colunas_finais_transacoes]

def gerar_transacoes(
    df_contas_local: DataFrame, 
    df_clientes_local: DataFrame, 
    df_chaves_recentes_local: DataFrame, num_contas_local: int,
    volume_total: int, estado_ibge: int, municipio_ibge: int, ano: int, mes: int, 
    municipios_processados: list,
    perfil_sazonal: dict,
    seed: int # Esta é a 'base_seed' (ex: GLOBAL_SEED + i + mes)
) -> DataFrame:
    
    # Garantir determinismo na escolha do município
    random.seed(seed)
    
    # --- LÓGICA DE GERAÇÃO DE PARES (Determinística e Otimizada) ---
    volume_intermunicipal = int(volume_total * PROBABILIDADE_TRANSACAO_INTERMUNICIPAL)
    volume_local = volume_total - volume_intermunicipal
    df_pares_locais = spark.createDataFrame([], SCHEMA_PARES)
    df_pares_intermunicipais = spark.createDataFrame([], SCHEMA_PARES)
    
    if volume_local > 0 and num_contas_local > 1:
        N_BUCKETS = max(100, int(num_contas_local / 10000)) 
        
        # Adicionada seed ao F.rand()
        df_origens = df_contas_local.select(
            F.col("id").alias("id_conta_origem"), 
            (F.rand(seed=seed) * N_BUCKETS).cast("int").alias("join_key")
        )
        df_destinos = df_contas_local.select(
            F.col("id").alias("id_conta_destino"), 
            (F.rand(seed=seed + 1) * N_BUCKETS).cast("int").alias("join_key") # Seed diferente
        )
        df_pares_locais = df_origens.join(df_destinos, "join_key") \
                                  .filter(F.col("id_conta_origem") != F.col("id_conta_destino")) \
                                  .select("id_conta_origem", "id_conta_destino") \
                                  .limit(volume_local) # Limit após o join é OK aqui
    
    outros_municipios = [m for m in municipios_processados if m != municipio_ibge]
    
    if volume_intermunicipal > 0 and outros_municipios:
        municipio_alvo = random.choice(outros_municipios) # Determinístico (random.seed() acima)
        

        fraction_origem = min(1.0, (volume_intermunicipal * 1.2) / max(1, num_contas_local))
        
        contas_origem = df_contas_local.select("id") \
            .sample(withReplacement=False, fraction=fraction_origem, seed=seed) \
            .limit(volume_intermunicipal) \
            .withColumnRenamed("id", "id_conta_origem") \
            .withColumn("join_key", F.monotonically_increasing_id())

        contas_destino_externas = spark.table("transacoes_db.copper.contas") \
            .filter(F.col("municipio_ibge") == municipio_alvo) \
            .select("id") \
            .sample(withReplacement=False, fraction=fraction_origem, seed=seed + 1) \
            .limit(volume_intermunicipal) \
            .withColumnRenamed("id", "id_conta_destino") \
            .withColumn("join_key", F.monotonically_increasing_id())
            
        df_pares_intermunicipais = contas_origem.join(contas_destino_externas, "join_key") \
                                              .select("id_conta_origem", "id_conta_destino")
    
    df_pares_total = df_pares_locais.union(df_pares_intermunicipais)
    if df_pares_total.isEmpty(): return spark.createDataFrame([], SCHEMA_TRANSACOES_FINAL)
    -
    df_pares_enriquecidos = (df_pares_total
        .join(df_contas_local.alias("orig"), df_pares_total.id_conta_origem == F.col("orig.id"), "inner")
        .join(df_clientes_local.alias("cli_orig"), F.col("orig.id_cliente") == F.col("cli_orig.id"), "inner")
        .join(df_contas_local.alias("dest"), df_pares_total.id_conta_destino == F.col("dest.id"), "inner")
        .join(df_chaves_recentes_local.alias("chaves"), df_pares_total.id_conta_destino == F.col("chaves.id_conta"), "left")
        .select("id_conta_origem", "id_conta_destino", F.col("orig.id_tipo_conta").alias("id_tipo_conta_origem"),
                F.col("dest.id_tipo_conta").alias("id_tipo_conta_destino"), F.col("chaves.chave_destino_cadastrada_em"),
                F.col("dest.is_high_risk"),
                F.col("cli_orig.nascido_em").alias("pagador_nascido_em")
               ))

    
    df_pares_com_pid = df_pares_enriquecidos.withColumn("pid", F.spark_partition_id())
    

    gerador_com_contexto = functools.partial(
        _gerar_detalhes_transacao_python_vetorizado, 
        ano=ano, mes=mes, config=config_geracao, 
        perfis_uso=perfis_de_uso_dict,
        perfil_sazonal=perfil_sazonal,
        seed=seed 
    )
    
    df_transacoes_bruto = df_pares_com_pid.mapInPandas(gerador_com_contexto, schema=SCHEMA_TRANSACOES_UDF)
    return df_transacoes_bruto.withColumn("estado_ibge", F.lit(estado_ibge))

# Funções utilitárias e de Orquestração

In [0]:

def salvar_dataframe_em_delta(df: DataFrame, nome_tabela_completo: str, modo: str = "append"):
    if df is None or df.isEmpty():
        print(f"AVISO: DataFrame para a tabela '{nome_tabela_completo}' está vazio. Nenhuma ação tomada.")
        return
    try:
        print(f"INFO: Salvando dados na tabela Delta: {nome_tabela_completo} (modo: {modo})...")
        df.write.format("delta").mode(modo).saveAsTable(nome_tabela_completo)
        print(f"✅ SUCESSO: Dados salvos em {nome_tabela_completo}.")
    except Exception as e:
        print(f"❌ ERRO ao salvar '{nome_tabela_completo}': {e}")
        raise e

# Funções agora aceitam e passam a 'seed'
def gerar_e_salvar_populacao(num_pf: int, num_pj: int, estado_ibge: int, municipio_ibge: int, seed: int) -> None:
    print(f"INFO: Gerando e materializando população para {municipio_ibge} (PF: {num_pf}, PJ: {num_pj})...")
    
    # (Não precisa de seed, _gerar_clientes usa UDF semeada na Célula 1)
    df_clientes_gerado = _gerar_clientes(num_pf, num_pj, estado_ibge, municipio_ibge)
    salvar_dataframe_em_delta(df_clientes_gerado, "transacoes_db.copper.clientes", modo="append")
    
    # Recarregar para garantir consistência
    df_clientes_materializado = spark.table("transacoes_db.copper.clientes").filter(F.col("municipio_ibge") == municipio_ibge)
    
    # Passando a seed
    df_contas_gerado = _gerar_contas(df_clientes_materializado, estado_ibge, municipio_ibge, seed=seed)
    salvar_dataframe_em_delta(df_contas_gerado, "transacoes_db.copper.contas", modo="append")
    
    df_contas_materializado = spark.table("transacoes_db.copper.contas").filter(F.col("municipio_ibge") == municipio_ibge)
    
    # Passando a seed
    df_chaves_pix = _gerar_chaves_pix(df_contas_materializado, df_clientes_materializado, estado_ibge, municipio_ibge, seed=seed + 1)
    salvar_dataframe_em_delta(df_chaves_pix, "transacoes_db.copper.chaves_pix", modo="append")
    
    print(f"INFO: População para o município {municipio_ibge} adicionada com sucesso.")

def limpar_tabelas_de_destino():
    print("INFO: Apagando tabelas de destino para recriação...")
    for tabela in ["clientes", "contas", "chaves_pix", "transacoes"]:
        spark.sql(f"DROP TABLE IF EXISTS transacoes_db.copper.{tabela}")
    print("INFO: ✅ Limpeza concluída.")

def criar_tabelas_de_destino():
    print("INFO: Criando tabelas de destino com os schemas corretos...")
    tabelas = {
        "transacoes_db.copper.clientes": (SCHEMA_CLIENTES_FINAL, ["estado_ibge", "municipio_ibge"]),
        "transacoes_db.copper.contas": (SCHEMA_CONTAS_FINAL, ["estado_ibge", "municipio_ibge"]),
        "transacoes_db.copper.chaves_pix": (SCHEMA_CHAVES_PIX_FINAL, ["estado_ibge", "municipio_ibge"]),
        "transacoes_db.copper.transacoes": (SCHEMA_TRANSACOES_FINAL, ["estado_ibge"])
    }
    for nome, (schema, part_cols) in tabelas.items():
        spark.createDataFrame([], schema).write.format("delta").partitionBy(part_cols).mode("overwrite").saveAsTable(nome)
        print(f"INFO: Tabela '{nome}' criada com sucesso.")


# Orquestração

In [0]:
# =============================================================================
# CÉLULA 3: ORQUESTRAÇÃO (REFATORADA E DETERMINÍSTICA)
# =============================================================================
try:
    # 1. Preparação das Tabelas
    limpar_tabelas_de_destino()
    criar_tabelas_de_destino()
    print("=============================================================================")
    print(f"INFO: Iniciando processo de geração ANUAL (Ano: {ANO_ESTATISTICA})...")
    
    # 2. Leitura e Seleção de Municípios
    print("INFO: Lendo volumes anuais da tabela agregada...")
    df_volumes_anuais = spark.table("transacoes_db.pix_baseline_metricas.volumes_anuais_por_municipio").filter(F.col("Ano") == ANO_ESTATISTICA)
    
    print("INFO: Rankeando municípios para seleção...")
    # Rank é determinístico, não precisa de seed
    df_ranks_anuais = df_volumes_anuais.withColumn("rank_pagador_anual", F.rank().over(Window.orderBy(F.col("volume_pagador_anual").desc())))
    
    municipios_a_processar_lista = df_ranks_anuais.orderBy(F.col("rank_pagador_anual").asc()).limit(LIMITE_MUNICIPIOS_PROCESSADOS).collect()
    total_municipios = len(municipios_a_processar_lista)
    print(f"INFO: {total_municipios} municípios selecionados para processar.")
    
    id_municipios_selecionados = [row["cod_ibge_municipio"] for row in municipios_a_processar_lista]

    print("INFO: Coletando estatísticas mensais para os municípios selecionados...")
    # Coletar para o driver é uma boa otimização para < 100k linhas (Databricks Free)
    stats_mensal_pd = (spark.table("transacoes_db.pix_baseline_metricas.relacao_pagadores_recebedores")
                          .filter((F.col("ano") == ANO_ESTATISTICA) & (F.col("Municipio_Ibge").isin(id_municipios_selecionados)))
                          .select("Municipio_Ibge", "Mes", "total_tx_pf_pagador", "total_tx_pj_pagador").toPandas())
    stats_mensal_pd['total_tx_pagador'] = stats_mensal_pd['total_tx_pf_pagador'] + stats_mensal_pd['total_tx_pj_pagador']
    print("INFO: Estatísticas mensais coletadas com sucesso.")

    # =============================================================================
    # PASSO 1: GERAR TODA A POPULAÇÃO PRIMEIRO
    # =============================================================================
    print("\n--- PASSO 1: GERANDO POPULAÇÃO PARA TODOS OS MUNICÍPIOS ---")
    
    municipios_ja_processados = [] 

    for i, municipio_row in enumerate(municipios_a_processar_lista):
        codigo_municipio = municipio_row["cod_ibge_municipio"]
        codigo_estado = municipio_row["cod_ibge_estado"]
        nome_municipio = municipio_row["municipio_nome"]
        
        print(f"\nGerando População {i+1}/{total_municipios}: {nome_municipio} ({codigo_municipio})")
        
        fator_escala_final = FATOR_ESCALA_VOLUME
        volume_pf_anual = int(municipio_row["total_pf_anual"] * fator_escala_final)
        volume_pj_anual = int(municipio_row["total_pj_anual"] * fator_escala_final)
        num_pf = max(1, int(volume_pf_anual / (TX_POR_CLIENTE_ESPERADO * 12)))
        num_pj = max(1, int(volume_pj_anual / (TX_POR_CLIENTE_ESPERADO * 12)))

        # ! REATORAÇÃO: Passando seed única por município
        gerar_e_salvar_populacao(
            num_pf=num_pf, num_pj=num_pj, 
            estado_ibge=codigo_estado, municipio_ibge=codigo_municipio,
            seed=GLOBAL_SEED + i # Seed única
        )
        municipios_ja_processados.append(codigo_municipio)

    print("\n--- PASSO 1 CONCLUÍDO: Todas as populações foram salvas. ---")
  
    print("\n--- PASSO 2 (REMOVIDO): Pool de contas será gerado dentro da UDF. ---")

    # =============================================================================
    # PASSO 3: GERAR TRANSAÇÕES (SEM CACHE - Otimizado)
    # =============================================================================
    print("\n--- PASSO 3: GERANDO TRANSAÇÕES MÊS A MÊS ---")

    for i, municipio_row in enumerate(municipios_a_processar_lista):
        codigo_municipio = municipio_row["cod_ibge_municipio"]
        codigo_estado = municipio_row["cod_ibge_estado"]
        nome_municipio = municipio_row["municipio_nome"]
        
        print(f"\n================== Iniciando Transações {i+1}/{total_municipios}: {nome_municipio} ({codigo_municipio}) ==================")
        
        fator_escala_final = FATOR_ESCALA_VOLUME

        # Esta é a leitura otimizada (fora do loop mensal)
        print("INFO: Lendo dados do município para o loop mensal (sem cache)...")
        df_contas_do_municipio = spark.table("transacoes_db.copper.contas") \
                                      .filter(F.col("municipio_ibge") == codigo_municipio)
        
        df_clientes_do_municipio = spark.table("transacoes_db.copper.clientes") \
                                        .filter(F.col("municipio_ibge") == codigo_municipio)
        
        num_contas_do_municipio = df_contas_do_municipio.count()
        if num_contas_do_municipio == 0:
            print(f"AVISO: Nenhuma conta encontrada para {nome_municipio}. Pulando...")
            continue
        
        df_chaves_do_municipio = spark.table("transacoes_db.copper.chaves_pix") \
                                       .filter(F.col("municipio_ibge") == codigo_municipio)
        
        window_chaves = Window.partitionBy("id_conta").orderBy(F.col("cadastrada_em").desc())
        
        df_chaves_recentes_do_municipio = (df_chaves_do_municipio
                                            .withColumn("rank", F.rank().over(window_chaves))
                                            .filter(F.col("rank") == 1)
                                            .select("id_conta", F.col("cadastrada_em").alias("chave_destino_cadastrada_em")))

        for mes in range(1, 13):
            print(f"\n--- Processando Mês {mes}/{ANO_ESTATISTICA} para {nome_municipio} ---")
            
            stats_mensal = stats_mensal_pd[(stats_mensal_pd['Municipio_Ibge'] == codigo_municipio) & (stats_mensal_pd['Mes'] == mes)]
            if stats_mensal.empty: 
                print(f"AVISO: Sem estatísticas para {mes}/{ANO_ESTATISTICA}. Pulando."); continue
            
            volume_total_original = stats_mensal["total_tx_pagador"].iloc[0]
            volume_total = int(volume_total_original * fator_escala_final)
            if volume_total <= 0: 
                print(f"AVISO: Volume de transações base para {mes}/{ANO_ESTATISTICA} é 0. Pulando."); continue
                
            print(f"       Volume Original: {volume_total_original} | Volume BASE Alvo: {volume_total}")
            
            print(f"        INFO: Gerando perfil de sazonalidade para o Mês {mes}...")
            perfil_sazonal_mes = _obter_perfil_sazonal_mes(ANO_ESTATISTICA, mes)
            
            # ! REATORAÇÃO: Passando seed única (município + mês)
            df_transacoes = gerar_transacoes(
                df_contas_local=df_contas_do_municipio,
                df_clientes_local=df_clientes_do_municipio,
                df_chaves_recentes_local=df_chaves_recentes_do_municipio,
                num_contas_local=num_contas_do_municipio,
                volume_total=volume_total, 
                estado_ibge=codigo_estado, 
                municipio_ibge=codigo_municipio,
                ano=ANO_ESTATISTICA, 
                mes=mes, 
                municipios_processados=municipios_ja_processados,
                perfil_sazonal=perfil_sazonal_mes,
                seed=GLOBAL_SEED + i + mes # Seed única por task
            )
            salvar_dataframe_em_delta(df_transacoes, "transacoes_db.copper.transacoes", modo="append")
        
        print(f"INFO: Processamento do município {nome_municipio} concluído.")

finally:
    print("\nINFO: O script chegou ao fim.")

print("\n=============================================================================")
print("INFO: Processo de geração de dados sintéticos (determinístico) concluído.")
print("=============================================================================")