### **IMPORTAR BIBLIOTECAS**

In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pandas as pd

StatementMeta(, 6a68f829-c22f-4a1f-b84b-08074eeefd3d, 3, Finished, Available, Finished)

### **PROCESSAR ARQUIVOS DE DICIONÁRIOS DA CAMADA BRONZE**

In [2]:
# Collect just the file names found under the bronze "dicionarios" folder.
files_dicionarios = [
    entrada.name
    for entrada in mssparkutils.fs.ls('Files/bronze/dicionarios')
]

print(files_dicionarios)

StatementMeta(, 6a68f829-c22f-4a1f-b84b-08074eeefd3d, 4, Finished, Available, Finished)

['Dicionario_PNAD_COVID_052020_20220621.xls', 'Dicionario_PNAD_COVID_062020_20220621.xls', 'Dicionario_PNAD_COVID_072020_20220621.xls', 'Dicionario_PNAD_COVID_082020_20220621.xls', 'Dicionario_PNAD_COVID_092020_20220621.xls', 'Dicionario_PNAD_COVID_102020_20220621.xls', 'Dicionario_PNAD_COVID_112020_20220621.xls']


In [3]:
# Accumulates one pandas DataFrame per dictionary workbook.
all_dicionarios = []

# Column labels as they appear after pandas reads a workbook: the first column
# carries the sheet title, the remaining ones are auto-named "Unnamed: N".
colunas_planilha = ['Dicionário das variáveis da PNAD COVID'] + [
    f'Unnamed: {indice}' for indice in range(1, 6)
]

# Map each original label to a positional name: col1 .. col6.
nomes_colunas = {
    rotulo: f'col{posicao}'
    for posicao, rotulo in enumerate(colunas_planilha, start=1)
}

# All six columns are declared as strings; only col2 is declared non-nullable.
schema = StructType([
    StructField(nome, StringType(), nome != 'col2')
    for nome in nomes_colunas.values()
])

# Read every dictionary workbook with pandas and tag its rows with the period
# token taken from the file name.
for file in files_dicionarios:
    # Names follow Dicionario_PNAD_COVID_<period>_<date>.xls, so the fourth
    # "_"-separated token is the period (e.g. "052020").
    mes = file.split('_')[3]
    path = f'/lakehouse/default/Files/bronze/dicionarios/{file}'

    # Keep only rows that have a value in the last column, then switch to the
    # positional column names.
    df_dicionario_pd = (
        pd.read_excel(path)
        .dropna(subset=['Unnamed: 5'])
        .rename(columns=nomes_colunas)
    )

    # Discard rows whose col3 is the literal 'nº' (presumably the table's own
    # header line - confirm against the workbook) and attach the period.
    nao_e_cabecalho = df_dicionario_pd['col3'] != 'nº'
    df_dicionario_pd = df_dicionario_pd.loc[nao_e_cabecalho].assign(mes=mes)

    all_dicionarios.append(df_dicionario_pd)

# Stack all monthly frames into a single pandas DataFrame with a fresh index.
df_concat_pd = pd.concat(all_dicionarios, ignore_index=True)

# StructType.add appends the field to `schema` in place and returns that same
# object, so `schema` itself also gains the "mes" field here.
schema_com_mes = schema.add("mes", StringType(), True)

# Hand the combined frame over to Spark using the explicit all-string schema.
df_dicionarios = spark.createDataFrame(df_concat_pd, schema=schema_com_mes)

# Continue with the Spark-side treatment. Tag every row with an increasing id
# so the window below has a column to order by.
# NOTE(review): assumes the generated ids follow the row order of the
# concatenated pandas frame - TODO confirm.
df_dicionarios = df_dicionarios.withColumn("row_id", monotonically_increasing_id())

# Columns whose gaps are filled with the last non-null value seen in earlier rows.
colunas_para_preencher = ["col1", "col2", "col3", "col4"]

# Step 1: replace the literal string "NaN" and numeric NaN with real nulls, so
# that `ignorenulls=True` in step 2 skips them.
# NOTE(review): presumably this is how missing pandas values surface after the
# conversion to Spark strings - confirm.
for c in colunas_para_preencher:
    df_dicionarios = df_dicionarios.withColumn(
        c,
        when((col(c) == "NaN") | (isnan(c)), None).otherwise(col(c))
    )

# Running window: every row from the first one up to the current row, ordered by
# row_id. There is no partitionBy, so the window spans all months at once and a
# fill can carry a value over from the previous file's rows.
window_spec = Window.orderBy("row_id").rowsBetween(Window.unboundedPreceding, 0)

# Step 2: forward fill. Compute the filled values in a temporary "<col>_filled"
# column, drop the original and rename the temporary column back. This moves
# the filled column to the end, which the select below corrects.
for c in colunas_para_preencher:
    df_dicionarios = df_dicionarios.withColumn(
        f"{c}_filled",
        last(c, ignorenulls=True).over(window_spec)
    ).drop(c).withColumnRenamed(f"{c}_filled", c)

# Drop the helper id, restore the original column order and remove duplicate rows.
df_dicionarios = df_dicionarios.drop("row_id").select("col1", "col2", "col3", "col4", "col5", "col6", "mes").distinct()

# Label each dictionary row with the questionnaire section it belongs to,
# based on the variable code stored in col2.
codigo_variavel = col("col2")
primeira_letra = substring(codigo_variavel, 1, 1)

# Section prefix -> label, evaluated in this order.
secoes_por_prefixo = [
    ("A", "caracteristicas_moradores"),
    ("B", "covid19"),
    ("C", "trabalho"),
    ("D", "renda"),
    ("F", "habitacao"),
]

# 'CAPITAL' has to be tested before the prefix rules, otherwise its leading
# "C" would send it to "trabalho".
regra_classificacao = when(codigo_variavel == 'CAPITAL', lit('capital'))
for prefixo, rotulo in secoes_por_prefixo:
    regra_classificacao = regra_classificacao.when(primeira_letra == prefixo, lit(rotulo))

# 'UF' gets its own label; anything left over is treated as a general variable.
regra_classificacao = (
    regra_classificacao
    .when(codigo_variavel == 'UF', lit('uf'))
    .otherwise(lit('gerais'))
)

df_dicionarios = df_dicionarios.withColumn("classificacao", regra_classificacao)

StatementMeta(, 6a68f829-c22f-4a1f-b84b-08074eeefd3d, 5, Finished, Available, Finished)

  Exception thrown when converting pandas.Series (object) with name 'col1' to Arrow Array (string).
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


In [4]:
# Distinct (code, description) pairs for the rows classified as "uf".
pares_uf = (
    df_dicionarios
    .where(col('classificacao') == 'uf')
    .select('col5', 'col6')
    .distinct()
)
df_uf = pares_uf.select(
    col('col5').cast("Integer").alias('id'),
    col('col6').alias('estado'),
)

# Same shape for the rows classified as "capital".
pares_capital = (
    df_dicionarios
    .where(col('classificacao') == 'capital')
    .select('col5', 'col6')
    .distinct()
)
df_capital = pares_capital.select(
    col('col5').cast('Integer').alias('id'),
    col('col6').alias('capital'),
)

# Expose both lookups to Spark SQL under temporary view names.
df_uf.createOrReplaceTempView('uf_tmp')
df_capital.createOrReplaceTempView('capital_tmp')

# Keep every UF row and attach the capital that shares the same id, if any.
query = """
SELECT
    u.*,
    c.capital
FROM
    uf_tmp u
LEFT JOIN
    capital_tmp c
ON
    u.id = c.id
"""

df_uf = spark.sql(query).orderBy(col('id').asc())

# Persist the combined lookup to the silver layer, replacing any previous output.
df_uf.write.save(
    'Files/silver/dicionarios/dict_uf',
    format='parquet',
    mode='overwrite',
)

StatementMeta(, 6a68f829-c22f-4a1f-b84b-08074eeefd3d, 6, Finished, Available, Finished)

In [5]:
# Answer dictionary for the rows classified as "trabalho": one row per
# (variable id, question number, question text, answer id, answer text, month).
df_trabalho = df_dicionarios.filter(col('classificacao') == 'trabalho') \
                            .select('col2', 'col3', 'col4', 'col5', 'col6', 'mes') \
                            .distinct() \
                            .withColumnsRenamed(
                                {
                                    'col2': 'id',
                                    'col3': 'n',
                                    'col4': 'pergunta',
                                    'col5': 'respostaId',
                                    'col6': 'resposta'
                                }
                            ) \
                            .withColumn('respostaId', col('respostaId').cast('Integer'))

# Write to this section's own folder. The previous target was
# 'dict_caracteristicas', which the "caracteristicas_moradores" cell writes
# with mode('overwrite'), so the "trabalho" dictionary was being replaced and
# never reached the silver layer.
df_trabalho.write \
           .format('parquet') \
           .mode('overwrite') \
           .save('Files/silver/dicionarios/dict_trabalho')

StatementMeta(, 6a68f829-c22f-4a1f-b84b-08074eeefd3d, 7, Finished, Available, Finished)

In [6]:
# Distinct dictionary rows classified as "covid19".
linhas_covid19 = (
    df_dicionarios
    .where(col('classificacao') == 'covid19')
    .select('col2', 'col3', 'col4', 'col5', 'col6', 'mes')
    .distinct()
)

# Give the positional columns their final names; the answer code becomes an integer.
df_covid19 = linhas_covid19.select(
    col('col2').alias('id'),
    col('col3').alias('n'),
    col('col4').alias('pergunta'),
    col('col5').cast('Integer').alias('respostaId'),
    col('col6').alias('resposta'),
    col('mes'),
)

# Persist to the silver layer, replacing any previous output.
df_covid19.write.save(
    'Files/silver/dicionarios/dict_covid19',
    format='parquet',
    mode='overwrite',
)

StatementMeta(, 6a68f829-c22f-4a1f-b84b-08074eeefd3d, 8, Finished, Available, Finished)

In [7]:
# Distinct dictionary rows classified as "caracteristicas_moradores".
linhas_caract = (
    df_dicionarios
    .where(col('classificacao') == 'caracteristicas_moradores')
    .select('col2', 'col3', 'col4', 'col5', 'col6', 'mes')
    .distinct()
)

# Give the positional columns their final names; the answer code becomes an integer.
df_caract = linhas_caract.select(
    col('col2').alias('id'),
    col('col3').alias('n'),
    col('col4').alias('pergunta'),
    col('col5').cast('Integer').alias('respostaId'),
    col('col6').alias('resposta'),
    col('mes'),
)

# Persist to the silver layer, replacing any previous output.
df_caract.write.save(
    'Files/silver/dicionarios/dict_caracteristicas',
    format='parquet',
    mode='overwrite',
)

StatementMeta(, 6a68f829-c22f-4a1f-b84b-08074eeefd3d, 9, Finished, Available, Finished)

In [8]:
# Distinct dictionary rows classified as "renda".
linhas_renda = (
    df_dicionarios
    .where(col('classificacao') == 'renda')
    .select('col2', 'col3', 'col4', 'col5', 'col6', 'mes')
    .distinct()
)

# Give the positional columns their final names; the answer code becomes an integer.
df_renda = linhas_renda.select(
    col('col2').alias('id'),
    col('col3').alias('n'),
    col('col4').alias('pergunta'),
    col('col5').cast('Integer').alias('respostaId'),
    col('col6').alias('resposta'),
    col('mes'),
)

# Persist to the silver layer, replacing any previous output.
df_renda.write.save(
    'Files/silver/dicionarios/dict_renda',
    format='parquet',
    mode='overwrite',
)

StatementMeta(, 6a68f829-c22f-4a1f-b84b-08074eeefd3d, 10, Finished, Available, Finished)

In [9]:
# Distinct dictionary rows classified as "habitacao".
linhas_habitacao = (
    df_dicionarios
    .where(col('classificacao') == 'habitacao')
    .select('col2', 'col3', 'col4', 'col5', 'col6', 'mes')
    .distinct()
)

# Give the positional columns their final names; the answer code becomes an integer.
df_habitacao = linhas_habitacao.select(
    col('col2').alias('id'),
    col('col3').alias('n'),
    col('col4').alias('pergunta'),
    col('col5').cast('Integer').alias('respostaId'),
    col('col6').alias('resposta'),
    col('mes'),
)

# Persist to the silver layer, replacing any previous output.
df_habitacao.write.save(
    'Files/silver/dicionarios/dict_habitacao',
    format='parquet',
    mode='overwrite',
)

StatementMeta(, 6a68f829-c22f-4a1f-b84b-08074eeefd3d, 11, Finished, Available, Finished)

In [10]:
# Distinct dictionary rows classified as "gerais" (everything that matched no
# other classification rule).
linhas_gerais = (
    df_dicionarios
    .where(col('classificacao') == 'gerais')
    .select('col2', 'col3', 'col4', 'col5', 'col6', 'mes')
    .distinct()
)

# Give the positional columns their final names; the answer code becomes an integer.
df_gerais = linhas_gerais.select(
    col('col2').alias('id'),
    col('col3').alias('n'),
    col('col4').alias('pergunta'),
    col('col5').cast('Integer').alias('respostaId'),
    col('col6').alias('resposta'),
    col('mes'),
)

# Persist to the silver layer, replacing any previous output.
df_gerais.write.save(
    'Files/silver/dicionarios/dict_gerais',
    format='parquet',
    mode='overwrite',
)

StatementMeta(, 6a68f829-c22f-4a1f-b84b-08074eeefd3d, 12, Finished, Available, Finished)