# CAMADA BRONZE

### Instala√ß√£o de Bibliotecas

Esta c√©lula instala as bibliotecas necess√°rias para o projeto:
- `pyspark==4.0.0`: A vers√£o espec√≠fica do Apache Spark para Python, utilizada para processamento de grandes volumes de dados.
- `openpyxl`: Uma biblioteca para ler e escrever arquivos Excel (.xlsx).



In [3]:
!pip install pyspark==4.0.0
!pip install openpyxl

Defaulting to user installation because normal site-packages is not writeable
Collecting pyspark==4.0.0
  Downloading pyspark-4.0.0.tar.gz (434.1 MB)
[2K     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m434.1/434.1 MB[0m [31m73.9 MB/s[0m  [33m0:00:04[0m:00:01[0m00:01[0m
[?25h  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
[?25hCollecting py4j==0.10.9.9 (from pyspark==4.0.0)
  Downloading py4j-0.10.9.9-py2.py3-none-any.whl.metadata (1.3 kB)
Downloading py4j-0.10.9.9-py2.py3-none-any.whl (203 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (pyproject.toml) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-4.0.0-py2.py3-none-any.whl size=434741299 sha256=2d169bba91da7f843d28ee8c48b477d8c79e7a5d0b5787c2d73bcd12e09a83a8
  Stored

In [2]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, lit




### Inicializa√ß√£o da SparkSession

Esta c√©lula inicializa a `SparkSession`, que √© o ponto de entrada para programar Spark com a API DataFrame e Dataset. Ela define o nome da aplica√ß√£o Spark.

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = (
    SparkSession.builder
        .appName("PNAD_COVID_Traducao_09")
        .getOrCreate()
)





### Defini√ß√£o dos Caminhos dos Arquivos

Esta c√©lula define os caminhos para os arquivos de entrada:
- `micro_09_path`: Caminho para o arquivo CSV contendo os microdados da PNAD COVID de setembro de 2020.
- `micro_10_path`: Caminho para o arquivo CSV contendo os microdados da PNAD COVID de outubro de 2020.
- `micro_11_path`: Caminho para o arquivo CSV contendo os microdados da PNAD COVID de novembro de 2020.
- `dic_path_09`: Caminho para o arquivo Excel (.xls) que cont√©m o dicion√°rio de vari√°veis dos microdados.
- `dic_path_10`: Caminho para o arquivo Excel (.xls) que cont√©m o dicion√°rio de vari√°veis dos microdados.
- `dic_path_11`: Caminho para o arquivo Excel (.xls) que cont√©m o dicion√°rio de vari√°veis dos microdados.

In [4]:
# Caminho do microdados (CSV)
micro_09_path = "s3://lab-746762238259/data-input/microdados/PNAD_COVID_092020.csv"
micro_10_path = "s3://lab-746762238259/data-input/microdados/PNAD_COVID_102020.csv"
micro_11_path = "s3://lab-746762238259/data-input/microdados/PNAD_COVID_112020.csv"




### Carregamento dos Microdados CSV

Descri√ß√£o:
Carrega os arquivos CSV de microdados de cada m√™s em DataFrames do Spark, exibindo o n√∫mero de colunas e as primeiras linhas para verifica√ß√£o.

In [5]:
micro_09 = (
    spark.read.csv(micro_09_path, header=True, inferSchema=True, sep=",")
)

print("Microdados colunas:", len(micro_09.columns))
micro_09.show(5, truncate=False)

micro_10 = (
    spark.read.csv(micro_10_path, header=True, inferSchema=True, sep=",")
)

print("Microdados colunas:", len(micro_10.columns))
micro_10.show(5, truncate=False)

micro_11 = (
    spark.read.csv(micro_11_path, header=True, inferSchema=True, sep=",")
)

print("Microdados colunas:", len(micro_11.columns))
micro_11.show(5, truncate=False)

Microdados colunas: 145
+----+---+-------+-------+-----+-----+-----+-----+-------+---------+-----+-----+------+------------+------------+------+----+-----+------+------+------+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+----+----+----+----+-----+-----+-----+----+----+-----+-----+-----+-----+-----+------+------+-----+----+----+-----+----+-----+------+------+-----+------+------+-----+-----+-----+------+-------+-------+------+-------+-------+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+-----+-----+-----+-----+----+-----+-----+------+------+------+------+------+-----+----+
|Ano |UF |CAPITAL|RM_RIDE|V1008|V1012|V1013|V1016|Estrato|UPA      |V1022|V1023|V1030 

### Unifica√ß√£o dos Microdados
Esta c√©lula unifica os tr√™s DataFrames (setembro, outubro e novembro) em um √∫nico DataFrame, permitindo a an√°lise consolidada dos tr√™s meses. A fun√ß√£o unionByName √© usada para unir por nome de coluna, permitindo colunas ausentes.

In [6]:
from functools import reduce

micro_09 = spark.read.csv(micro_09_path, header=True, inferSchema=True)
micro_10 = spark.read.csv(micro_10_path, header=True, inferSchema=True)
micro_11 = spark.read.csv(micro_11_path, header=True, inferSchema=True)

dfs = [micro_09, micro_10, micro_11]

micro_unificado = reduce(
    lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True),
    dfs
)

micro_unificado.show(5, truncate=False)

+----+---+-------+-------+-----+-----+-----+-----+-------+---------+-----+-----+------+------------+------------+------+----+-----+------+------+------+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+----+----+----+----+-----+-----+-----+----+----+-----+-----+-----+-----+-----+------+------+-----+----+----+-----+----+-----+------+------+-----+------+------+-----+-----+-----+------+-------+-------+------+-------+-------+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+-----+-----+-----+-----+----+-----+-----+------+------+------+------+------+-----+----+-----+-----+-----+
|Ano |UF |CAPITAL|RM_RIDE|V1008|V1012|V1013|V1016|Estrato|UPA      |V1022|V1023|V1030 |V1031

### Convers√£o do Dicion√°rio Excel para CSV

O dicion√°rio original est√° em formato Excel. Esta c√©lula:
1.  L√™ o arquivo Excel para um DataFrame Pandas.
2.  Salva o DataFrame Pandas em um arquivo CSV tempor√°rio (`/content/dicionario_pnad.csv`).

Esta etapa √© necess√°ria para que o Spark possa ler o dicion√°rio de forma mais f√°cil e consistente.

In [7]:
import pandas as pd

# Caminho do dicion√°rio (CSV)
dic_path_09 = "s3://lab-746762238259/data-input/dicion√°rios/Dicionario_PNAD_COVID_092020_20220621.csv"
dic_path_10 = "s3://lab-746762238259/data-input/dicion√°rios/Dicionario_PNAD_COVID_102020_20220621.csv"
dic_path_11 = "s3://lab-746762238259/data-input/dicion√°rios/Dicionario_PNAD_COVID_112020_20220621.csv"

dic_excel_paths = {
    "09": dic_path_09,
    "10": dic_path_10,
    "11": dic_path_11
}

dic_csv_paths = []

for mes, path in dic_excel_paths.items():
    df_pd = pd.read_csv(path, header=None)

    csv_path = f"s3://lab-746762238259/data-input/dicion√°rios/dicionario_pnad_{mes}.csv"
    df_pd.to_csv(csv_path, index=False, header=False)

    dic_csv_paths.append(csv_path)




### Carregamento do Dicion√°rio no Spark

Descri√ß√£o:
Carrega os arquivos CSV do dicion√°rio convertidos em um DataFrame do Spark, atribuindo nomes gen√©ricos √†s colunas (A, B, C, D, E, F).

Esta etapa √© necess√°ria para que o Spark possa ler o dicion√°rio de forma mais f√°cil e consistente.

In [8]:
df_dict_raw = (
    spark.read.format("csv")
    .option("header", "false")
    .option("sep", ",")
    .load(dic_csv_paths)
)

df_dict = df_dict_raw.toDF("A", "B", "C", "D", "E", "F")
df_dict.show(20, truncate=False)


+--------------------------------------+-------+----+--------------------+-----+-------------------+
|A                                     |B      |C   |D                   |E    |F                  |
+--------------------------------------+-------+----+--------------------+-----+-------------------+
|Dicion√°rio das vari√°veis da PNAD COVID|null   |null|null                |null |null               |
|Tamanho                               |C√≥digo |null|null                |null |null               |
|da                                    |null   |null|null                |null |null               |
|vari√°vel"                             |Quesito|null|Categorias          |null |null               |
|null                                  |null   |n¬∫  |Descri√ß√£o           |Tipo |Descri√ß√£o          |
|Parte 1 - Identifica√ß√£o e Controle    |null   |null|null                |null |null               |
|4                                     |Ano    |null|Ano de refer√™ncia   |null 

### Identifica√ß√£o de Linhas de Vari√°veis

Descri√ß√£o:
Cria uma coluna is_var para identificar quais linhas do dicion√°rio cont√™m informa√ß√µes sobre vari√°veis (c√≥digos e descri√ß√µes), baseando-se na presen√ßa de valores nas colunas B, D e E.

In [9]:
df_dict = df_dict.withColumn(
    "is_var",
    (F.col("B").isNotNull()) &
    (F.length("B") > 0) &
    (F.col("D").isNotNull()) &
    (F.length("D") > 0) &
    (F.col("E").isNotNull()) &
    (F.length("E") > 0)
)





### Visualiza√ß√£o das Linhas do Dicion√°rio

Descri√ß√£o:
Exibe as colunas relevantes do dicion√°rio junto com a flag is_var para verifica√ß√£o visual da identifica√ß√£o correta das vari√°veis.

In [10]:
df_dict.select("B","D","E","F","is_var").show(50, truncate=False)


+-------+--------------------+-----+-----------------------------+------+
|B      |D                   |E    |F                            |is_var|
+-------+--------------------+-----+-----------------------------+------+
|null   |null                |null |null                         |false |
|C√≥digo |null                |null |null                         |false |
|null   |null                |null |null                         |false |
|Quesito|Categorias          |null |null                         |false |
|null   |Descri√ß√£o           |Tipo |Descri√ß√£o                    |false |
|null   |null                |null |null                         |false |
|Ano    |Ano de refer√™ncia   |null |null                         |false |
|UF     |Unidade da Federa√ß√£o|11   |Rond√¥nia                     |true  |
|null   |null                |12   |Acre                         |false |
|null   |null                |13   |Amazonas                     |false |
|null   |null                

### Constru√ß√£o das Dimens√µes (DIMs)

Descri√ß√£o:
Processa o dicion√°rio para extrair tabelas de dimens√£o (DIMs) para cada vari√°vel presente nos microdados. O processo inclui:

Identifica√ß√£o de blocos de vari√°veis

Extra√ß√£o do nome e descri√ß√£o de cada vari√°vel

Cria√ß√£o de DataFrames de dimens√£o com c√≥digos e descri√ß√µes

Filtragem para incluir apenas vari√°veis presentes nos microdados



In [11]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# =====================================================
# 1. Criar identificador de bloco (cada vari√°vel)
# =====================================================
w = Window.orderBy(F.monotonically_increasing_id())

df_blocos = (
    df_dict
    .withColumn(
        "bloco_id",
        F.sum(F.when(F.col("is_var") == True, 1).otherwise(0)).over(w)
    )
)

# =====================================================
# 2. Criar tabela auxiliar com o nome da vari√°vel por bloco
# =====================================================
df_vars = (
    df_blocos
    .filter(F.col("is_var") == True)
    .select(
        F.col("bloco_id"),
        F.col("B").alias("nome_variavel"),
        F.col("D").alias("descricao_variavel")
    )
)

# =====================================================
# 3. Anexar o nome da vari√°vel a todas as linhas do bloco
# =====================================================
df_dict_norm = (
    df_blocos
    .join(df_vars, on="bloco_id", how="left")
)

# =====================================================
# 4. Fun√ß√£o para criar DIM de uma vari√°vel
# =====================================================


def criar_dim(df_dict_norm, nome_var):
    return (
        df_dict_norm
        .filter(
            (F.col("nome_variavel") == nome_var) &
            (F.col("E").rlike(r"^\d+$"))   # <-- REMOVE is_var == False
        )
        .select(
            F.col("E").cast("int").alias(f"{nome_var}_codigo"),
            F.col("F").alias(f"{nome_var}_descricao")
        )
        .dropDuplicates()
        .orderBy(f"{nome_var}_codigo")
    )

# =====================================================
# 5. Descobrir vari√°veis que existem no microdados
# =====================================================
todas_vars = [
    r["nome_variavel"]
    for r in df_vars.select("nome_variavel").distinct().collect()
    if r["nome_variavel"] not in (None, "")
]

variaveis_no_micro = [v for v in todas_vars if v in micro_unificado.columns]

print("Vari√°veis presentes no micro_09:")
print(variaveis_no_micro)

# =====================================================
# 6. Criar DIMS
# =====================================================
dims = {}

for var in variaveis_no_micro:
    print(f"\nüîπ Construindo DIM para {var}")
    dim = criar_dim(df_dict_norm, var)
    dims[var] = dim
    dim.show(20, truncate=False)


Vari√°veis presentes no micro_09:
['A001B1', 'B0015', 'B0019', 'B00111', 'B0034', 'B0045', 'C005', 'C009', 'A009', 'C004', 'C01022', 'C012', 'C013', 'D0043', 'F002A4', 'B0031', 'B0033', 'B0041', 'B009A', 'B009E', 'B0101', 'B0106', 'C003', 'C0052', 'C007E1', 'C008', 'C0104', 'C011A22', 'F0022', 'UPA', 'V1022', 'B0017', 'B0036', 'B0104', 'C007D', 'C01021', 'C011A21', 'E0023', 'F001', 'F0061', 'V1012', 'Estrato', 'A005', 'B0012', 'B00113', 'B009C', 'C001', 'C014', 'E001', 'F006', 'UF', 'V1008', 'C007C', 'C007E', 'C007E2', 'C01012', 'C011A1', 'C015', 'F002A3', 'CAPITAL', 'V1032', 'A003', 'B009D', 'B009F', 'C007A', 'D0031', 'D0041', 'A002', 'A006', 'B00110', 'B009B', 'B0103', 'C0051', 'C0053', 'D0033', 'B0013', 'B0014', 'B002', 'B005', 'C007F', 'C01011', 'C011A2', 'D0061', 'E0024', 'V1016', 'A004', 'B0011', 'B0037', 'B0044', 'C002', 'C011A12', 'C016', 'D0021', 'D0023', 'F0021', 'V1013', 'A001', 'A007', 'B0018', 'B0035', 'B0042', 'B007', 'C006', 'F002A1', 'V1031', 'B0032', 'B0043', 'B0046', 

In [12]:
micro_unificado.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("path", "s3://lab-746762238259/data-output/pnad_covid_bronze/") \
    .option("compression", "snappy") \
    .saveAsTable("workspace.tb_pnad_covid_bronze")

df_dict.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("path", "s3://lab-746762238259/data-output/pnad_covid_bronze/") \
    .option("compression", "snappy") \
    .saveAsTable("workspace.tb_pnad_covid_bronze")




# CAMADA SILVER
### Tradu√ß√£o dos Valores dos Microdados

Descri√ß√£o:
Aplica a tradu√ß√£o dos c√≥digos para descri√ß√µes textuais no DataFrame unificado de microdados. Para cada vari√°vel que possui uma dimens√£o correspondente, substitui os c√≥digos num√©ricos por suas descri√ß√µes textuais usando mapeamento.

In [13]:
from pyspark.sql import functions as F
from itertools import chain

# micro_traduzido come√ßa como c√≥pia do micro original
micro_traduzido = micro_unificado

for var, dim in dims.items():
    print(f"üîÅ Traduzindo valores da coluna {var}...")

    # Coletar mapeamento codigo -> descricao
    mapping = (
        dim
        .select(
            F.col(f"{var}_codigo").cast("string"),
            F.col(f"{var}_descricao")
        )
        .dropna()
        .distinct()
        .collect()
    )

    if not mapping:
        print(f"‚ö†Ô∏è Nenhum mapeamento encontrado para {var}, pulando.")
        continue

    # Criar map literal (Spark)
    map_expr = F.create_map(
        *list(chain.from_iterable(
            [(F.lit(r[0]), F.lit(r[1])) for r in mapping]
        ))
    )

    # Aplicar tradu√ß√£o IN-PLACE
    micro_traduzido = micro_traduzido.withColumn(
        var,
        F.coalesce(
            map_expr[F.col(var).cast("string")],
            F.col(var).cast("string")
        )
    )

# ===========================
# Valida√ß√£o
# ===========================
print("\n=== Preview ap√≥s tradu√ß√£o IN-PLACE ===")
micro_traduzido.show(10, truncate=False)


üîÅ Traduzindo valores da coluna A001B1...
üîÅ Traduzindo valores da coluna B0015...
üîÅ Traduzindo valores da coluna B0019...
üîÅ Traduzindo valores da coluna B00111...
üîÅ Traduzindo valores da coluna B0034...
üîÅ Traduzindo valores da coluna B0045...
üîÅ Traduzindo valores da coluna C005...
üîÅ Traduzindo valores da coluna C009...
‚ö†Ô∏è Nenhum mapeamento encontrado para C009, pulando.
üîÅ Traduzindo valores da coluna A009...
üîÅ Traduzindo valores da coluna C004...
üîÅ Traduzindo valores da coluna C01022...
‚ö†Ô∏è Nenhum mapeamento encontrado para C01022, pulando.
üîÅ Traduzindo valores da coluna C012...
üîÅ Traduzindo valores da coluna C013...
üîÅ Traduzindo valores da coluna D0043...
‚ö†Ô∏è Nenhum mapeamento encontrado para D0043, pulando.
üîÅ Traduzindo valores da coluna F002A4...
üîÅ Traduzindo valores da coluna B0031...
üîÅ Traduzindo valores da coluna B0033...
üîÅ Traduzindo valores da coluna B0041...
üîÅ Traduzindo valores da coluna B009A...
üîÅ Traduzind

In [14]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Criar DataFrame com ID como primeira coluna
micro_traduzido = micro_traduzido.select(
    F.row_number().over(Window.orderBy(F.monotonically_increasing_id())).alias("ID"),
    *micro_traduzido.columns  # todas as outras colunas
)




### Visualiza√ß√£o dos Dados Traduzidos

Descri√ß√£o:
Exibe as primeiras linhas do DataFrame de microdados ap√≥s a tradu√ß√£o, mostrando os valores j√° convertidos em formato leg√≠vel.

In [15]:
micro_traduzido.show(20, truncate=False)

+---+----+--------+-----------------------------+-------+-----+-----+-----+-----+-------+---------+------+-------+------+------------+------------+------+----+-------------------------------------------+-------------+-------------+------+----+------+-------+----------------------+----+--------------------------------------+------+-----------------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+--------------------------------------------------------------------------------------------------------------------------------+----+----+----+----+----+-----+-----+-----+----+---------------------------+-----+--------------------------------------+------------------------------------------------------------------+-------------------------------------------------+-----+------+-

### Visualiza√ß√£o das Colunas Traduzidos

Descri√ß√£o:
Exibe colunas do DataFrame de microdados ap√≥s a tradu√ß√£o, mostrando os valores j√° convertidos em formato leg√≠vel.

In [16]:
# =====================================================
# Traduzir nomes das colunas do microdados usando df_dict
# PySpark puro (sem Pandas, sem CSV)
# B (codigo) -> D (descricao normalizada)
# =====================================================

from pyspark.sql import functions as F
import unicodedata
import re

# =====================================================
# 1. Fun√ß√£o de normaliza√ß√£o (driver-side)
# =====================================================
def normaliza_nome_coluna(texto: str) -> str:
    if texto is None:
        return None

    texto = texto.strip().lower()

    # Remove acentos
    texto = unicodedata.normalize("NFKD", texto)
    texto = texto.encode("ascii", "ignore").decode("utf-8")

    # Mant√©m apenas [a-z0-9_]
    texto = re.sub(r"[^a-z0-9]+", "_", texto)
    texto = re.sub(r"_+", "_", texto).strip("_")

    return texto


# =====================================================
# 2. Extrair pares (codigo -> descricao) do df_dict
#    OBS: inclui linhas is_var=true e false
# =====================================================
pairs = (
    df_dict
    .select(
        F.col("B").cast("string").alias("codigo"),
        F.col("D").cast("string").alias("descricao")
    )
    .where(
        F.col("codigo").isNotNull() &
        F.col("descricao").isNotNull()
    )
    .dropDuplicates()
    .collect()   # necess√°rio para renomear colunas
)

# =====================================================
# 3. Mapa case-insensitive das colunas do micro
# =====================================================
micro_cols_lower = {c.lower(): c for c in micro_traduzido.columns}

# =====================================================
# 4. Criar rename_map (old_col -> new_col)
# =====================================================
rename_map = {}

for r in pairs:
    codigo = r["codigo"].strip().lower()
    descricao = r["descricao"].strip()

    if codigo in micro_cols_lower:
        old_col = micro_cols_lower[codigo]
        new_col = normaliza_nome_coluna(descricao)

        if new_col and new_col != old_col:
            rename_map[old_col] = new_col

# =====================================================
# 5. Aplicar renomea√ß√£o
# =====================================================
for old, new in rename_map.items():
    micro_traduzido = micro_traduzido.withColumnRenamed(old, new)





In [17]:
micro_traduzido.show(20, truncate=False)

+---+-----------------+--------------------+-----------------------------+------------------------------------------------------+------------------------------+-------------+---------------+---------------------------------+-------+---------+---------------------+------------+---------------------+-------------------------------+-------------------------------+--------------------+---------------+-------------------------------------------+-----------------+-----------------+-----------------+----------------+------+-----------+----------------------+----------------+-----------------------------------------------------------------------------------+--------------------------------------------------------------------+---------------------------------------------------------------------------------------+----------------------------+----------------------------+--------------------------------------+------------------------------------------------+------------------------------------+--

In [18]:
from collections import defaultdict

def rename_duplicate_columns(df):
    counts = defaultdict(int)
    new_columns = []

    for c in df.columns:
        counts[c] += 1
        if counts[c] == 1:
            new_columns.append(c)
        else:
            new_columns.append(f"{c}_{counts[c]}")

    return df.toDF(*new_columns)

micro_traduzido_corrigido = rename_duplicate_columns(micro_traduzido)





In [19]:
micro_traduzido_corrigido.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("path", "s3://lab-746762238259/data-output/pnad_covid_silver/") \
    .option("compression", "snappy") \
    .saveAsTable("workspace.tb_pnad_covid_silver")




### Contagem de linhas de dados do dataframe

Descri√ß√£o:
Exibe a quantidade de linhas do dataframe

In [20]:
micro_traduzido.count()


1149197


# CAMADA GOLD
### Visualiza√ß√£o dos Dados com as Colunas

Descri√ß√£o:
Exibe as primeiras linhas do DataFrame de microdados ap√≥s a tradu√ß√£o, mostrando os valores j√° convertidos em formato leg√≠vel.

In [21]:
from pyspark.sql import functions as F

# =====================================================
# Lista FINAL de colunas desejadas (j√° corrigida)
# =====================================================
colunas_selecionadas = [
    "ano_de_referencia",
    "unidade_da_federacao",
    "capital",
    "regiao_metropolitana_e_regiao_administrativa_integrada_de_desenvolvimento",
    "semana_no_mes",
    "mes_da_pesquisa",
    "estrato",
    "upa",
    "situacao_do_domicilio",
    "tipo_de_area",
    "dominios_de_projecao",
    "condicao_no_domicilio",
    "idade_do_morador",
    "sexo",
    "cor_ou_raca",
    "escolaridade",
    "frequenta_escola",
    "na_semana_passada_em_quantos_dias_dedicou_se_as_atividades_escolares",
    "na_semana_passada_quanto_tempo_por_dia_gastou_fazendo_as_atividades_escolares_escolares",
    "na_semana_passada_teve_febre",
    "na_semana_passada_teve_tosse",
    "na_semana_passada_teve_dor_de_garganta",
    "na_semana_passada_teve_dificuldade_para_respirar",
    "na_semana_passada_teve_dor_de_cabeca",
    "na_semana_passada_teve_dor_no_peito",
    "na_semana_passada_teve_nausea",
    "na_semana_passada_teve_nariz_entupido_ou_escorrendo",
    "na_semana_passada_teve_fadiga",
    "na_semana_passada_teve_dor_nos_olhos",
    "na_semana_passada_teve_perda_de_cheiro_ou_sabor",
    "na_semana_passada_teve_dor_muscular",
    "na_semana_passada_teve_diarreia",
    "por_causa_disso_foi_a_algum_estabelecimento_de_saude",
    "local_que_buscou_atendimento_foi_posto_de_saude_unidade_basica_de_saude_equipe_de_saude_da_familia_medico_enfermeiro_tecnico_de_enfermagem_ou_agente_comunitario_de_saude",
    "local_que_buscou_atendimento_foi_pronto_socorro_do_sus_upa",
    "local_que_buscou_atendimento_foi_hospital_do_sus",
    "local_que_buscou_atendimento_foi_ambulatorio_ou_consultorio_privado_ou_ligado_as_forcas_armadas",
    "local_que_buscou_atendimento_foi_pronto_socorro_privado_ou_ligado_as_forcas_armadas",
    "local_que_buscou_atendimento_foi_hospital_privado_ou_ligado_as_forcas_armadas",
    "ao_procurar_o_hospital_teve_que_ficar_internado_por_um_dia_ou_mais",
    "durante_a_internacao_foi_sedado_entubado_e_colocado_em_respiracao_artificial_com_ventilador",
    "tem_algum_plano_de_saude_medico_seja_particular_de_empresa_ou_de_orgao_publico",
    "o_a_sr_a_fez_algum_teste_para_saber_se_estava_infectado_a_pelo_coronavirus",
    "fez_o_exame_coletado_com_cotonete_na_boca_e_ou_nariz_swab",
    "qual_o_resultado",
    "qual_o_resultado_2",
    "qual_o_resultado_3",
    "fez_o_exame_de_coleta_de_sangue_atraves_de_furo_no_dedo",
    "fez_o_exame_de_coleta_de_sangue_atraves_da_veia_da_braco",
    "algum_medico_ja_lhe_deu_o_diagnostico_de_diabetes",
    "algum_medico_ja_lhe_deu_o_diagnostico_de_hipertensao",
    "algum_medico_ja_lhe_deu_o_diagnostico_de_asma_bronquite_enfisema_doencas_respiratoria_cronica_ou_doenca_de_pulmao",
    "algum_medico_ja_lhe_deu_o_diagnostico_de_doencas_do_coracao_infarto_angina_insuficiencia_cardiaca_arritmia",
    "algum_medico_ja_lhe_deu_o_diagnostico_de_depressao",
    "algum_medico_ja_lhe_deu_o_diagnostico_de_cancer",
    "qual_foi_o_resultado_do_teste_na_semana_passada_devido_a_pandemia_do_coronavirus_em_que_medida_o_a_sr_a_restringiu_o_contato_com_as_pessoas",
    "na_semana_passada_por_pelo_menos_uma_hora_trabalhou_ou_fez_algum_bico",
    "na_semana_passada_estava_temporariamente_afastado_de_algum_trabalho",
    "qual_o_principal_motivo_deste_afastamento_temporario",
    "continuou_a_ser_remunerado_mesmo_que_parcialmente_por_esse_trabalho",
    "ha_quanto_tempo_esta_afastado_desse_trabalho",
    "tem_mais_de_um_trabalho",
    "no_trabalho_unico_ou_principal_que_tinha_nessa_semana_era",
    "esse_trabalho_era_na_area",
    "tem_carteira_de_trabalho_assinada_ou_e_funcionario_publico_estatutario",
    "que_tipo_de_trabalho_cargo_ou_funcao_voce_realiza_no_seu_trabalho_unico_ou_principal",
    "qual_e_a_principal_atividade_do_local_ou_empresa_em_que_voce_trabalha",
    "na_semana_passada_quantos_empregados_trabalhavam_nesse_negocio_empresa_que_tinha",
    "quantas_horas_por_semana_normalmente_trabalhava",
    "quantas_horas_na_semana_passada_de_fato_trabalhou",
    "na_semana_passada_o_a_sr_a_gostaria_de_ter_trabalhado_mais_horas_do_que_as_de_fato_trabalhadas",
    "quanto_recebia_ou_retirava_normalmente_em_todos_os_seus_trabalhos",
    "recebia_retirava_normalmente_em_dinheiro",
    "recebia_normalmente_em_produtos_e_mercadorias",
    "era_nao_remunerado",
    "quanto_recebia_ou_retirava_efetivamente_em_todos_os_seus_trabalhos",
    "recebia_retirava_efetivamente_em_dinheiro",
    "recebia_efetivamente_em_produtos_e_mercadorias",
    "na_maior_parte_do_tempo_na_semana_passada_esse_trabalho_unico_ou_principal_foi_exercido_no_mesmo_local_em_que_costuma_trabalhar",
    "na_semana_passada_o_a_sr_a_estava_em_trabalho_remoto_home_office_ou_teletrabalho",
    "o_a_sr_a_contribui_para_o_inss",
    "na_semana_passada_tomou_alguma_providencia_efetiva_para_conseguir_trabalho",
    "qual_o_principal_motivo_de_nao_ter_procurado_trabalho_na_semana_passada",
    "embora_voce_nao_tenha_procurado_trabalho_gostaria_de_ter_trabalhado_na_semana_passada",
    "rendimento_recebido_de_aposentadoria_e_pensao_por_todos_os_moradores",
    "rendimento_de_pensao_alimenticia_doacao_ou_mesada_em_dinheiro_de_pessoa_que_nao_morava_no_domicilio",
    "rendimentos_de_programa_bolsa_familia",
    "auxilios_emergenciais_relacionados_ao_coronavirus",
    "seguro_desemprego",
    "outros_rendimentos_como_aluguel_arrendamento_previdencia_privada_bolsa_de_estudos_rendimentos_de_aplicacao_financeira_etc",
    "durante_o_periodo_da_pandemia_alguem_deste_domicilio_solicitou_algum_emprestimo",
    "este_emprestimo_foi_adquirido_com_banco_ou_financeira",
    "este_emprestimo_foi_adquirido_com_parente_ou_amigo",
    "este_emprestimo_foi_adquirido_com_empregados_ou_patrao",
    "este_emprestimo_foi_adquirido_com_outro_local_ou_pessoa",
    "este_domicilio_e",
    "qual_foi_o_valor_mensal_do_aluguel_pago_ou_que_deveria_ter_sido_pago_no_mes_de_referencia",
    "numero_da_faixa_do_aluguel_pago",
    "quem_respondeu_ao_questionario",
    "numero_de_ordem_do_morador_que_prestou_as_informacoes"
]

# =====================================================
# 2. Sele√ß√£o segura (apenas colunas existentes)
# =====================================================
colunas_existentes = set(micro_traduzido_corrigido.columns)

colunas_validas = [
    F.col(c) for c in colunas_selecionadas if c in colunas_existentes
]

colunas_ausentes = [
    c for c in colunas_selecionadas if c not in colunas_existentes
]

if colunas_ausentes:
    print("‚ö†Ô∏è Colunas n√£o encontradas e ignoradas:")
    for c in colunas_ausentes:
        print(" -", c)

# =====================================================
# 3. DataFrame final
# =====================================================
df_selecionado = micro_traduzido_corrigido.select(colunas_validas)

# =====================================================
# 4. Visualiza√ß√£o
# =====================================================
df_selecionado.show(50, truncate=False)


‚ö†Ô∏è Colunas n√£o encontradas e ignoradas:
 - regiao_metropolitana_e_regiao_administrativa_integrada_de_desenvolvimento
+-----------------+--------------------+-----------------------------+-------------+---------------+-------+---------+---------------------+------------+--------------------+------------------------------------------------------+----------------+------+-----------+----------------------+----------------+--------------------------------------------------------------------+---------------------------------------------------------------------------------------+----------------------------+----------------------------+--------------------------------------+------------------------------------------------+------------------------------------+-----------------------------------+-----------------------------+---------------------------------------------------+-----------------------------+------------------------------------+-----------------------------------------------+

In [22]:
df_selecionado.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("path", "s3://lab-746762238259/data-output/pnad_covid_gold/") \
    .option("compression", "snappy") \
    .saveAsTable("workspace.tb_pnad_covid_gold")


