<a href="https://colab.research.google.com/github/FabioRFarias/MagnaSistemas/blob/main/Case_IBGE_Magna_Sistemas.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
# Fazer as importações necessárias (exceto Spark que precisa da preparação prévia)
import os
import requests
from io import BytesIO
import zipfile
import pandas as pd


In [None]:
# Instalar Java
def install_java():
  !apt-get install -y openjdk-8-jdk-headless -qq > /dev/null      # instalar openjdk
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"   # definir variável de ambiente
  !java -version       #indicar a versão do Java
install_java()

In [10]:
# download spark3.5.0
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
# descompactar
!tar xf spark-3.5.0-bin-hadoop3.tgz
# definir variável de ambiente
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"
# instalar findspark
!pip install -q findspark
# Importar findspark (e inicializar) e demais itens de Pyspark necessários
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col

In [7]:
# como não achei uma API mais certa para uso acabei usando o FTP dp próprio IBGE entretanto ele só possuia arquivos em excel
# e o PySpark não tem suporte nativo a ele por isso algumas trnasformações iniciais foram realizadas no Python

# Criar uma sessão Spark
spark = SparkSession.builder.appName("IBGE_PIB").getOrCreate()

# URL do arquivo ZIP contendo os dados do PIB
url_pib_2021 = "https://ftp.ibge.gov.br/Pib_Municipios/2021/base/base_de_dados_2010_2021_xlsx.zip"

# Baixar o arquivo ZIP
response = requests.get(url_pib_2021)
with zipfile.ZipFile(BytesIO(response.content)) as z:
    z.extractall("/tmp")

# Listar os arquivos extraídos
extracted_files = ["/tmp/" + file for file in z.namelist()]

# Função para extrair dados do PIB usando Pandas (esta preparado para mais de um dataframe)
def extract_pib_data_pandas(files):
    # Ler arquivos XLS/XLSX usando Pandas
    dfs = [pd.read_excel(file) for file in files]

    # Concatenar DataFrames
    combined_df = pd.concat(dfs, ignore_index=True)

    # Converter DataFrame do Pandas para DataFrame do Spark
    spark_df = spark.createDataFrame(combined_df)

    # Selecionar apenas as colunas desejadas e renomeá-las
    spark_df = spark_df.select("ano", "código do município", "Produto Interno Bruto per capita, \na preços correntes\n(R$ 1,00)"). \
                        withColumnRenamed("ano", "ano"). \
                        withColumnRenamed("código do município", "cod_municipio"). \
                        withColumnRenamed("Produto Interno Bruto per capita, \na preços correntes\n(R$ 1,00)", "PIB")

    # Limpar registros duplicados
    spark_df = spark_df.dropDuplicates()

    # Filtrar dados para ano >= 2019 - para casar com população
    spark_df = spark_df.filter(spark_df.ano >= 2019)

    return spark_df

# Extrair e exibir os dados do PIB
pib_df_2021_spark = extract_pib_data_pandas(extracted_files)
pib_df_2021_spark.show()


+----+-------------+--------+
| ano|cod_municipio|     PIB|
+----+-------------+--------+
|2019|      1500404|10091.01|
|2019|      1504000|14645.33|
|2019|      1713700|16747.82|
|2019|      2100154| 6734.48|
|2019|      2105104|  5960.7|
|2019|      2303402|  7457.7|
|2019|      2314003| 9770.93|
|2019|      2705101|12398.77|
|2019|      2916807|14304.94|
|2019|      3122454| 6948.03|
|2019|      3130101|18185.47|
|2019|      3150901|12909.87|
|2019|      3158102| 8245.56|
|2019|      3168507| 12761.7|
|2019|      3301876|18973.37|
|2019|      3507159| 15069.0|
|2019|      3511607|25483.25|
|2019|      3531506|29753.61|
|2019|      3547502|25058.05|
|2019|      4124103|27059.87|
+----+-------------+--------+
only showing top 20 rows



In [8]:
# URLs dos arquivos de população usei os 3 últimos anos
urls = [
    "https://ftp.ibge.gov.br/Perfil_Municipios/2021/Base_de_Dados/Base_MUNIC_2021.xlsx",
    "https://ftp.ibge.gov.br/Perfil_Municipios/2020/Base_de_Dados/Base_MUNIC_2020.xlsx",
    "https://ftp.ibge.gov.br/Perfil_Municipios/2019/Base_de_Dados/Base_MUNIC_2019_20210817.xlsx"
]

# Função para baixar o arquivo a partir da URL
def baixar_arquivo(url):
    response = requests.get(url)
    return BytesIO(response.content)

# Função para criar o DataFrame a partir do arquivo Excel e parte do nome da planilha/URL
def criar_dataframe(arquivo, parte_nome_planilha):
    for sheet_name in pd.ExcelFile(arquivo).sheet_names:
        if parte_nome_planilha.lower() in sheet_name.lower() or sheet_name.lower() in parte_nome_planilha.lower():
            df = pd.read_excel(arquivo, sheet_name=sheet_name)
            df['Ano'] = int(ano)  # Adiciona a coluna "Ano"
            return df
    raise ValueError(f"Planilha contendo '{parte_nome_planilha}' não encontrada no arquivo.")

# Lista para armazenar os DataFrames
dataframes = []

# Loop através das URLs
for url in urls:
    # Determinar o ano do arquivo
    ano = url.split("/")[-3]  # Obter o ano a partir da URL

    # Baixar o arquivo
    arquivo = baixar_arquivo(url)

    # Determinar a parte do nome da planilha com base no ano - tem um erro de digitação no nome da planilha do ano de 2020
    parte_nome_planilha = "Variáveis externa" if "2020" in ano else "Variáveis externas"

    # Criar o DataFrame e realizar transformações
    df = criar_dataframe(arquivo, parte_nome_planilha)

    # Renomear colunas - os nomes de cabeçalho nã obatem em alguns casos para que seja feito o "union"
    df.rename(columns={"NOME MUNIC": "Municipio", "Mun": "Municipio", "POP EST": "Populacao", "Pop estimada 2021": "Populacao", "CLASSE POP": "Faixa_pop", "REGIAO": "Regiao", "COD UF": "Cod UF"}, inplace=True)

    # Adicionar DataFrame à lista
    dataframes.append(df)

# Juntar os DataFrames em um único DataFrame
df_final = pd.concat(dataframes, ignore_index=True)

# Exibir informações do DataFrame final
print("DataFrame Final:")
print(df_final.head())

DataFrame Final:
    CodMun  UF  Cod UF             Municipio  Populacao  \
0  1100015  RO      11  Alta Floresta DOeste      22516   
1  1100023  RO      11             Ariquemes     111148   
2  1100031  RO      11                Cabixi       5067   
3  1100049  RO      11                Cacoal      86416   
4  1100056  RO      11            Cerejeiras      16088   

               Faixa_pop     Regiao   Ano  
0    4 - 20001 até 50000  1 - Norte  2021  
1  6 - 100001 até 500000  1 - Norte  2021  
2     2 - 5001 até 10000  1 - Norte  2021  
3   5 - 50001 até 100000  1 - Norte  2021  
4    3 - 10001 até 20000  1 - Norte  2021  


In [11]:
# Esquema para o DataFrame do PySpark - População
schema = StructType([
    StructField("CodMun", IntegerType(), True),
    StructField("UF", StringType(), True),
    StructField("Cod UF", IntegerType(), True),
    StructField("Municipio", StringType(), True),
    StructField("Populacao", IntegerType(), True),
    StructField("Faixa_pop", StringType(), True),
    StructField("Regiao", StringType(), True),
    StructField("Ano", IntegerType(), True)
])

# Criar DataFrame do PySpark
df_pyspark = spark.createDataFrame(df_final, schema=schema)

# Exibir DataFrame do PySpark
print("DataFrame Final no PySpark:")
df_pyspark.show()

DataFrame Final no PySpark:
+-------+---+------+--------------------+---------+--------------------+---------+----+
| CodMun| UF|Cod UF|           Municipio|Populacao|           Faixa_pop|   Regiao| Ano|
+-------+---+------+--------------------+---------+--------------------+---------+----+
|1100015| RO|    11|Alta Floresta DOeste|    22516| 4 - 20001 até 50000|1 - Norte|2021|
|1100023| RO|    11|           Ariquemes|   111148|6 - 100001 até 50...|1 - Norte|2021|
|1100031| RO|    11|              Cabixi|     5067|  2 - 5001 até 10000|1 - Norte|2021|
|1100049| RO|    11|              Cacoal|    86416|5 - 50001 até 100000|1 - Norte|2021|
|1100056| RO|    11|          Cerejeiras|    16088| 3 - 10001 até 20000|1 - Norte|2021|
|1100064| RO|    11|   Colorado do Oeste|    15213| 3 - 10001 até 20000|1 - Norte|2021|
|1100072| RO|    11|          Corumbiara|     7052|  2 - 5001 até 10000|1 - Norte|2021|
|1100080| RO|    11|       Costa Marques|    19255| 3 - 10001 até 20000|1 - Norte|2021|
|110

In [12]:
# Criar modelo de dados

# 1. Criar Tabelas Dimensionais

# Dimensão Município
dim_municipio = df_pyspark.select("CodMun", "Municipio", "UF", "Regiao").distinct()

# Dimensão Faixa Populacional
dim_faixa_pop = df_pyspark.select("Faixa_pop").distinct()

# Dimensão Ano
dim_ano = df_pyspark.select("Ano").distinct()

# Dimensão Região
dim_regiao = df_pyspark.select("Regiao").distinct()

# Dimensão Código Município (para PIB)
dim_cod_municipio_pib = pib_df_2021_spark.select("cod_municipio").distinct()

# 2. Criar Tabelas de Fato

# Fato População
fato_populacao = df_pyspark.select("Ano", "CodMun", "Populacao", "Faixa_pop")

# Fato PIB
fato_pib = pib_df_2021_spark.select("ano", "cod_municipio", "PIB")

# 3. Criar Relacionamentos

# Relacionamento Ano (Fato População e Fato PIB)
rel_ano = fato_populacao.join(fato_pib, fato_populacao["Ano"] == fato_pib["ano"]).drop(fato_pib["ano"])

# Relacionamento Município (Fato População e Dimensão Município)
rel_municipio = rel_ano.join(dim_municipio, rel_ano["CodMun"] == dim_municipio["CodMun"]).drop(dim_municipio["CodMun"])

# Relacionamento Faixa Populacional (Fato População e Dimensão Faixa Populacional)
rel_faixa_pop = rel_municipio.join(dim_faixa_pop, rel_municipio["Faixa_pop"] == dim_faixa_pop["Faixa_pop"]).drop(dim_faixa_pop["Faixa_pop"])

# Relacionamento Região (Fato População e Dimensão Região)
rel_regiao = rel_faixa_pop.join(dim_regiao, rel_faixa_pop["Regiao"] == dim_regiao["Regiao"]).drop(dim_regiao["Regiao"])

# Relacionamento Código Município (Fato PIB e Dimensão Código Município PIB)
rel_cod_municipio_pib = fato_pib.join(dim_cod_municipio_pib, fato_pib["cod_municipio"] == dim_cod_municipio_pib["cod_municipio"]).drop(dim_cod_municipio_pib["cod_municipio"])

# Exibir as Tabelas Criadas
print("Dimensão Município:")
dim_municipio.show()

print("Dimensão Faixa Populacional:")
dim_faixa_pop.show()

print("Dimensão Ano:")
dim_ano.show()

print("Dimensão Região:")
dim_regiao.show()

print("Dimensão Código Município (para PIB):")
dim_cod_municipio_pib.show()

print("Fato População:")
fato_populacao.show()

print("Fato PIB:")
fato_pib.show()

print("Relacionamento Ano:")
rel_ano.show()

print("Relacionamento Município:")
rel_municipio.show()

print("Relacionamento Faixa Populacional:")
rel_faixa_pop.show()

print("Relacionamento Região:")
rel_regiao.show()

print("Relacionamento Código Município (para PIB):")
rel_cod_municipio_pib.show()

Dimensão Município:
+-------+--------------------+---+----------------+
| CodMun|           Municipio| UF|          Regiao|
+-------+--------------------+---+----------------+
|2107100|              Morros| MA|    2 - Nordeste|
|2111003|    São João Batista| MA|    2 - Nordeste|
|2202000|    Buriti dos Lopes| PI|    2 - Nordeste|
|2411601|  São Bento do Norte| RN|    2 - Nordeste|
|2601102|           Araripina| PE|    2 - Nordeste|
|2607901|Jaboatão dos Guar...| PE|    2 - Nordeste|
|2806206|             Salgado| SE|    2 - Nordeste|
|2909208|     Coronel João Sá| BA|    2 - Nordeste|
|2930154|    Serra do Ramalho| BA|    2 - Nordeste|
|3121407|Desterro de Entre...| MG|     3 - Sudeste|
|3123601|         Elói Mendes| MG|     3 - Sudeste|
|3506300|Bernardino de Campos| SP|     3 - Sudeste|
|3525409|           Jeriquara| SP|     3 - Sudeste|
|3527900|             Lutécia| SP|     3 - Sudeste|
|3543105|   Ribeirão Corrente| SP|     3 - Sudeste|
|3554755|             Trabiju| SP|     3 - S