### Instalação de bibliotecas

In [None]:
%pip install pandas requests

### Baixando conteúdo do site dos Dados Abertos e salvando em arquivos csv

In [None]:
# import requests

# # Função para ler url e salvar conteúdo em csv 
# def fetch_and_save_in_csv(url: str, file_name: str):
#     response = requests.get(url)

#     if response.status_code == 200:
#         with open(file_name, 'wb') as file:
#             file.write(response.content)
#     else:
#         print(f'Falha ao baixar os dados. Status code: {response.status_code}')

# # Lendo urls do dados abertos de urls.txt e salvando em uma lista
# with open('urls.txt', 'r') as file:
#     urls_list = file.readlines()

# used_dir = "./works/"
# name_file = "pbf_ano"

# # Chamando a função fetch_and_save_in_csv para url em urls_list
# for index, url in enumerate(urls_list):
#     url = url.strip()
#     file_name = used_dir + name_file + "_" + str(index) + ".csv"
#     if url:
#         fetch_and_save_in_csv(url, file_name)

### Configurando caminhos


In [None]:
import os

# URL da fonte de dados
URL = 'https://aplicacoes.mds.gov.br/sagi/servicos/misocial/?fq=anomes_s:2024*&fl=codigo_ibge%2Canomes_s%2Cqtd_familias_beneficiarias_bolsa_familia_s%2Cvalor_repassado_bolsa_familia_s%2Cpbf_vlr_medio_benef_f&fq=valor_repassado_bolsa_familia_s%3A*&q=*%3A*&rows=100000&sort=anomes_s%20desc%2C%20codigo_ibge%20asc&wt=csv'

# Caminho local onde o arquivo será salvo
workdir = './works/'
workdir_files = os.listdir(workdir)
full_paths = [os.path.join(workdir, file) for file in workdir_files]
dataframes_list = []

# Teste com um único arquivo
default_file = 'bolsa-familia2024.csv'
file_path = workdir + default_file

# Caminho do driver jdbc
jdbc_driver_path = "/opt/trabalhos/etl-mdd/postgresql-42.7.1.jar"

### Importando JSON com informações de UF e Município

In [None]:
import json

uf_code_path = './utils/ibge-codes/uf-code.json'
municipios_code_path = './utils/ibge-codes/municipios-code.json'
uf_dict, municipios_dict = {}, {}

with open(uf_code_path, 'r') as file:
    uf_dict = json.load(file)

with open(municipios_code_path, 'r') as file:
    municipios_dict = json.load(file)


### Instalando pyspark


In [None]:
%pip install --user pyspark

### Criando sessão spark

In [None]:
from pyspark.sql import SparkSession
# Cria a sessão spark
spark_session = SparkSession.builder.appName('spark') \
                                    .config("spark.driver.extraClassPath", jdbc_driver_path) \
                                    .config("spark.executor.memory", "4g") \
                                    .getOrCreate()

### Lendo arquivo csv e montando dataframe do spark

In [None]:
# df = spark_session.read.options(header="true", delimiter=",", encoding="ISO-8859-1", inferSchema=True).csv(file_path)

for arquivo in full_paths:
    dataframes_list.append(spark_session.read.options(header="true", delimiter=",", encoding="ISO-8859-1", inferSchema=True).csv(arquivo))

column_changes = [
    ("ibge", "codigo_ibge"),
    ("anomes", "anomes_s"),
    ("qtd_familias_beneficiarias_bolsa_familia", "qtd_familias_beneficiarias_bolsa_familia_s"),
    ("valor_repassado_bolsa_familia", "valor_repassado_bolsa_familia_s")
]

### Adequando nomes de coluna
for index, ano_csv in enumerate(dataframes_list):
    for nome_antigo, nome_padrao in column_changes:
        if nome_antigo in ano_csv.columns:
            ano_csv = ano_csv.withColumnRenamed(nome_antigo, nome_padrao)

    dataframes_list[index] = ano_csv

for ano_csv in dataframes_list:
    ano_csv.show()   

# df.show()

In [None]:
# Retirado de: https://medium.com/@salibi/como-validar-o-c%C3%B3digo-de-munic%C3%ADpio-do-ibge-90dc545cc533#:~:text=O%20C%C3%B3digo%20de%20Munic%C3%ADpio%20do%20IBGE%20%C3%A9%20um%20identificador%20%C3%BAnico,o%20%C3%BAltimo%20d%C3%ADgito%2C%20um%20verificador.

def last_digit_ibge(cod6: str):
   city_exceptions = {
                      '220191': "2201919",
                      '290630': "2202251",
                      '220198': "2201988",
                      '261153': "2611533",
                      '311783': "3117836",
                      '315213': "3152131",
                      '430587': "4305871",
                      '520393': "5203939",
                      '520396': "5203962",
                      '220225': "2202251",
                     }
   
   if cod6 in city_exceptions:
      return city_exceptions.get(cod6)

   a = int(cod6[0])
   b = (int(cod6[1]) * 2) % 10 + (int(cod6[1]) * 2) // 10
   c = int(cod6[2])
   d = (int(cod6[3]) * 2) % 10 + (int(cod6[3]) * 2) // 10
   e = int(cod6[4])
   f = (int(cod6[5]) * 2) % 10 + (int(cod6[5]) * 2) // 10
   digit = (10 - (a + b + c + d + e + f) % 10) % 10
   
   return cod6 + str(digit)


### Criando coluna de média para dados antes de 2023

In [None]:
from pyspark.sql.functions import round

for index, ano_csv in enumerate(dataframes_list):
    if 'pbf_vlr_medio_benef_f' not in ano_csv.columns:
        valor_medio_bolsa = ano_csv.valor_repassado_bolsa_familia_s / ano_csv.qtd_familias_beneficiarias_bolsa_familia_s
        print(valor_medio_bolsa)
        
        dataframes_list[index] = ano_csv.withColumn('pbf_vlr_medio_benef_f', round(valor_medio_bolsa, 2))


### Funções para criação de colunas para UF, Município e Ano

In [None]:
from pyspark.sql.functions import udf, col, substring, lpad
from pyspark.sql.types import StringType

def cria_coluna_uf(codigo_ibge):
    return  uf_dict.get(str(codigo_ibge)[0:2])

cria_coluna_uf_udf = udf(cria_coluna_uf, StringType())

def cria_coluna_municipio(codigo_ibge):
    return  municipios_dict.get(last_digit_ibge(str(codigo_ibge)))

cria_coluna_municipio_udf = udf(cria_coluna_municipio, StringType())

def cria_coluna_ano(anomes_s):
    return  str(anomes_s)[0:4]

cria_coluna_ano_udf = udf(cria_coluna_ano, StringType())


### Adicionando colunas UF, Município e Ano

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

for index, ano_csv in enumerate(dataframes_list):
    ano_csv = ano_csv.withColumn("uf", cria_coluna_uf_udf(col("codigo_ibge")))
    ano_csv = ano_csv.withColumn("municipio", cria_coluna_municipio_udf(col("codigo_ibge")))
    ano_csv = ano_csv.withColumn("ano", cria_coluna_ano_udf(col("anomes_s")))
    ano_csv = ano_csv.withColumn("valor_repassado_bolsa_familia_s", F.col("valor_repassado_bolsa_familia_s").cast(IntegerType()))
    ano_csv = ano_csv.withColumn('mes', substring(col('anomes_s'), 5, 2).cast('int'))
    ano_csv = ano_csv.withColumn("mes_formatado", lpad(col("mes"), 2, "0"))
    ano_csv = ano_csv.drop('anomes_s', 'mes')
    ano_csv = ano_csv.withColumnRenamed('mes_formatado', 'mes')
    
    dataframes_list[index] = ano_csv
    

### Unificando dataframes de diferentes anos

In [None]:

dataframe_unificado =  dataframes_list[0]

for dataframe in dataframes_list[1:]:
    dataframe_unificado = dataframe_unificado.unionAll(dataframe)

dataframe_unificado.show()

dataframe_unificado = dataframe_unificado.coalesce(1)

output_path = "resultado/pbf-2014-2024.csv"
dataframe_unificado.write.mode("overwrite").csv(output_path, header=True)
# Stop the SparkSession
spark_session.stop()

### Configurando conexão com o banco

In [None]:
hostname_or_ip = "localhost"
port = "443"
db = "misocial"
user = "pbf"
password = "pbf"

db_url = "jdbc:postgresql://" + hostname_or_ip + ":" + port + "/" + db

properties = {
    "user": user,
    "password": password,
    "driver": "org.postgresql.Driver", 
}


### Transferindo dataframe para o banco

In [None]:
dataframe_unificado.write.jdbc(url=db_url, table="pbf", mode="overwrite", properties=properties)

### Desalocando sessão do spark


In [None]:
# Stopping spark session
spark_session.stop()