# Tech Challenge 3
## ETL PNAD Covid19


## Introdução

Este projeto tem como objetivo analisar o comportamento da população brasileira durante a pandemia da COVID-19, com foco em aspectos clínicos, comportamentais e econômicos, utilizando como base os microdados da pesquisa PNAD COVID-19 disponibilizada pelo IBGE.

A análise visa apoiar a tomada de decisão de um hospital, simulando um cenário em que seja necessário planejar ações estratégicas para um eventual novo surto da doença, considerando indicadores relevantes extraídos dos dados.

Fonte: PNAD-COVID-19 do IBGE (https://covid19.ibge.gov.br/pnad-covid/)

## Arquitetura e Pipeline de Dados (AWS)


1. Ingestão dos Dados

* Os dados da PNAD COVID-19 foram armazenados no Amazon S3, organizados por camadas lógicas.

* O S3 atuou como Data Lake, centralizando os dados brutos e processados.

2. Processamento e Transformação (AWS Glue + Spark)

  Utilização de AWS Glue (Spark) para:

* Leitura dos dados brutos;

* Padronização de tipos de dados;

* Tratamento de valores nulos e inconsistências;

* Criação da tabela fato analítica.

3. Modelagem Analítica

Construção de uma tabela fato consolidada, contendo:

* Respostas da pesquisa;

* Informações geográficas (UF, capital, região);

* Características demográficas (sexo, raça, escolaridade, idade).

Criação de dimensões auxiliares para:

* Perguntas e respostas;

* Sexo, raça e escolaridade;

* UF, capital e região.

###  BIbliotecas

Importação das bibliotecas necessárias para manipulação.


In [None]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, trim
from pyspark.sql.functions import col
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

You are already connected to a glueetl session e5eae2d0-41d8-4392-adf2-4a172883b65e.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session e5eae2d0-41d8-4392-adf2-4a172883b65e.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 5.0


You are already connected to a glueetl session e5eae2d0-41d8-4392-adf2-4a172883b65e.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: None
Setting new worker type to: G.1X


You are already connected to a glueetl session e5eae2d0-41d8-4392-adf2-4a172883b65e.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: None
Setting new number of workers to: 5



### Leitura dos dados (Extração)

Leitura da base de dados original

Os dados são carregados a partir da camada de dados brutos


In [None]:
spark.sql("SHOW DATABASES").show()

+---------+
|namespace|
+---------+
|  default|
|workspace|
+---------+


In [None]:
spark.sql("SHOW TABLES IN workspace").show()




+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|workspace|_rawbase_dicionar...|      false|
|workspace|      fato_analitica|      false|
|workspace|  fato_analitica_geo|      false|
|workspace|     fato_pnad_covid|      false|
|workspace|raw_base_microdad...|      false|
+---------+--------------------+-----------+


### Tratamento inicial dos dados

Tratamento inicial dos dados
* Seleção das colunas relevantes para a análise
* Padronização de nomes de colunas
* Relacionamento entre as tabelas Dicionario e a base de dados para criação das fatos e dimensões


In [None]:
dicionario = spark.table("workspace._rawbase_dicionarios_pnad_covid")
microdados = spark.table("workspace.raw_base_microdados_pnad_covid")






### Schema do Dicionário


In [None]:
dicionario.printSchema()
dicionario.show(10, truncate=False)


root
 |-- tamanho: long (nullable = true)
 |-- códigodavariável: string (nullable = true)
 |-- nº: string (nullable = true)
 |-- descrição: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- codigo auxiliar quesito: string (nullable = true)
 |-- descrição quesito: string (nullable = true)

+-------+----------------+---------+--------------------+---------+-----------------------+-----------------+
|tamanho|códigodavariável|nº       |descrição           |tipo     |codigo auxiliar quesito|descrição quesito|
+-------+----------------+---------+--------------------+---------+-----------------------+-----------------+
|null   |"Código         |null     |null                |null     |null                   |null             |
|null   |null            |null     |null                |null     |null                   |null             |
|null   |nº              |Descrição|Tipo                |Descrição|Codigo Auxiliar Quesito|Descrição Quesito|
|null   |                |        

## Criação das Fatos e Dimensões

###  Dim Pergunta

A Dim pergunta traz o código da pergunta e a descrição da pergunta.

In [None]:
dim_pergunta = (
    dicionario
    .select(
        col("códigodavariável").alias("cod_pergunta"),
        col("descrição").alias("descricao_pergunta")
    )
    .where(col("códigodavariável").isNotNull())
    .distinct()
)





In [None]:
dim_pergunta.printSchema()
dim_pergunta.show(10, truncate=False)


root
 |-- cod_pergunta: string (nullable = true)
 |-- descricao_pergunta: string (nullable = true)

+------------+----------------------------------------------------------------------------------------+
|cod_pergunta|descricao_pergunta                                                                      |
+------------+----------------------------------------------------------------------------------------+
|C009        |Quantas horas, na semana passada, de fato trabalhou?                                    |
|D0023       |Somatório dos valores recebidos                                                         |
|D0031       |Rendimentos de Programa Bolsa Família                                                   |
|D0061       |Seguro desemprego                                                                       |
|E0023       |Este empréstimo foi adquirido com empregados ou patrão                                  |
|F0022       |Número da faixa do aluguel pago                       

###  Dim Resposta

A Dim Resposta traz o código da pergunta, o código da resposta e a descrição da resposta.

In [None]:

dim_resposta = (
    dicionario
    .withColumnRenamed("descrição quesito", "cod_pergunta")
    .withColumnRenamed("tipo", "cod_resposta")
    .withColumnRenamed("codigo auxiliar quesito", "descricao_resposta")
    .select("cod_pergunta", "cod_resposta", "descricao_resposta")
    .where(col("cod_pergunta").isNotNull())
    .where(trim(col("cod_pergunta")) != "")
    .distinct()
)




In [None]:
dim_resposta.printSchema()
dim_resposta.show(10, truncate=False)

root
 |-- cod_pergunta: string (nullable = true)
 |-- cod_resposta: string (nullable = true)
 |-- descricao_resposta: string (nullable = true)

+------------+------------+------------------------------------+
|cod_pergunta|cod_resposta|descricao_resposta                  |
+------------+------------+------------------------------------+
|C0052       |            |Não aplicável                       |
|C0053       |            |Não aplicável                       |
|C007A       |            |Não aplicável                       |
|C007C       |03          |Auxiliar de escritório, escriturário|
|C007C       |07          |Balconista, vendedor de loja        |
|C007D       |15          |Atividades imobiliárias             |
|C007D       |19          |Educação                            |
|C009A       |2           |Não                                 |
|C010        |1           |Indica se o quesito foi respondido  |
|C01022      |            |Não aplicável                       |
+----------

### Dim_UF

A Dim UF traz o código UF e qual estado representa

In [None]:
dim_uf = (
    dim_resposta
    .filter(col("cod_pergunta") == "UF")
    .select(
        col("cod_resposta").cast("int").alias("uf"),
        col("descricao_resposta").alias("nome_uf")
    )
)





In [None]:
dim_uf.printSchema()
dim_uf.show(10, truncate=False)

root
 |-- uf: integer (nullable = true)
 |-- nome_uf: string (nullable = true)

+---+-------------------+
|uf |nome_uf            |
+---+-------------------+
|14 |Roraima            |
|24 |Rio Grande do Norte|
|33 |Rio de Janeiro     |
|41 |Paraná             |
|43 |Rio Grande do Sul  |
|51 |Mato Grosso        |
|27 |Alagoas            |
|50 |Mato Grosso do Sul |
|52 |Goiás              |
|21 |Maranhão           |
+---+-------------------+
only showing top 10 rows


### Dim Capital

A Dim Capital traz o código da capital e o nome da capital relacionada

In [None]:
dim_capital = (
    dim_resposta
    .filter(col("cod_pergunta") == "CAPITAL")
    .select(
        col("cod_resposta").cast("int").alias("capital"),
        col("descricao_resposta").alias("descricao_capital")
    )
)





In [None]:
dim_capital.printSchema()
dim_capital.show(10, truncate=False)

root
 |-- capital: integer (nullable = true)
 |-- descricao_capital: string (nullable = true)

+-------+--------------------------------+
|capital|descricao_capital               |
+-------+--------------------------------+
|33     |Município de Rio de Janeiro (RJ)|
|12     |Município de Rio Branco (AC)    |
|22     |Município de Teresina (PI)      |
|42     |Município de Florianópolis (SC) |
|50     |Município de Campo Grande (MS)  |
|11     |Município de Porto Velho (RO)   |
|13     |Município de Manaus (AM)        |
|17     |Município de Palmas (TO)        |
|23     |Município de Fortaleza (CE)     |
|15     |Município de Belém (PA)         |
+-------+--------------------------------+
only showing top 10 rows


### Dim_Sexo

A Dim Sexo traz o código sexo e qual valore ela representa (Mulher ou Homem)

In [None]:
dim_sexo = (
    dim_resposta
    .filter(col("cod_pergunta") == "A003")
    .select(
        col("cod_resposta").alias("cod_sexo"),
        col("descricao_resposta").alias("descricao_sexo")
    )
    .where(col("cod_sexo").isNotNull())
    .distinct()
)






In [None]:
dim_sexo.printSchema()
dim_sexo.show(10, truncate=False)

root
 |-- cod_sexo: string (nullable = true)
 |-- descricao_sexo: string (nullable = true)

+--------+--------------+
|cod_sexo|descricao_sexo|
+--------+--------------+
|2       |Mulher        |
|1       |Homem         |
+--------+--------------+


### Dim_Raca

A Dim Raça traz o código da raça e qual raça ela representa

In [1]:
dim_raca = (
    dim_resposta
    .filter(col("cod_pergunta") == "A004")
    .select(
        col("cod_resposta").alias("cod_raca"),
        col("descricao_resposta").alias("descricao_raca")
    )
    .where(col("cod_raca").isNotNull())
    .distinct()
)


NameError: name 'dim_resposta' is not defined

In [None]:
dim_raca.printSchema()
dim_raca.show(10, truncate=False)

root
 |-- cod_raca: string (nullable = true)
 |-- descricao_raca: string (nullable = true)

+--------+--------------+
|cod_raca|descricao_raca|
+--------+--------------+
|1       |Branca        |
|4       |Parda         |
|5       |Indígena      |
|9       |Ignorado      |
|3       |Amarela       |
|2       |Preta         |
+--------+--------------+


### Criar Dim_escolaridade

A Dim escolaridade traz o código da escolaridade e a descrição que ele representa

In [None]:
dim_escolaridade = (
    dim_resposta
    .filter(col("cod_pergunta") == "A005")
    .select(
        col("cod_resposta").alias("cod_escolaridade"),
        col("descricao_resposta").alias("descricao_escolaridade")
    )
    .where(col("cod_escolaridade").isNotNull())
    .distinct()
)




In [None]:
dim_escolaridade.printSchema()
dim_escolaridade.show(10, truncate=False)

root
 |-- cod_escolaridade: string (nullable = true)
 |-- descricao_escolaridade: string (nullable = true)

+----------------+------------------------------------+
|cod_escolaridade|descricao_escolaridade              |
+----------------+------------------------------------+
|3               |Fundamental completa                |
|1               |Sem instrução                       |
|7               |Superior completo                   |
|2               |Fundamental incompleto              |
|4               |Médio incompleto                    |
|8               |Pós-graduação, mestrado ou doutorado|
|5               |Médio completo                      |
|6               |Superior incompleto                 |
+----------------+------------------------------------+


## Criar Fato

Identificação das perguntas que serão usadas.

Conforme proposta do projeto, foram selecionadas 20 perguntas

In [None]:
perguntas = [
"B0011",
"B0012",
"B0013",
"B0014",
"B0015",
"B0016",
"B0017",
"B0019",
"B0037",
"B002",
"B0032",
"B0033",
"B0034",
"B005",
"C001",
"C002",
"C003",
"C005",
"C006",
"E001"
]






Criação da fato principal contendo todos os códigos necessários para a futura criação das tabelas de análise

In [None]:
expr = ", ".join([f"'{p}', {p}" for p in perguntas])

fato_long = microdados.selectExpr(
    "ano",
    "uf",
    "capital",
    f"stack({len(perguntas)}, {expr}) as (cod_pergunta, cod_resposta)"
)





In [None]:
expr = ", ".join([f"'{p}', {p}" for p in perguntas])

fato_base = microdados.selectExpr(
    "ano",
    "v1013 as mes",
    "uf",
    "capital",

    # características da população
    "A002 as idade",
    "A003 as sexo",
    "A004 as raca",
    "A005 as escolaridade",

    # perguntas (formato long)
    f"stack({len(perguntas)}, {expr}) as (cod_pergunta, cod_resposta)"
)







Tratativa para elimitar respostas nulas

In [None]:
fato_base.where(col("cod_resposta").isNotNull()).show(20, truncate=False)

+----+---+---+-------+-----+----+----+------------+------------+------------+
|ano |mes|uf |capital|idade|sexo|raca|escolaridade|cod_pergunta|cod_resposta|
+----+---+---+-------+-----+----+----+------------+------------+------------+
|2020|9  |11 |11     |36   |1   |4   |5           |B0011       |2           |
|2020|9  |11 |11     |36   |1   |4   |5           |B0012       |2           |
|2020|9  |11 |11     |36   |1   |4   |5           |B0013       |2           |
|2020|9  |11 |11     |36   |1   |4   |5           |B0014       |2           |
|2020|9  |11 |11     |36   |1   |4   |5           |B0015       |2           |
|2020|9  |11 |11     |36   |1   |4   |5           |B0016       |2           |
|2020|9  |11 |11     |36   |1   |4   |5           |B0017       |2           |
|2020|9  |11 |11     |36   |1   |4   |5           |B0019       |2           |
|2020|9  |11 |11     |36   |1   |4   |5           |C001        |1           |
|2020|9  |11 |11     |36   |1   |4   |5           |C006        |

In [None]:
fato_base = fato_base.filter(col("cod_resposta").isNotNull())






Relacionamento das tabelas fato e dimensões (Pergunta e Resposta)

In [None]:
fato_analitica = (
    fato_base
    .join(dim_pergunta, "cod_pergunta", "left")
    .join(dim_resposta, ["cod_pergunta", "cod_resposta"], "left")
)





In [None]:
fato_analitica.where(col("cod_resposta").isNotNull()).show(20, truncate=False)

+------------+------------+----+---+---+-------+-----+----+----+------------+----------------------------------------------------------------------------------+------------------+
|cod_pergunta|cod_resposta|ano |mes|uf |capital|idade|sexo|raca|escolaridade|descricao_pergunta                                                                |descricao_resposta|
+------------+------------+----+---+---+-------+-----+----+----+------------+----------------------------------------------------------------------------------+------------------+
|B0015       |2           |2020|9  |33 |33     |61   |2   |1   |6           |Na semana passada teve dor de cabeça?                                             |Não               |
|B0019       |2           |2020|9  |33 |33     |61   |2   |1   |6           |Na semana passada teve fadiga?                                                    |Não               |
|B0015       |2           |2020|9  |33 |33     |71   |1   |1   |7           |Na semana passada teve 

Relacionamento entre as tabelas fato e dimensões (UF e Capital)

In [None]:
fato_analitica_geo = (
    fato_analitica
    .join(dim_uf, "uf", "left")
    .join(dim_capital, "capital", "left")
)





In [None]:
fato_analitica_geo.where(col("cod_resposta").isNotNull()).show(20, truncate=False)

+-------+---+------------+------------+----+---+-----+----+----+------------+----------------------------------------------------------------------------------+------------------+--------------+--------------------------------+
|capital|uf |cod_pergunta|cod_resposta|ano |mes|idade|sexo|raca|escolaridade|descricao_pergunta                                                                |descricao_resposta|nome_uf       |descricao_capital               |
+-------+---+------------+------------+----+---+-----+----+----+------------+----------------------------------------------------------------------------------+------------------+--------------+--------------------------------+
|33     |33 |B0015       |2           |2020|9  |61   |2   |1   |6           |Na semana passada teve dor de cabeça?                                             |Não               |Rio de Janeiro|Município de Rio de Janeiro (RJ)|
|33     |33 |B0019       |2           |2020|9  |61   |2   |1   |6           |Na semana p

### Inclusão das caracteristicas

In [None]:
fato_analitica_final = (
    fato_analitica_geo
    # características da população
    .join(dim_sexo, fato_analitica_geo.sexo == dim_sexo.cod_sexo, "left")
    .join(dim_raca, fato_analitica_geo.raca == dim_raca.cod_raca, "left")
    .join(
        dim_escolaridade,
        fato_analitica_geo.escolaridade == dim_escolaridade.cod_escolaridade,
        "left"
    )
)






### Fato final

In [None]:
fato_analitica_final.where(col("cod_resposta").isNotNull()).show(20, truncate=False)

+-------+---+------------+------------+----+---+-----+----+----+------------+----------------------------------------------------------------------------------+------------------+--------------+--------------------------------+--------+--------------+--------+--------------+----------------+----------------------+
|capital|uf |cod_pergunta|cod_resposta|ano |mes|idade|sexo|raca|escolaridade|descricao_pergunta                                                                |descricao_resposta|nome_uf       |descricao_capital               |cod_sexo|descricao_sexo|cod_raca|descricao_raca|cod_escolaridade|descricao_escolaridade|
+-------+---+------------+------------+----+---+-----+----+----+------------+----------------------------------------------------------------------------------+------------------+--------------+--------------------------------+--------+--------------+--------+--------------+----------------+----------------------+
|33     |33 |B0015       |2           |2020|9  |61  

## Extração da base tratada

Extração da 1ª versão da fato para análise de forma parcionada

In [None]:
(
    fato_analitica
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv("s3://lab-232736165724/data-output/fato_analitica/")
)






### Extração da 2ª versão

In [None]:
(
    fato_analitica_geo
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv("s3://lab-232736165724/data-output/fato_analitica_geo/")
)




### Extração da versão final

In [None]:
(
    fato_analitica_final
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv("s3://lab-232736165724/data-output/fato_analitica_final/")
)




## Conclusão