In [0]:
#Montagem da tabela delta de cadastro deputado em formato delta table para camada silver

from pyspark.sql import functions as F
# Caminho do seu arquivo JSON
caminho_json = "dbfs:/Volumes/workspace/deputados/deputados/bronze/despesas/despesas_consolidadas_*.json"

# Lê o arquivo JSON
df = spark.read.option("multiline", "true").json(caminho_json)
#df.printSchema()


df_cad_dep = df.select(
    F.explode("despesas").alias("deputado_despesas")
).select(
    # Metadados do deputado
    F.col("deputado_despesas.idDeputado").alias("id_deputado"),
    F.col("deputado_despesas.totalDespesas").alias("total_despesas_deputado"),
    F.col("deputado_despesas.totalPaginas").alias("total_paginas"),
    F.col("deputado_despesas.dataExtracao").alias("data_extracao_deputado"),
    
    # Explode as despesas individuais
    F.explode("deputado_despesas.dados").alias("despesa")
).select(
    # Colunas do deputado
    "id_deputado",
    "total_despesas_deputado", 
    "total_paginas",
    "data_extracao_deputado",
    
    # Todas as colunas das despesas
    F.col("despesa.ano").alias("ano"),
    F.col("despesa.cnpjCpfFornecedor").alias("cnpj_cpf_fornecedor"),
    F.col("despesa.codDocumento").alias("cod_documento"),
    F.col("despesa.codLote").alias("cod_lote"),
    F.col("despesa.codTipoDocumento").alias("cod_tipo_documento"),
    F.col("despesa.dataDocumento").alias("data_documento"),
    F.col("despesa.mes").alias("mes"),
    F.col("despesa.nomeFornecedor").alias("nome_fornecedor"),
    F.col("despesa.numDocumento").alias("num_documento"),
    F.col("despesa.numRessarcimento").alias("num_ressarcimento"),
    F.col("despesa.parcela").alias("parcela"),
    F.col("despesa.tipoDespesa").alias("tipo_despesa"),
    F.col("despesa.tipoDocumento").alias("tipo_documento"),
    F.col("despesa.urlDocumento").alias("url_documento"),
    F.col("despesa.valorDocumento").alias("valor_documento"),
    F.col("despesa.valorGlosa").alias("valor_glosa"),
    F.col("despesa.valorLiquido").alias("valor_liquido")
)


#df_despesas.show(10, truncate=False)
# Adiciona timestamp de processamento
df_despesas = df_cad_dep \
    .withColumn("data_processamento", F.current_timestamp()) \
    .withColumn("ano_mes", F.date_format("data_documento",'yyyyMM')
)
# Define o caminho para a Delta Table
delta_table_path = "/Volumes/workspace/deputados/deputados/silver/despesas"

# Cria a Delta Table
df_despesas.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .option("delta.constraints.pk_id", "id_deputado IS NOT NULL") \
    .partitionBy("ano_mes") \
    .save(delta_table_path)

print("✅ Delta Table criada com sucesso!")
print(f" Local: {delta_table_path}")




In [0]:
# Registra como tabela no catalog (para usar com SQL)
df_despesas.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("workspace.deputados.silver_despesas_deputados")

print("✅ Tabela 'despesas_deputados' registrada no catalog!")

In [0]:
%sql

SELECT 
    ano,
    mes,
    tipo_despesa,
    COUNT(*) as quantidade,
    SUM(valor_liquido) as valor_total
FROM workspace.deputados.silver_despesas_deputados
GROUP BY ano, mes ,tipo_despesa
ORDER BY valor_total DESC