# API Data Processing with SparkSQL / Processamento de Dados da API com SparkSQL

## Initialize Spark Session / Inicializar Sessão Spark

In [None]:
from pyspark.sql import SparkSession

# Inicialização da sessão Spark
spark = SparkSession.builder     .appName("API Data Processing")     .getOrCreate()

print("Spark Session Initialized")

## Fetch Data from API / Buscar Dados da API

In [None]:
import requests
import pandas as pd

# Função para buscar dados da API e converter para Spark DataFrame
def fetch_data_from_api(url, spark):
    response = requests.get(url)
    data = response.json()
    pdf = pd.DataFrame(data['response']['docs'])
    return spark.createDataFrame(pdf)

# URL da API
url = "https://aplicacoes.mds.gov.br/sagi/servicos/misocial/?fl=codigo_ibge%2Canomes_s%20suas_repasse_mun_vl_gestao_suas_f%20suas_repasse_mun_vl_psb_f%20suas_repasse_mun_vl_pse_alta_complexidade_f%20suas_repasse_mun_vl_pse_media_complexidade_f%20suas_repasse_mun_vl_grupo_programas_f%20suas_repasse_mun_vl_sem_classificacao_f%20suas_repasse_mun_vl_fundo_a_fundo_f%20suas_repasse_mun_vl_total_fundo_f%2C%20suas_repasse_mun_vl_gestao_pbf_cadun_f%20suas_repasse_mun_vl_pcf_f%20suas_repasse_mun_vl_gestao_pab_f%20suas_repasse_mun_vl_pse_f%20suas_repasse_mun_dt_atualizacao_s&fq=suas_repasse_mun_dt_atualizacao_s%3A*&q=*%3A*&rows=100000&sort=anomes_s%20desc%2C%20codigo_ibge%20asc&wt=json&fq=anomes:%5B202401%20TO%20202404%5D&omitHeader=true"

# Buscando dados da API
df = fetch_data_from_api(url, spark)
df.show(5)

## Generate Fake Data / Gerar Dados Fictícios

In [None]:
from pyspark.sql.functions import lit
from faker import Faker
import pandas as pd

# Criar dados fictícios com Faker
fake = Faker()
faker_data = pd.DataFrame({
    'codigo_ibge': [row['codigo_ibge'] for row in df.select('codigo_ibge').distinct().collect()],
    'nome_municipio': [fake.city() for _ in range(df.select('codigo_ibge').distinct().count())]
})

# Converter para Spark DataFrame
faker_df = spark.createDataFrame(faker_data)
faker_df.show(5)

## Perform Joins / Realizar Joins

In [None]:
# Operações de Join
df_join = df.join(faker_df, on='codigo_ibge', how='inner')
df_left_join = df.join(faker_df, on='codigo_ibge', how='left')
df_join.show(5)
df_left_join.show(5)

## Filter by Latest Date / Filtrar pela Última Data

In [None]:
# Filtrando os registros pela última data
from pyspark.sql.functions import max as spark_max

max_date = df_left_join.agg(spark_max('suas_repasse_mun_dt_atualizacao_s')).collect()[0][0]
df_filtered = df_left_join.filter(df_left_join['suas_repasse_mun_dt_atualizacao_s'] == max_date)
df_filtered.show(5)

## Save to CSV / Salvar em CSV

In [None]:
# Salvando em CSV
df_filtered.coalesce(1).write.csv('filtered_data_spark.csv', header=True)