In [1]:
from dotenv import load_dotenv
import os
load_dotenv()
print("API KEY carregada")

import requests
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

api_key = os.getenv('api_key')

# --- 1. Inicialização do Spark ---
# Cria uma sessão Spark, que é o ponto de entrada para qualquer funcionalidade do Spark.
# O appName ajuda a identificar sua aplicação na UI do Spark.
spark = SparkSession.builder \
    .appName("ColetaAPIGovernoAuxilioEmergencial") \
    .getOrCreate()

print("Sessão Spark iniciada com sucesso.")

# --- 2. Extração (Extract) ---

# Parâmetros para a API do Portal da Transparência
ano_mes = "201907" # Data dos dados que serão coletados padrão ANO/MÊS 
codigo_ibge = "3150109" # Código IBGE insira aqui o código do município desejado
pagina = 1 # Página dos dados

#url da api que será consultada
api_url = f"https://api.portaldatransparencia.gov.br/api-de-dados/bolsa-familia-por-municipio?mesAno={ano_mes}&codigoIbge={codigo_ibge}&pagina={pagina}"

# Chave da API no header para a consulta
headers = {
    "Content-Type": "application/json",
    "chave-api-dados": api_key
}

print(f"Coletando dados da API: {api_url}")

try:
    response = requests.get(api_url, headers=headers)
    response.raise_for_status()  # Lança um erro para respostas com status 4xx/5xx
    dados_json = response.json()
    print("Dados coletados com sucesso!")
except requests.exceptions.RequestException as e:
    print(f"Erro ao chamar a API: {e}")
    spark.stop()

# Se a resposta estiver vazia, não há o que processar.
if not dados_json:
    print("Nenhum dado retornado pela API. Encerrando o processo.")
    spark.stop()

# --- 3. Transformação (Transform) ---
# Agora, vamos converter os dados JSON em um DataFrame Spark e tratá-los.

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("dataDisponibilizacao", StringType(), True),
    StructField("municipio", StructType([
        StructField("codigoIBGE", StringType(), True),
        StructField("nomeIBGE", StringType(), True),
        StructField("pais", StringType(), True),
        StructField("uf", StructType([
            StructField("sigla", StringType(), True),
            StructField("nome", StringType(), True)
        ]))
    ])),
    StructField("tipo", StructType([
        StructField("id", IntegerType(), True),
        StructField("descricao", StringType(), True),
        StructField("descricaoDetalhada", StringType(), True)
    ])),
    StructField("valor", DoubleType(), True),
    StructField("quantidadeBeneficiados", IntegerType(), True)
])

# Criamos um RDD (Resilient Distributed Dataset) a partir do JSON e depois um DataFrame.
rdd = spark.sparkContext.parallelize(dados_json)
df_bruto = spark.read.json(rdd, schema=schema)

print("Schema do DataFrame bruto:")
df_bruto.printSchema()
df_bruto.show(5, False)

# Aplicando transformações para limpar e organizar os dados
df_tratado = df_bruto.select(
    col("id").alias("id_pagamento"),
    to_date(col("dataDisponibilizacao"), "dd/MM/yyyy").alias("data_disponibilizacao"),
    col("municipio.codigoIBGE").alias("codigo_ibge"),
    col("municipio.nomeIBGE").alias("nome_municipio"),
    col("municipio.uf.sigla").alias("uf"),
    col("tipo.descricao").alias("tipo_beneficio"),
    col("valor").alias("valor_total_pago"),
    col("quantidadeBeneficiados").alias("qtd_beneficiados")
).withColumn("ano_mes_referencia", lit(ano_mes)) # Adiciona uma coluna com o período da consulta

print("Schema do DataFrame tratado:")
df_tratado.printSchema()

print("Amostra dos dados tratados:")
df_tratado.show(5, truncate=False)

# --- 4. Carga (Load) ---
# Salvamos o DataFrame tratado no HDFS em formato Parquet.
# Parquet é um formato colunar otimizado para performance em análises com Spark.

# O caminho no HDFS onde os dados serão salvos.
caminho_hdfs = f"./dados/gov/bolsa_familia/codigo_ibge={codigo_ibge}/ano_mes={ano_mes}"

print(f"Salvando dados tratados no HDFS em: {caminho_hdfs}")

# 'overwrite' substitui os dados se o diretório já existir.
# 'partitionBy' é útil para organizar os dados, mas como já filtramos por ano_mes,
# incluímos ele no caminho para criar uma partição manual.
df_tratado.write.mode("overwrite").parquet(caminho_hdfs)

print("Dados salvos com sucesso no HDFS!")

spark.stop()
print("Sessão Spark finalizada.")

API KEY carregada
Sessão Spark iniciada com sucesso.
Coletando dados da API: https://api.portaldatransparencia.gov.br/api-de-dados/bolsa-familia-por-municipio?mesAno=201907&codigoIbge=3150109&pagina=1
Dados coletados com sucesso!
Schema do DataFrame bruto:
root
 |-- id: integer (nullable = true)
 |-- dataDisponibilizacao: string (nullable = true)
 |-- municipio: struct (nullable = true)
 |    |-- codigoIBGE: string (nullable = true)
 |    |-- nomeIBGE: string (nullable = true)
 |    |-- pais: string (nullable = true)
 |    |-- uf: struct (nullable = true)
 |    |    |-- sigla: string (nullable = true)
 |    |    |-- nome: string (nullable = true)
 |-- tipo: struct (nullable = true)
 |    |-- id: integer (nullable = true)
 |    |-- descricao: string (nullable = true)
 |    |-- descricaoDetalhada: string (nullable = true)
 |-- valor: double (nullable = true)
 |-- quantidadeBeneficiados: integer (nullable = true)

+--------+--------------------+-------------------------------------------+

In [27]:
import requests
from dotenv import load_dotenv
import os

api_key = os.getenv("api_key")

r = requests.get(
    "https://api.portaldatransparencia.gov.br/api-de-dados/auxilio-emergencial-por-municipio?mesAno=202101&codigoIbge=3550308&pagina=1",
    headers = {
    "chave-api-dados": api_key
})

print(r.status_code)
print(r.text[:500])


401
{"Erro na API":"Chave de API não informada! Para obter a chave acesse http://www.portaldatransparencia.gov.br/api-de-dados/cadastrar-email"}
