In [0]:
# Builds the silver fact table with per-order unit-economics (UE) columns,
# runs data-quality (DQ) checks on it, stores the DQ metrics and writes
# start/end entries to the processing log. Relies on a `spark` session that is
# already available in the notebook scope.

# Initial configuration: these defaults are what the final log entry reports
# when the whole run succeeds.
processo_nome = 'silver_ue_calculation'
status_final = 'sucesso'
detalhes_log = 'Cálculo da UE e DQ concluídos.'
total_records = 0

try:
    # 1. Process start - logging
    spark.sql(f"""
    INSERT INTO pl_delivery_analysis.log_processamento (processo, etapa, status, timestamp, detalhes)
    VALUES ('{processo_nome}', 'inicio', 'executando', CURRENT_TIMESTAMP(), 'Iniciando cálculo da UE e DQ checks');
    """)

    # 2. Core logic: UE calculation (with deduplication and casting).
    #
    # Notes on the query below:
    # - deliveries_dedup keeps only rows with delivery_status = 'DELIVERED', so
    #   the CASE expression in its ORDER BY is constant and the row kept per
    #   order is the one with the lowest driver_id.
    # - payments_dedup keeps the highest payment per order; payment_method is
    #   used as a tie-breaker so equal amounts yield the same row on every run.
    # - The DQ filter rejects NULL and negative values for BOTH monetary
    #   columns (subtotal and delivery_fee), as its comment states.
    # - Both joins are INNER, so orders without a delivered delivery or without
    #   any payment row are not part of the silver table.
    query_silver = """
    CREATE OR REPLACE TABLE pl_delivery_analysis.tbl_fact_pedidos_silver AS
    WITH constantes AS (
        SELECT 
            0.18 AS comissao_plataforma,
            0.70 AS repasse_entregador,
            0.02 AS taxa_transacao
    ),
    -- 1. DEDUPLICAR DELIVERIES: Pega a última tentativa ou a que foi entregue
    deliveries_dedup AS (
        SELECT 
            order_id, 
            driver_id,
            delivery_status
        FROM (
            SELECT 
                order_id, 
                driver_id,
                delivery_status,
                ROW_NUMBER() OVER (
                    PARTITION BY order_id 
                    ORDER BY 
                        CASE WHEN delivery_status = 'DELIVERED' THEN 1 ELSE 2 END, 
                        driver_id 
                ) as rn
            FROM pl_delivery_analysis.tbl_fact_deliveries_bronze
            WHERE delivery_status = 'DELIVERED'
        ) 
        WHERE rn = 1
    ),
    -- 2. DEDUPLICAR PAYMENTS: Pega o pagamento principal
    payments_dedup AS (
        SELECT 
            order_id, 
            payment_method
        FROM (
            SELECT 
                order_id,
                payment_method,
                ROW_NUMBER() OVER (
                    PARTITION BY order_id 
                    ORDER BY CAST(payment_amount AS DECIMAL(10,2)) DESC, payment_method
                ) as rn
            FROM pl_delivery_analysis.tbl_fact_payments_bronze
        ) 
        WHERE rn = 1
    ),
    -- 3. JOIN FINAL E VALIDAÇÃO DE ENTRADA
    pedidos_limpos AS (
        SELECT
            t1.order_id,
            t1.store_id,
            d.driver_id,
            t1.created_at AS created_at_ts_str,
            p.payment_method,
            d.delivery_status,
            
            -- Casting de STRING para DECIMAL para cálculo
            CAST(t1.subtotal AS DECIMAL(10,2)) AS subtotal_bruto,
            CAST(t1.delivery_fee AS DECIMAL(10,2)) AS delivery_fee_cliente,
            
            t4.comissao_plataforma,
            t4.repasse_entregador,
            t4.taxa_transacao
        FROM pl_delivery_analysis.tbl_fact_orders_bronze t1
        INNER JOIN deliveries_dedup d ON t1.order_id = d.order_id
        INNER JOIN payments_dedup p ON t1.order_id = p.order_id
        CROSS JOIN constantes t4
        WHERE 
            -- FILTRO DE DQ: Descartar pedidos com valores monetários nulos ou negativos
            CAST(t1.subtotal AS DECIMAL(10,2)) IS NOT NULL AND
            CAST(t1.delivery_fee AS DECIMAL(10,2)) IS NOT NULL AND
            CAST(t1.subtotal AS DECIMAL(10,2)) >= 0 AND
            CAST(t1.delivery_fee AS DECIMAL(10,2)) >= 0
    )
    SELECT
        order_id,
        store_id,
        driver_id,
        payment_method,
        created_at_ts_str,
        subtotal_bruto,
        delivery_fee_cliente,
        (subtotal_bruto + delivery_fee_cliente) AS gmv_total,
        (subtotal_bruto * comissao_plataforma) + (delivery_fee_cliente * (1 - repasse_entregador)) AS receita_liquida_plataforma,
        (delivery_fee_cliente * repasse_entregador) AS cogs_logistico_simulado,
        (subtotal_bruto * taxa_transacao) AS cogs_transacao_simulado,
        (
            (subtotal_bruto * comissao_plataforma) + (delivery_fee_cliente * (1 - repasse_entregador)) 
            - (delivery_fee_cliente * repasse_entregador) 
            - (subtotal_bruto * taxa_transacao)
        ) AS lucro_bruto_unitario
    FROM pedidos_limpos;
    """
    spark.sql(query_silver)

    # Row count of the freshly built table. Kept as its own step so the value
    # is already available to the final log entry if a later DQ step fails.
    total_records = spark.sql("SELECT COUNT(*) AS total FROM pl_delivery_analysis.tbl_fact_pedidos_silver").collect()[0]['total']

    # 3. Data quality checks (after the silver table is created)
    print("Executando Data Quality Checks...")

    # All three counters come from a single scan of the table:
    #   A. null_count    - rows whose lucro_bruto_unitario is NULL
    #   B. neg_count     - rows with negative lucro_bruto_unitario
    #   C. extremo_count - rows where lucro_bruto_unitario exceeds 50% of gmv_total
    # SUM over an empty table yields NULL; COALESCE turns that into 0 so the
    # metrics INSERT below never receives a Python None rendered as `None`.
    dq_row = spark.sql("""
    SELECT
        CAST(COALESCE(SUM(CASE WHEN lucro_bruto_unitario IS NULL THEN 1 ELSE 0 END), 0) AS DECIMAL(15,2)) AS null_count,
        CAST(COALESCE(SUM(CASE WHEN lucro_bruto_unitario < 0 THEN 1 ELSE 0 END), 0) AS DECIMAL(15,2)) AS neg_count,
        CAST(COALESCE(SUM(CASE WHEN lucro_bruto_unitario > (gmv_total * 0.50) THEN 1 ELSE 0 END), 0) AS DECIMAL(15,2)) AS extremo_count
    FROM pl_delivery_analysis.tbl_fact_pedidos_silver
    """).collect()[0]
    null_count = dq_row['null_count']
    neg_count = dq_row['neg_count']
    extremo_count = dq_row['extremo_count']

    # Percentages are only meaningful when the table has rows.
    if total_records > 0:
        pct_neg = (neg_count / total_records) * 100
        pct_extremo = (extremo_count / total_records) * 100
    else:
        pct_neg = 0.0
        pct_extremo = 0.0

    # 4. Insert metrics: the NULL metric is an absolute count, the other two
    # are percentages of the total row count.
    spark.sql(f"""
    INSERT INTO pl_delivery_analysis.metricas_qualidade (metrica, valor, data_calculo)
    VALUES 
    ('silver_lucro_bruto_nulls', {null_count}, CURRENT_TIMESTAMP()),
    ('silver_pedidos_nao_rentaveis', {pct_neg}, CURRENT_TIMESTAMP()),
    ('silver_pedidos_lucro_extremo', {pct_extremo}, CURRENT_TIMESTAMP());
    """)

except Exception as e:
    status_final = 'falha'
    # The message is spliced into a SQL string literal by the final log
    # INSERT: single quotes are dropped and backslashes doubled so the message
    # cannot terminate or corrupt that literal.
    erro_msg = str(e).replace("'", "").replace("\\", "\\\\")
    detalhes_log = f"Erro na execução: {erro_msg}"
    print(f"ERRO CRITICO: {detalhes_log}")
    # Bare raise re-raises the active exception with its original traceback.
    raise

finally:
    # 5. Final log entry, written on both success and failure.
    try:
        spark.sql(f"""
    INSERT INTO pl_delivery_analysis.log_processamento (processo, etapa, status, timestamp, detalhes)
    VALUES ('{processo_nome}', 'fim', '{status_final}', CURRENT_TIMESTAMP(), '{detalhes_log} - Registros: {total_records}');
    """)
    except Exception as log_error:
        print(f"AVISO: falha ao registrar log final: {log_error}")
        # When the main process already failed, its exception is the one that
        # must reach the caller, so the logging error is only reported. When
        # the process succeeded, a failed log write is still surfaced.
        if status_final == 'sucesso':
            raise