In [0]:
# -*- coding: utf-8 -*-
# Databricks Notebook - PySpark/Python

# 1. Configuração e Carregamento da Tabela
# O nome da tabela é o que aparece no seu Catalog (workspace.default.nome_da_tabela)
# Substitua o nome abaixo pelo nome completo da tabela que você criou no Databricks.
table_name = "workspace.default.despesas_contratadas_candidatos_2024_sc"

# Carregar a tabela em um DataFrame do Spark
try:
    df = spark.table(table_name)
    print(f"Tabela carregada com sucesso. Total de registros brutos: {df.count()}")
except Exception as e:
    print(f"ERRO ao carregar a tabela. Verifique o nome: {table_name}")
    print(f"Detalhes do erro: {e}")
    # Se o nome da tabela for diferente, ajuste a variável table_name
    # Exemplo: table_name = "default.despesas_contratadas_candidatos_2024_sc"
    # ou table_name = "seu_catalogo.seu_schema.seu_nome_de_tabela"
    # Se você não estiver usando o Unity Catalog, o nome pode ser apenas:
    # table_name = "despesas_contratadas_candidatos_2024_sc"
    # Tente o nome completo que aparece no seu Catalog.
    # return

# A coluna de despesa é 'VR_DESPESA_CONTRATADA'
expense_column = "VR_DESPESA_CONTRATADA"

from pyspark.sql.functions import col, sum, desc, round

# 2. Filtragem Geral para Limpeza de Dados
# Objetivo: Focar em despesas válidas e com informações de fornecedor minimamente preenchidas.
df_filtrado = df.filter(
    # 1. Garantir que o valor da despesa seja positivo e não nulo
    (col(expense_column).isNotNull()) & 
    (col(expense_column) > 0) &
    
    # 2. Remover registros com dados de fornecedor ou município incompletos/genéricos
    (col("NM_MUNICIPIO_FORNECEDOR").isNotNull()) &
    (col("NM_MUNICIPIO_FORNECEDOR") != "#NULO") &
    (col("NM_MUNICIPIO_FORNECEDOR") != "-4") & # Adicionando -4 como valor genérico
    (col("NM_MUNICIPIO_FORNECEDOR") != "-1")
)

print(f"Total de registros após a filtragem de limpeza: {df_filtrado.count()}")

# O restante da análise será feito no df_filtrado
df_analise = df_filtrado

# 3. Análise Exploratória - Soma das Despesas por Dimensões Chave
# CORREÇÃO: Adicionando round(..., 2) para corrigir o erro de precisão de ponto flutuante

# --- Análise 1: Despesas por Partido (SG_PARTIDO) ---
print("\n--- Análise 1: Despesas por Partido ---")
df_partido = df_analise.groupBy("SG_PARTIDO").agg(
    round(sum(col(expense_column)), 2).alias("Total_Despesas")
).orderBy(desc("Total_Despesas"))

# Mostrar os 10 principais partidos
df_partido.show(10, truncate=False)

# Criar uma View Temporária para visualização no Databricks SQL/Gráficos
df_partido.createOrReplaceTempView("despesas_por_partido")

# --- Análise 2: Despesas por Cargo (DS_CARGO) ---
print("\n--- Análise 2: Despesas por Cargo ---")
df_cargo = df_analise.groupBy("DS_CARGO").agg(
    round(sum(col(expense_column)), 2).alias("Total_Despesas")
).orderBy(desc("Total_Despesas"))

# Mostrar todos os cargos
df_cargo.show(truncate=False)

# Criar uma View Temporária para visualização no Databricks SQL/Gráficos
df_cargo.createOrReplaceTempView("despesas_por_cargo")

# --- Análise 3: Despesas por Tipo de Fornecedor (DS_TIPO_FORNECEDOR) ---
print("\n--- Análise 3: Despesas por Tipo de Fornecedor ---")
df_fornecedor = df_analise.groupBy("DS_TIPO_FORNECEDOR").agg(
    round(sum(col(expense_column)), 2).alias("Total_Despesas")
).orderBy(desc("Total_Despesas"))

# Mostrar os 10 principais tipos de fornecedor
df_fornecedor.show(10, truncate=False)

# Criar uma View Temporária para visualização no Databricks SQL/Gráficos
df_fornecedor.createOrReplaceTempView("despesas_por_fornecedor")

# --- Análise 4: Despesas por Município (NM_MUNICIPIO_FORNECEDOR) ---
print("\n--- Análise 4: Despesas por Município ---")
df_municipio = df_analise.groupBy("NM_MUNICIPIO_FORNECEDOR").agg(
    round(sum(col(expense_column)), 2).alias("Total_Despesas")
).orderBy(desc("Total_Despesas"))

# Mostrar os 10 principais municípios
df_municipio.show(10, truncate=False)

# Criar uma View Temporária para visualização no Databricks SQL/Gráficos
df_municipio.createOrReplaceTempView("despesas_por_municipio")

# 4. Geração de Gráficos (Usando Matplotlib/Pandas para visualização em Notebooks)
# ATENÇÃO: Isso só é recomendado para DataFrames menores (após a agregação).

import matplotlib.pyplot as plt
import seaborn as sns

# Configuração de estilo
sns.set_style("whitegrid")

# --- Gráfico 1: Despesas por Partido (Gráfico de Barras) ---
try:
    # Coletar os dados (apenas os 10 primeiros para um gráfico limpo)
    df_partido_pd = df_partido.limit(10).toPandas()

    plt.figure(figsize=(12, 6))
    sns.barplot(x="SG_PARTIDO", y="Total_Despesas", data=df_partido_pd, palette="viridis")
    plt.title("Top 10 Partidos por Total de Despesas Contratadas (Dados Limpos)")
    plt.xlabel("Partido")
    plt.ylabel("Total de Despesas (R$)")
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    plt.show()
except Exception as e:
    print(f"Aviso: Não foi possível gerar o Gráfico de Barras (Partido). Erro: {e}")

# --- Gráfico 2: Despesas por Cargo (Gráfico de Pizza) ---
try:
    # Coletar os dados (todos os cargos, mas filtrar os muito pequenos)
    df_cargo_pd = df_cargo.toPandas()
    
    # Calcular a porcentagem
    total_geral = df_cargo_pd['Total_Despesas'].sum()
    df_cargo_pd['Porcentagem'] = (df_cargo_pd['Total_Despesas'] / total_geral) * 100
    
    # Agrupar categorias pequenas em 'Outros'
    threshold = 1.0 # Agrupar tudo abaixo de 1%
    df_cargo_pie = df_cargo_pd[df_cargo_pd['Porcentagem'] >= threshold]
    outros_soma = df_cargo_pd[df_cargo_pd['Porcentagem'] < threshold]['Total_Despesas'].sum()
    
    if outros_soma > 0:
        outros_porcentagem = (outros_soma / total_geral) * 100
        # Usar loc para evitar SettingWithCopyWarning
        df_cargo_pie.loc[len(df_cargo_pie)] = ['Outros', outros_soma, outros_porcentagem]

    plt.figure(figsize=(10, 10))
    plt.pie(
        df_cargo_pie['Total_Despesas'], 
        labels=df_cargo_pie['DS_CARGO'], 
        autopct='%1.1f%%', 
        startangle=90, 
        wedgeprops={'edgecolor': 'black'}
    )
    plt.title("Distribuição Percentual das Despesas por Cargo (Dados Limpos)")
    plt.tight_layout()
    plt.show()
except Exception as e:
    print(f"Aviso: Não foi possível gerar o Gráfico de Pizza (Cargo). Erro: {e}")

# --- Gráfico 3: Despesas por Tipo de Fornecedor (Gráfico de Barras Horizontais) ---
try:
    # Coletar os dados (apenas os 10 principais)
    df_fornecedor_pd = df_fornecedor.limit(10).toPandas()

    plt.figure(figsize=(12, 8))
    sns.barplot(x="Total_Despesas", y="DS_TIPO_FORNECEDOR", data=df_fornecedor_pd, palette="magma")
    plt.title("Top 10 Tipos de Fornecedor por Total de Despesas Contratadas (Dados Limpos)")
    plt.xlabel("Total de Despesas (R$)")
    plt.ylabel("Tipo de Fornecedor")
    plt.tight_layout()
    plt.show()
except Exception as e:
    print(f"Aviso: Não foi possível gerar o Gráfico de Barras Horizontais (Fornecedor). Erro: {e}")

# FIM DO SCRIPT
print("\nScript de análise e visualização concluído. Verifique as Views Temporárias e os gráficos gerados.")