# Projeto Engenharia de Dados - Câmara dos Deputados

Pipeline ponta a ponta com governança, explorando discursos e proposições da Câmara. Inclui storytelling, validação, enriquecimento, análises textuais e visuais de alta qualidade mesmo na Free Tier.


## 1. Setup do Ambiente

Configuração do ambiente, importação de bibliotecas e criação de diretório local para dados.


In [0]:
import os
import requests
import json
import pandas as pd
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length, current_timestamp, month, explode, split, lower, trim
import matplotlib.pyplot as plt
import seaborn as sns

spark = SparkSession.builder.getOrCreate()
LOCAL_PATH = "./data"
if not os.path.exists(LOCAL_PATH):
    os.mkdir(LOCAL_PATH)
today = datetime.now().strftime('%Y-%m-%d')
BASE_URL = "https://dadosabertos.camara.leg.br/api/v2"
print(f"Data da execução: {today}")


## 2. Ingestão da API com Paginação (Bronze)

Busca paginada dos dados via API:
- Deputados (lista geral)
- Discursos de deputados (paginado para cada deputado)
- Proposições (ano 2025, paginado)


In [0]:
def baixar_paginado(url_base, params_base, nome_arquivo_prefixo, max_paginas=5):
    pagina = 1
    limite = 100
    dados_completos = []
    while pagina <= max_paginas:
        params = params_base.copy()
        params.update({"itens": limite, "pagina": pagina})
        resposta = requests.get(url_base, params=params)
        resposta.raise_for_status()
        dados = resposta.json().get("dados", [])
        if not dados:
            break
        dados_completos.extend(dados)
        print(f"{nome_arquivo_prefixo}: página {pagina} com {len(dados)} registros")
        pagina += 1
    arquivo_caminho = os.path.join(LOCAL_PATH, f"{nome_arquivo_prefixo}_{today}.json")
    with open(arquivo_caminho, "w", encoding="utf8") as f:
        json.dump(dados_completos, f, ensure_ascii=False)
    print(f"Arquivo salvo em: {arquivo_caminho}")
    return arquivo_caminho

# Baixar deputados
deputados_path = baixar_paginado(f"{BASE_URL}/deputados", {}, "deputados")

# Carregar lista de deputados
with open(deputados_path, 'r', encoding='utf8') as f:
    deputados_lista = json.load(f)

# Função para baixar discursos paginados por deputado
def baixar_discursos_paginados(id_deputado, max_paginas=5):
    pagina = 1
    limite = 100
    discursos_completos = []
    while pagina <= max_paginas:
        url_discursos = f"{BASE_URL}/deputados/{id_deputado}/discursos"
        params = {"itens": limite, "pagina": pagina}
        resp = requests.get(url_discursos, params=params)
        resp.raise_for_status()
        dados = resp.json().get("dados", [])
        if not dados:
            break
        discursos_completos.extend(dados)
        print(f"Deputado {id_deputado} - página {pagina} com {len(dados)} discursos")
        pagina += 1
    arquivo = os.path.join(LOCAL_PATH, f"discursos_deputado_{id_deputado}_{today}.json")
    with open(arquivo, "w", encoding="utf8") as f:
        json.dump(discursos_completos, f, ensure_ascii=False)
    print(f"Discursos deputado {id_deputado} salvos em {arquivo}")
    return arquivo

# Baixar discursos para os primeiros 10 deputados (ajuste conforme necessidade)
for dep in deputados_lista[:10]:
    baixar_discursos_paginados(dep['id'])

# Baixar proposições de 2025
proposicoes_path = baixar_paginado(f"{BASE_URL}/proposicoes", {"ano": 2025}, "proposicoes_2025")


## 3. Carregamento no Spark e Continuação

Carregamento dos dados baixados e preparação para transformações subsequentes.

In [0]:
# Carregar proposições
with open(proposicoes_path, "r", encoding="utf8") as f:
    proposicoes_json = json.load(f)
proposicoes_pd = pd.json_normalize(proposicoes_json)
df_proposicoes = spark.createDataFrame(proposicoes_pd)
df_proposicoes.printSchema()
df_proposicoes.show(3)

# Carregar todos discursos baixados
import glob
def carregar_discursos_completos():
    arquivos = glob.glob(f"{LOCAL_PATH}/discursos_deputado_*.json")
    todos_discursos = []
    for arquivo in arquivos:
        with open(arquivo, "r", encoding="utf8") as f:
            dados = json.load(f)
            for d in dados:
                todos_discursos.append(d)
    discursos_pd = pd.json_normalize(todos_discursos)
    return spark.createDataFrame(discursos_pd)

df_discursos = carregar_discursos_completos()
df_discursos.printSchema()
df_discursos.show(3)


## 4. Transformações Silver: Limpeza, Validação, Enriquecimento

Selecionamos campos, aplicamos validação, adicionamos `processado_em`, removendo discursos curtos.


In [0]:
from pyspark.sql.functions import col, length, current_timestamp

discursos_silver = df_discursos.select(
    col("dataHoraInicio"),
    col("tipoDiscurso"),
    col("sumario"),
    col("keywords"),
    col("transcricao")
).withColumn("processado_em", current_timestamp())

discursos_validos = discursos_silver.filter(length("transcricao") >= 100)
discursos_validos.show(3, truncate=False)


In [0]:
proposicoes_silver = df_proposicoes.select(
    col("id"),
    col("siglaTipo"),
    col("numero"),
    col("ano"),
    col("ementa")
).withColumn("processado_em", current_timestamp())

proposicoes_silver.show(3, truncate=False)


## 5. Modelagem Gold: Análises e Agrupamentos

Produzimos dataframes prontos para análise: discursos longos e frequências por proposição.



In [0]:
discursos_gold = discursos_validos.filter(length("transcricao") > 200)
proposicoes_gold = proposicoes_silver.groupBy("siglaTipo").count()
discursos_gold.show(3, truncate=False)
proposicoes_gold.show()


## 6. Persistência Local Free Tier

Outputs salvos em CSV para consulta posterior.


In [0]:
discursos_gold.toPandas().to_csv(os.path.join(LOCAL_PATH, "discursos_gold.csv"), index=False)
proposicoes_gold.toPandas().to_csv(os.path.join(LOCAL_PATH, "proposicoes_gold.csv"), index=False)

## Versão free não permite DBFS
#discursos_gold.write.mode("overwrite").parquet("./data/discursos_gold.parquet")
#proposicoes_gold.write.mode("overwrite").parquet("./data/proposicoes_gold.parquet")
##


## 7. Visualizações Avançadas e Storytelling

Visualizações com enriquecimento por UF, partido, meses e análise textual das palavras-chave.




In [0]:
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql.functions import explode, split, lower, trim

# Customização dos gráficos
sns.set_theme(style="whitegrid")
plt.rcParams["axes.titlesize"] = 16
plt.rcParams["axes.labelsize"] = 13
plt.rcParams["xtick.labelsize"] = 12
plt.rcParams["ytick.labelsize"] = 12
plt.rcParams["figure.figsize"] = (8, 5)
plt.rcParams["figure.facecolor"] = "#fafafa"
plt.rcParams["savefig.facecolor"] = "#fafafa"
plt.rcParams['axes.spines.right'] = False
plt.rcParams['axes.spines.top'] = False


# Top 5 tipos de discurso
ax = sns.barplot(x="tipoDiscurso", y="count", data=df_tipos, palette="Blues_d")
for idx, value in enumerate(df_tipos["count"]):
    ax.text(idx, value + 0.05, str(int(value)), ha='center', fontsize=12, fontweight='bold')
ax.set_title("Top 5 Tipos de Discurso")
ax.set_xlabel("Tipo de Discurso")
ax.set_ylabel("Total de Discursos")
plt.tight_layout()
plt.savefig('./data/grafico_top_5_tipos.png', bbox_inches='tight')
plt.show()


# Distribuição de discursos por mês
ax = sns.lineplot(x="mes", y="count", data=df_mes, marker="o", color="navy")
for idx, value in enumerate(df_mes["count"]):
    ax.text(df_mes["mes"][idx], value + 0.5, str(int(value)), ha='center', fontsize=11)
ax.set_title("Discursos por Mês")
ax.set_xlabel("Mês")
ax.set_ylabel("Total de Discursos")
ax.set_xticks(df_mes["mes"])
plt.tight_layout()
plt.savefig('./data/grafico_discursos_mes.png', bbox_inches='tight')
plt.show()


# Top 10 palavras-chave dos discursos
ax = sns.barplot(x="count", y="keyword", data=df_kw_top, palette="crest_r")
for i, (kw, cnt) in enumerate(zip(df_kw_top["keyword"], df_kw_top["count"])):
    ax.text(cnt + 0.1, i, int(cnt), va='center', fontsize=11)
ax.set_title("Top 10 Palavras-chave dos Discursos")
ax.set_xlabel("Frequência")
ax.set_ylabel("Palavra-chave")
plt.tight_layout()
plt.savefig('./data/grafico_top_keywords.png', bbox_inches='tight')
plt.show()



# Proposições por tipo
palette = sns.color_palette("flare", len(df_tipo))
ax = sns.barplot(y="siglaTipo", x="count", data=df_tipo, palette=palette)
for idx, value in enumerate(df_tipo["count"]):
    ax.text(value + 15, idx, str(int(value)), va='center', fontsize=10)
ax.set_title("Proposições por Tipo (2025)")
ax.set_xlabel("Total")
ax.set_ylabel("Tipo")
plt.tight_layout()
plt.savefig('./data/grafico_proposicoes_tipo.png', bbox_inches='tight')
plt.show()




## 8. Perguntas Avançadas e Storytelling

O pipeline permite responder perguntas como:

- Qual partido apresenta mais discursos por deputado?
- Existe relação entre UF do deputado e engajamento nos debates?
- Quais são os temas com maior crescimento ao longo do tempo?
- Deputados que discursam mais são os mesmos que mais propõem projetos?

As visualizações permitem análises exploratórias para órgãos de controle, pesquisadores e sociedade.
