### Compile IBGE municipal indexes
(Spark)

* Objetivo: filtrar apenas alguns indicadores para investigar

* Inputs:
    * /datasets/pesquisas_ibge_cidades/df_pesquisa/ (parquet)
    * /datasets/pesquisas_ibge_cidades/df_indicador/ (parquet)
    * /datasets/pesquisas_ibge_cidades/df_resultado/ (parquet)
* Outputs:
    * /datasets/prepared/df_ind_ibge_complete/ (parquet)
    * /datasets/prepared/pd_df_ind_ibge_complete.csv

In [1]:
# Spark
import findspark
findspark.init()

from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as t
from pyspark.sql.window import Window

spark = SparkSession \
            .builder \
            .config("spark.sql.broadcastTimeout", "360000") \
            .config('spark.sql.execution.arrow.enabled', 'false') \
            .config("spark.driver.memory", '14G') \
            .config("spark.executor.memory", '14G') \
            .config("spark.driver.maxResultSize", '4G') \
            .getOrCreate()

In [2]:
import pandas as pd

pd.set_option('display.max_rows', None)
pd.set_option('display.max_colwidth', None)

### 0. Constantes

In [3]:
main_path = 'D:/data_dash_covid/datasets/'

### 1. Read data

In [4]:
df_pesquisa = spark.read.parquet(main_path+'pesquisas_ibge_cidades/df_pesquisa/')\
                    .withColumnRenamed('id','id_pesquisa')\
                    .withColumnRenamed('nome','nome_pesquisa')

# print(df_pesquisa.count(), len(df_pesquisa.columns))

In [5]:
df_indicador = spark.read.parquet(main_path+'pesquisas_ibge_cidades/df_indicador/')\
                    .withColumnRenamed('id','id_indicador')\
                    .withColumnRenamed('nome','nome_indicador')

# print(df_indicador.count(), len(df_indicador.columns))

In [6]:
df_resultado = spark.read.parquet(main_path+'pesquisas_ibge_cidades/df_resultado/')

# print(df_resultado.count(), len(df_resultado.columns))

In [7]:
df_details_ind_ibge = spark.read.format('csv').option('header', 'true')\
                           .load('D:/data_dash_covid/datasets/prepared/details_ind_ibge/*.csv', sep=';')

# print(df_details_ind_ibge.count(), len(df_details_ind_ibge.columns))

### 2. Select Pesquisas

Dentre as 77 pesquisas disponíveis, selecionar as que têm a ver com Saúde / Educação / Economia

In [8]:
# df_pesquisa.toPandas()

In [9]:
ids_pesquisas_gerais = [1,22,23,33,36,37,44,45,10058,10059,10079,10091]
ids_pesquisas_saude = [17,30,32,39,47,10055,10087,10093]
ids_pesquisas_educacao = [13,40,10056,]
ids_pesquisas_economia = [21,29,38,46,10062,10063,10075,10083]

### 3. Join Resultados < Indicadores < Details < Pesquisas

Só é possível dar join (Pesq e Ind) pela de Resultados.
* Agrupa pelo maior ano disponível de cada indicador
* Join Pesquisas, Indicadores e Details
* Seleciona só os indicadores das pesquisas previamente escolhidas
* Remove os indicadores anteriores a 2008
* Remove os indicadores muito específicos (nível acima de 4)

In [11]:
df_resultado_max = df_resultado.groupBy('id_pesquisa','id_indicador')\
                                .agg(f.max('periodo').alias('periodo'))\
                                .join(df_indicador, 'id_indicador', 'left')\
                                .join(df_pesquisa, 'id_pesquisa', 'left')\
                                .filter(f.col('id_pesquisa').isin(ids_pesquisas_gerais + ids_pesquisas_saude + 
                                                                  ids_pesquisas_educacao + ids_pesquisas_economia))\
                                .filter(f.col('periodo') >= 2008)\
                                .join(df_details_ind_ibge, 'id_indicador', 'left')\
                                .filter(f.col('nivel_indicador') <= 4)\
                                .orderBy('id_pesquisa','id_indicador')

In [12]:
# Quantos indicadores afinal

df_resultado_max.groupBy('nivel_indicador')\
                .agg(f.countDistinct('id_indicador').alias('count'))\
                .orderBy(f.desc('count')).toPandas()

Unnamed: 0,nivel_indicador,count
0,4,110
1,3,108
2,2,39
3,1,22


### 4. Join df_resultado_max com o geral, para pegar o valor (resultado) de cada cidade

In [13]:
df_ind_ibge_complete = df_resultado_max.filter(f.col('nivel_indicador').isNotNull())\
                                       .join(df_resultado.select('id_indicador','periodo','id_cidade','resultado',),
                                             ['id_indicador','periodo'], 'left')

In [14]:
df_ind_ibge_complete.count()

1206670

In [15]:
df_ind_ibge_complete.printSchema()

root
 |-- id_indicador: integer (nullable = true)
 |-- periodo: integer (nullable = true)
 |-- id_pesquisa: integer (nullable = true)
 |-- nome_indicador: string (nullable = true)
 |-- nome_pesquisa: string (nullable = true)
 |-- arvore_indicador: string (nullable = true)
 |-- descri_indicador: string (nullable = true)
 |-- nivel_indicador: string (nullable = true)
 |-- nome_completo_indicador: string (nullable = true)
 |-- id_cidade: integer (nullable = true)
 |-- resultado: string (nullable = true)



In [16]:
df_ind_ibge_complete.write.parquet(main_path+'prepared/df_ind_ibge_complete/', mode='overwrite')

In [None]:
df_ind_ibge_complete.toPandas().to_csv(main_path+'prepared/pd_df_ind_ibge_complete.csv', index=False, encoding='utf-8')