<a href="https://colab.research.google.com/github/CarolineAndradeR/Atividade_nao_avaliativas_FIAP/blob/main/pnad_covid.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import pandas as pd
import psycopg2
from sqlalchemy import create_engine, text
import os, glob, re

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# ============================================================
# 0. Conecxão ao PostgreSQL no AWS RDS
# ============================================================

usuario = "postgres"
senha = "Jhb7p5x8$#"
host = "databasefase3.c5iysuyqw1x4.us-east-1.rds.amazonaws.com"
porta = 5432
banco = "pnad_covid"

engine = create_engine(f"postgresql://{usuario}:{senha}@{host}:{porta}/{banco}")

# fat_pnad_covid

In [11]:
# ============================================================
# 2. Definir a pasta com os arquivos CSV
# ============================================================
pasta_microdados = "/content/drive/MyDrive/Arquivos Challenge/Microdados"

# Lista todos os arquivos CSV encontrados
arquivos_csv = glob.glob(os.path.join(pasta_microdados, "*.csv"))

# Ordena os arquivos (opcional, mas garante ordem de mês)
arquivos_csv.sort()

# Seleciona apenas os 3 primeiros arquivos
arquivos_csv = arquivos_csv[:3]

print("Arquivos selecionados:", len(arquivos_csv))
print("Arquivos:", [os.path.basename(a) for a in arquivos_csv])

Arquivos selecionados: 3
Arquivos: ['PNAD_COVID_052020.csv', 'PNAD_COVID_062020.csv', 'PNAD_COVID_072020.csv']


In [12]:
# ============================================================
# 3. Função para extrair apenas o mês do nome do arquivo
# ============================================================
import os, re

def extract_month(arquivo):
    """
    Extrai o mês (MM) do nome do arquivo.
    Exemplo: 'pnad_covid_052020.csv' -> retorna 5
    """
    nome_arquivo = os.path.basename(arquivo)
    match = re.search(r'(\d{2})\d{4}', nome_arquivo)
    if match:
        return int(match.group(1))
    else:
        return None


In [13]:
# ============================================================
# 4 e 5. Leitura e concatenação otimizadas — versão corrigida
# ============================================================
import pandas as pd
import gc

lista = []

# Função para extrair o mês
def extract_month(arquivo):
    nome = os.path.basename(arquivo)
    match = re.search(r'(\d{2})\d{4}', nome)
    return int(match.group(1)) if match else None

# Dtypes leves para economizar memória
dtypes_otimizados = {
    "UF": "category",
    "CAPITAL": "category",
    "RM_RIDE": "category"
}

for arquivo in arquivos_csv:
    mes = extract_month(arquivo)
    if mes is None:
        print(f"⚠️ Mês não identificado em: {arquivo}")
        continue

    print(f"Lendo: {os.path.basename(arquivo)} | Mês: {mes}")

    # Força separador vírgula
    try:
        df_tmp = pd.read_csv(
            arquivo,
            sep=",",
            encoding="utf-8",
            low_memory=False,
            dtype=dtypes_otimizados
        )
    except Exception:
        # Tenta detecção automática caso algum arquivo esteja diferente
        df_tmp = pd.read_csv(
            arquivo,
            sep=None,
            engine="python",
            low_memory=False
        )

    # Adiciona coluna de mês
    df_tmp["mes"] = mes
    lista.append(df_tmp)

    # Libera memória
    gc.collect()

# Concatena tudo
df_concat = pd.concat(lista, ignore_index=True)
del lista
gc.collect()

print(f"Linhas totais: {len(df_concat):,}")
print(f"Colunas detectadas: {len(df_concat.columns)}")

Lendo: PNAD_COVID_052020.csv | Mês: 5
Lendo: PNAD_COVID_062020.csv | Mês: 6
Lendo: PNAD_COVID_072020.csv | Mês: 7
Linhas totais: 1,114,742
Colunas detectadas: 146


In [14]:
print(df_concat.columns.tolist()[:20])

['Ano', 'UF', 'CAPITAL', 'RM_RIDE', 'V1008', 'V1012', 'V1013', 'V1016', 'Estrato', 'UPA', 'V1022', 'V1023', 'V1030', 'V1031', 'V1032', 'posest', 'A001', 'A001A', 'A001B1', 'A001B2']


In [15]:
df_concat.head()

Unnamed: 0,Ano,UF,CAPITAL,RM_RIDE,V1008,V1012,V1013,V1016,Estrato,UPA,...,E001,E0021,E0022,E0023,E0024,F002A1,F002A2,F002A3,F002A4,F002A5
0,2020,11,11,,1,4,5,1,1110011,110015970,...,,,,,,,,,,
1,2020,11,11,,1,4,5,1,1110011,110015970,...,,,,,,,,,,
2,2020,11,11,,1,4,5,1,1110011,110015970,...,,,,,,,,,,
3,2020,11,11,,1,4,5,1,1110011,110015970,...,,,,,,,,,,
4,2020,11,11,,3,2,5,1,1110011,110015970,...,,,,,,,,,,


In [16]:
# ============================================================
# 6. Padronizar nomes e remover colunas desnecessárias
# ============================================================
df_concat.columns = [c.strip().lower() for c in df_concat.columns]

# Colunas a remover (verifica existência antes)
colunas_remover = {"estrato", "upa", "posest"}
colunas_existentes = list(colunas_remover & set(df_concat.columns))

if colunas_existentes:
    df_concat.drop(columns=colunas_existentes, inplace=True)

gc.collect()
print(f"Colunas após limpeza: {len(df_concat.columns)}")


Colunas após limpeza: 143


In [17]:
print(df_concat.columns.tolist()[:20])

['ano', 'uf', 'capital', 'rm_ride', 'v1008', 'v1012', 'v1013', 'v1016', 'v1022', 'v1023', 'v1030', 'v1031', 'v1032', 'a001', 'a001a', 'a001b1', 'a001b2', 'a001b3', 'a002', 'a003']


In [18]:
# ============================================================
# 7. Filtrar colunas sem nulos e selecionar apenas úteis
# ============================================================
nan_counts = df_concat.isna().sum()
colunas_sem_nan = nan_counts[nan_counts == 0].index.tolist()

colunas_id = ["ano", "uf", "capital", "rm_ride", "mes"]
colunas_sem_nan = [c for c in colunas_sem_nan if c not in colunas_id]
colunas_selecionadas = colunas_id + colunas_sem_nan

# Seleciona e copia somente as colunas úteis
df_util = df_concat[colunas_selecionadas]
del df_concat
gc.collect()

print(f"Colunas selecionadas: {len(df_util.columns)}")


Colunas selecionadas: 45


In [19]:
# ============================================================
# 8. Transformar de colunas para linhas (melt)
# ============================================================
colunas_para_derreter = [c for c in df_util.columns if c not in colunas_id]

df_melt = pd.melt(
    df_util,
    id_vars=colunas_id,
    value_vars=colunas_para_derreter,
    var_name="variavel",
    value_name="valor"
)

del df_util
gc.collect()

print(df_melt.head())

    ano  uf capital rm_ride  mes variavel  valor
0  2020  11      11     NaN    5    v1008    1.0
1  2020  11      11     NaN    5    v1008    1.0
2  2020  11      11     NaN    5    v1008    1.0
3  2020  11      11     NaN    5    v1008    1.0
4  2020  11      11     NaN    5    v1008    3.0


In [20]:
# ============================================================
# 9. Renomear colunas conforme padrão definido
# ============================================================
mapeamento_colunas = {
    "ano": "ano",
    "mes": "mes",
    "uf": "id_uf",
    "capital": "id_capital",
    "rm_ride": "rm_ride",
    "variavel": "id_variavel",
    "valor": "id_categoria"
}

df_melt.rename(columns=mapeamento_colunas, inplace=True)

print(df_melt.columns.tolist())

['ano', 'id_uf', 'id_capital', 'rm_ride', 'mes', 'id_variavel', 'id_categoria']


In [21]:
df_melt.head()

Unnamed: 0,ano,id_uf,id_capital,rm_ride,mes,id_variavel,id_categoria
0,2020,11,11,,5,v1008,1.0
1,2020,11,11,,5,v1008,1.0
2,2020,11,11,,5,v1008,1.0
3,2020,11,11,,5,v1008,1.0
4,2020,11,11,,5,v1008,3.0


In [22]:
# ============================================================
# 10. Subir a tabela fato para o RDS com segurança e eficiência
# ============================================================

nome_tabela = "fat_microdados"
chunksize = 1000000

print(f"Verificando e removendo a tabela '{nome_tabela}' caso exista...")

# Tenta dropar a tabela antes de recriar
with engine.connect() as conn:
    conn.execute(text(f"DROP TABLE IF EXISTS {nome_tabela};"))
    conn.commit()

print(f"Tabela '{nome_tabela}' removida com sucesso (se existia).")
print(f"Iniciando upload em blocos de {chunksize} linhas...")

total_linhas = len(df_melt)

# Loop para enviar em blocos
for i, start in enumerate(range(0, total_linhas, chunksize)):
    end = start + chunksize
    chunk = df_melt.iloc[start:end].copy()

    # Envia o primeiro chunk como "replace" e os demais como "append"
    modo = "replace" if i == 0 else "append"
    chunk.to_sql(nome_tabela, engine, if_exists=modo, index=False)

    print(f"Chunk {i+1}: {len(chunk)} linhas enviadas ({min(end, total_linhas)}/{total_linhas})")

    # Libera memória do bloco
    del chunk
    gc.collect()

print(f"Upload completo da tabela '{nome_tabela}' com {total_linhas:,} linhas.")

Verificando e removendo a tabela 'fat_microdados' caso exista...
Tabela 'fat_microdados' removida com sucesso (se existia).
Iniciando upload em blocos de 1000000 linhas...
Chunk 1: 1000000 linhas enviadas (1000000/44589680)
Chunk 2: 1000000 linhas enviadas (2000000/44589680)
Chunk 3: 1000000 linhas enviadas (3000000/44589680)
Chunk 4: 1000000 linhas enviadas (4000000/44589680)
Chunk 5: 1000000 linhas enviadas (5000000/44589680)
Chunk 6: 1000000 linhas enviadas (6000000/44589680)
Chunk 7: 1000000 linhas enviadas (7000000/44589680)
Chunk 8: 1000000 linhas enviadas (8000000/44589680)
Chunk 9: 1000000 linhas enviadas (9000000/44589680)
Chunk 10: 1000000 linhas enviadas (10000000/44589680)
Chunk 11: 1000000 linhas enviadas (11000000/44589680)
Chunk 12: 1000000 linhas enviadas (12000000/44589680)
Chunk 13: 1000000 linhas enviadas (13000000/44589680)
Chunk 14: 1000000 linhas enviadas (14000000/44589680)
Chunk 15: 1000000 linhas enviadas (15000000/44589680)
Chunk 16: 1000000 linhas enviadas (1

# dim_dicionario

In [4]:
# -----------------------------
# 1. Leitura e preparação do Excel
# -----------------------------
def ler_excel_sem_cabecalho(file_path):
    """Lê o arquivo Excel ignorando cabeçalhos fixos da PNAD COVID."""
    return pd.read_excel(file_path, header=None)

In [5]:
# -----------------------------
# 2. Identificação da variável principal
# -----------------------------
def atualizar_variavel_atual(row, current_vars):
    """Atualiza os dados da variável principal ou subvariável."""
    id_var, num_var, desc = current_vars

    if pd.notna(row[1]) and pd.notna(row[3]):  # Nova variável principal
        id_var = str(row[1])
        num_var = str(row[2]) if pd.notna(row[2]) else None
        desc = str(row[3])

    elif pd.isna(row[1]) and (pd.notna(row[2]) or pd.notna(row[3])):  # Sub ou continuação
        if pd.notna(row[2]):  # Subvariável
            num_var = str(row[2])
            desc = str(row[3]) if pd.notna(row[3]) else desc
        elif pd.notna(row[3]):  # Continuação de descrição
            desc = f"{desc} {row[3]}" if desc else str(row[3])

    return id_var, num_var, desc


In [6]:
# -----------------------------
# 3. Extração das categorias (valores de código e descrição)
# -----------------------------
def extrair_categoria(row, current_vars, data):
    """Adiciona categorias à lista de dados."""
    id_var, num_var, desc = current_vars
    if pd.notna(row[4]) and pd.notna(row[5]):
        data.append({
            'id_variavel': id_var,
            'numero_variavel': num_var,
            'descricao': desc,
            'id_categoria': str(row[4]),
            'descricao_categoria': str(row[5])
        })

In [7]:
# -----------------------------
# 4. Função principal de processamento do dicionário
# -----------------------------
def processar_dicionario(file_path):
    """Processa o dicionário PNAD COVID e extrai variáveis e categorias."""
    df = ler_excel_sem_cabecalho(file_path)
    data = []
    current_vars = (None, None, None)  # id_variavel, numero_variavel, descricao

    for index, row in df.iterrows():
        if index < 3:  # Ignora cabeçalhos
            continue
        current_vars = atualizar_variavel_atual(row, current_vars)
        extrair_categoria(row, current_vars, data)

    return pd.DataFrame(data)

In [8]:
# -----------------------------
# 5. Processamento em lote e exportação final
# -----------------------------
def gerar_tabela_dimensao(pasta_doc, meses_fato):
    """Gera e salva a tabela dimensão consolidada a partir de múltiplos dicionários."""
    arquivos = [
        arq for arq in glob.glob(os.path.join(pasta_doc, "*.xls"))
        if any(mes in arq for mes in meses_fato)
    ]
    arquivos.sort()

    all_dfs = [processar_dicionario(arq) for arq in arquivos]
    final_df = pd.concat(all_dfs, ignore_index=True)
    final_df.to_csv("tabela_dimensao.csv", index=False)

    print("Tabela dimensão criada com sucesso e salva em 'tabela_dimensao.csv'")
    print(final_df.head(10).to_markdown(index=False))
    return final_df

In [9]:
# -----------------------------
# 6. Execução
# -----------------------------
pasta_doc = "/content/drive/MyDrive/Arquivos Challenge/Documentacao"
meses_fato = ["052020", "062020", "072020"]

final_df = gerar_tabela_dimensao(pasta_doc, meses_fato)

Tabela dimensão criada com sucesso e salva em 'tabela_dimensao.csv'
| id_variavel   | numero_variavel   | descricao            |   id_categoria | descricao_categoria   |
|:--------------|:------------------|:---------------------|---------------:|:----------------------|
| UF            |                   | Unidade da Federação |             11 | Rondônia              |
| UF            |                   | Unidade da Federação |             12 | Acre                  |
| UF            |                   | Unidade da Federação |             13 | Amazonas              |
| UF            |                   | Unidade da Federação |             14 | Roraima               |
| UF            |                   | Unidade da Federação |             15 | Pará                  |
| UF            |                   | Unidade da Federação |             16 | Amapá                 |
| UF            |                   | Unidade da Federação |             17 | Tocantins             |
| UF          

In [10]:
# ============================================================
# 7. Subir os dados para o RDS
# ============================================================
nome_tabela = "dim_dicionario"

final_df.to_sql(nome_tabela, engine, if_exists="replace", index=False)
print(f"Tabela '{nome_tabela}' criada/recriada com sucesso no RDS!")

Tabela 'dim_dicionario' criada/recriada com sucesso no RDS!
