In [0]:
# ============================================
# GOLD LAYER - ANÁLISES CONTÁBEIS
# ============================================
# Objetivo: Criar métricas contábeis prontas
# para dashboards e relatórios
# ============================================

from pyspark.sql import functions as F
from pyspark.sql.window import Window

SOURCE_TABLE = "finance_silver.transacoes_silver"

print(" Gold Layer Métricas Contábeis")
print(f" Origem: {SOURCE_TABLE}")
print("\n Cria 3 tabelas Gold:")
print("   1. DRE Mensal (Demonstração do Resultado)")
print("   2. Fluxo de Caixa Mensal")
print("   3. Indicadores Financeiros")

In [0]:
# ============================================
# Tabela Gold 1: DRE Mensal
# ============================================
# DRE = Demonstração do Resultado do Exercício
# mostra receitas, custos, despesas, lucro

print(" Criando DRE mmensal: \n")

df_silver = spark.read.table(SOURCE_TABLE)

df_dre = df_silver.groupBy("ano", "mes", "ano_mes").agg(
    
    F.sum(
        F.when(F.col("categoria") == "Receita", F.col("valor"))
        .otherwise(0)
    ).alias("receita_bruta"),
    
    F.sum(
        F.when(F.col("categoria") == "Fornecedor", F.col("valor"))
        .otherwise(0)
    ).alias("custo_mercadoria"),
    
    F.sum(
        F.when(F.col("categoria").isin(["Salário", "Despesa Operacional"]), F.col("valor"))
        .otherwise(0)
    ).alias("despesas_operacionais"),
    
    F.sum(
        F.when(F.col("categoria") == "Impostos", F.col("valor"))
        .otherwise(0)
    ).alias("impostos"),
    
    F.sum(
        F.when(F.col("categoria") == "Investimento", F.col("valor"))
        .otherwise(0)
    ).alias("investimentos"),
    
    F.count("*").alias("total_transacoes")
    
).withColumn(

    "lucro_bruto",
    F.col("receita_bruta") - F.col("custo_mercadoria")
    
).withColumn(
    "lucro_operacional",
    F.col("lucro_bruto") - F.col("despesas_operacionais")
    
).withColumn(
    "lucro_liquido",
    F.col("lucro_operacional") - F.col("impostos")
    
).withColumn(
    "margem_liquida_pct",
    F.round((F.col("lucro_liquido") / F.col("receita_bruta")) * 100, 2)
    
).orderBy("ano", "mes")

df_dre.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("finance_gold.dre_mensal")

print("Tabela DRE Mensal criada: finance_gold.dre_mensal")
print(f"Meses processados: {df_dre.count()}")

print("\n Amostra DRE Mensal:")
display(df_dre.select(
    "ano_mes", "receita_bruta", "custo_mercadoria", 
    "lucro_bruto", "lucro_liquido", "margem_liquida_pct"
).limit(6))


In [0]:
# ============================================
# Tabela Gold 2: Fluco de caixa Mensal
# ============================================

print("Cria o Fluxo de Caixa Mensal: \n")

df_fluxo = df_silver.groupBy("ano", "mes", "ano_mes").agg(
    
    F.sum(
        F.when(F.col("tipo") == "entrada", F.col("valor"))
        .otherwise(0)
    ).alias("entradas"),
    
    F.sum(
        F.when(F.col("tipo") == "saida", F.col("valor"))
        .otherwise(0)
    ).alias("saidas"),
    
).withColumn(
    "fluxo_mensal",
    F.col("entradas") - F.col("saidas")
    
).orderBy("ano", "mes")

from pyspark.sql.window import Window

window_spec = Window.orderBy("ano", "mes").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_fluxo = df_fluxo.withColumn(
    "saldo_acumulado",
    F.sum("fluxo_mensal").over(window_spec)
)

df_fluxo.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("finance_gold.fluxo_caixa_mensal")

print("Tabela Fluxo de Caixa criada: finance_gold.fluxo_caixa_mensal")
print(f"Meses processados: {df_fluxo.count()}")

print("\n Amostra Fluxo de Caixa:")
display(df_fluxo.select(
    "ano_mes", "entradas", "saidas", "fluxo_mensal", "saldo_acumulado"
).limit(12))

In [0]:
# ============================================
# Tabela Gold 3: Indicadores Financeiros
# ============================================

print(" Criando Indicadores Financeiros...\n")

df_indicadores = df_silver.groupBy("ano").agg(
    
    F.sum(
        F.when(F.col("categoria") == "Receita", F.col("valor"))
    ).alias("receita_total_ano"),
    
    F.sum(
        F.when(F.col("tipo") == "saida", F.col("valor"))
    ).alias("despesa_total_ano"),
    
    F.avg(
        F.when(F.col("categoria") == "Receita", F.col("valor"))
    ).alias("ticket_medio_receita"),
    
    F.max("valor").alias("maior_transacao"),
    F.min("valor").alias("menor_transacao"),
    F.count("*").alias("total_transacoes_ano"),
    
    F.sum(
        F.when(F.col("alto_valor") == True, 1).otherwise(0)
    ).alias("transacoes_alto_valor")
    
).withColumn(
    "lucro_ano",
    F.col("receita_total_ano") - F.col("despesa_total_ano")
    
).withColumn(
    "margem_ano_pct",
    F.round((F.col("lucro_ano") / F.col("receita_total_ano")) * 100, 2)
    
).withColumn(
    "pct_transacoes_alto_valor",
    F.round((F.col("transacoes_alto_valor") / F.col("total_transacoes_ano")) * 100, 2)
)

df_indicadores.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("finance_gold.indicadores_anuais")

print("Tabela Indicadores criada: finance_gold.indicadores_anuais")

# Mostrar
print("\n Indicadores Financeiros:")
display(df_indicadores)

print("\n Camada Gold Ok")
print(" 3 tabelas criadas:")
print("   1. finance_gold.dre_mensal")
print("   2. finance_gold.fluxo_caixa_mensal")
print("   3. finance_gold.indicadores_anuais")


In [0]:
# ============================================
# Validanddo a Camada Gold
# ============================================

print(" Validação da camada Gold \n")
print("="*60)

print("\n1️. Validando DRE Mensal:")
df_dre_val = spark.read.table("finance_gold.dre_mensal")
print(f"  Total de meses: {df_dre_val.count()}")
print(f"  Período: {df_dre_val.agg(F.min('ano_mes')).collect()[0][0]} até {df_dre_val.agg(F.max('ano_mes')).collect()[0][0]}")

meses_sem_receita = df_dre_val.filter(F.col("receita_bruta") == 0).count()
print(f"   Meses sem receita: {meses_sem_receita} (ideal: 0)")

print("\n 2️. Validando Fluxo de Caixa:")
df_fluxo_val = spark.read.table("finance_gold.fluxo_caixa_mensal")
print(f"   Total de meses: {df_fluxo_val.count()}")

saldo_final = df_fluxo_val.orderBy(F.desc("ano"), F.desc("mes")).select("saldo_acumulado").first()[0]
print(f"   Saldo acumulado final: R$ {saldo_final:,.2f}")

print("\n 3️. Validando Indicadores:")
df_ind_val = spark.read.table("finance_gold.indicadores_anuais")
print(f"   Anos processados: {df_ind_val.count()}")

print("\n Resumo por Ano:")
df_ind_val.select(
    "ano", 
    "receita_total_ano", 
    "despesa_total_ano", 
    "lucro_ano", 
    "margem_ano_pct"
).show()

print("\n 4️. Validando Consistência:")

dre_receita_2024 = df_dre_val.filter(F.col("ano") == 2024).agg(F.sum("receita_bruta")).collect()[0][0]
ind_receita_2024 = df_ind_val.filter(F.col("ano") == 2024).select("receita_total_ano").collect()[0][0]

print(f"   Receita 2024 (DRE): R$ {dre_receita_2024:,.2f}")
print(f"   Receita 2024 (Indicadores): R$ {ind_receita_2024:,.2f}")
print(f"   Diferença: R$ {abs(dre_receita_2024 - ind_receita_2024):,.2f}")

if abs(dre_receita_2024 - ind_receita_2024) < 1:
    print(" Consistência OK!")
else:
    print("  Pequena diferença (pode ser arredondamento)")

print("\n" + "="*60)
print(" Camada Gold Validada e Ok")
print("="*60)