# Tech Challenge — PNAD COVID 

In [2]:
# === 0) Setup de ambiente ===
import sys, subprocess
def pip_install(pkg):
    try:
        __import__(pkg.split("==")[0].replace("-", "_"))
    except Exception:
        subprocess.check_call([sys.executable, "-m", "pip", "install", pkg, "-q"])

for pkg in ["pandas", "pyarrow", "awswrangler", "boto3"]:
    pip_install(pkg)

import re, boto3, pandas as pd, awswrangler as wr

AWS_REGION = "us-east-1"
S3_BUCKET  = "tc-pnad-covid-gabrielarodrigues"

# Prefixos
BRONZE = f"s3://{S3_BUCKET}/bronze/"
SILVER = f"s3://{S3_BUCKET}/silver/pnad_covid/"
GOLD_PATH = f"s3://{S3_BUCKET}/gold/pnad_covid_kpis/"
GOLD_FULL_PATH = f"s3://{S3_BUCKET}/gold/pnad_covid_kpis_full/"
ATHENA_OUTPUT = f"s3://{S3_BUCKET}/athena-output/"

boto3.setup_default_session(region_name=AWS_REGION)
try:
    wr.config.athena_ctas_approach = False
except Exception:
    try: wr.config.update("athena_ctas_approach", False)
    except Exception: pass

# Arquivos do bronze (ajuste se necessário)
FILES = [
    "PNAD_COVID_082020.csv",
    "PNAD_COVID_092020.csv",
    "PNAD_COVID_102020.csv",
    "PNAD_COVID_112020.csv",
]

def parse_ano_mes_from_filename(fname: str):
    m = re.search(r"_(\d{2})(\d{4})", fname)  # _MMYYYY
    if not m:
        raise ValueError(f"Sem MMYYYY no nome: {fname}")
    return int(m.group(2)), int(m.group(1))   # (ano, mes)

print("Região:", AWS_REGION, "| Bucket:", S3_BUCKET)

Região: us-east-1 | Bucket: tc-pnad-covid-gabrielarodrigues


## 1) Variáveis de interesse 
- **Demografia (4)**: UF, idade, sexo, escolaridade  
- **Sintomas (5)**: febre, tosse, garganta, falta ar, perda olfato/paladar  
- **Saúde/Teste (4)**: plano de saúde, fez teste, qualquer sintoma, teste positivo (qualquer)  
- **Econômicas (4)**: ocupado, desempregado, força de trabalho, renda total  
- **Rótulos (3)**: uf_nome, sexo_nome, mes_label  
> **Total Silver = 20 colunas** (ano/mes vêm como **partições**).

In [3]:
# Conjuntos originais (códigos do microdado)
VAR_DEMO = ["UF", "A002", "A003", "A005"]
VAR_SINT = ["B0011","B0012","B0013","B0014","B0015","B0018","B0019","B00111","B00112","B00113"]
VAR_TEST = ["B007","B008","B009B","B009D","B009F"]
VAR_ECON = ["C001","C002","C003"]  # núcleo emprego

VARS = VAR_DEMO + VAR_SINT + VAR_TEST + VAR_ECON

# Renome amigável (snake_case) para Pandas
rename_map = {
    # demografia
    "uf":"uf_codigo", "a002":"idade", "a003":"sexo", "a005":"escolaridade",
    # sintomas (semana anterior)
    "b0011":"sint_febre_semana", "b0012":"sint_tosse_semana", "b0013":"sint_dor_garganta_semana",
    "b0014":"sint_falta_ar_semana", "b0015":"sint_dor_cabeca_semana", "b0018":"sint_nariz_escorrendo_semana",
    "b0019":"sint_fadiga_semana", "b00111":"sint_perda_olfato_paladar_semana", "b00112":"sint_dor_muscular_semana",
    "b00113":"sint_diarreia_semana",
    # saúde/teste
    "b007":"fez_teste_covid", "b008":"plano_saude",
    "b009b":"resultado_teste_b009b", "b009d":"resultado_teste_b009d", "b009f":"resultado_teste_b009f",
}

## 2) Bronze → Transformações (Pandas) → Derivadas (saúde + economia)

In [4]:
dfs = []

for fname in FILES:
    df = wr.s3.read_csv(BRONZE + fname)
    df.columns = [c.strip().lower() for c in df.columns]

    ano, mes = parse_ano_mes_from_filename(fname)
    df["ano"], df["mes"] = ano, mes

    want = [v.lower() for v in VARS]
    cols_exist = [c for c in want if c in df.columns]

    # sintomas -> 0/1
    for c in [x.lower() for x in VAR_SINT if x.lower() in df.columns]:
        s = pd.to_numeric(df[c], errors="coerce").fillna(0).astype(int)
        df[c] = (s == 1).astype(int)

    # plano/teste -> 0/1
    for c in [x.lower() for x in ["B007","B008"] if x.lower() in df.columns]:
        s = pd.to_numeric(df[c], errors="coerce").fillna(0).astype(int)
        df[c] = (s == 1).astype(int)

    # resultados: manter código e criar *_pos (1=positivo)
    for c in [x.lower() for x in ["B009B","B009D","B009F"] if x.lower() in df.columns]:
        s = pd.to_numeric(df[c], errors="coerce").fillna(0).astype(int)
        df[c] = s  # 1=positivo, 2=negativo (IBGE)
        df[c + "_pos"] = (s == 1).astype(int)

    # derivadas saúde
    sint_cols = [x.lower() for x in VAR_SINT if x.lower() in df.columns]
    df["qualquer_sintoma"] = (df[sint_cols].sum(axis=1) > 0).astype(int) if sint_cols else 0

    pos_cols = [c for c in ["b009b_pos","b009d_pos","b009f_pos"] if c in df.columns]
    df["teste_positivo_any"] = (df[pos_cols].sum(axis=1) > 0).astype(int) if pos_cols else 0

    # econômicas (emprego)
    if "c007b" in df.columns:
        s_econ = pd.to_numeric(df["c007b"], errors="coerce").fillna(3) # Se nulo, assume "Fora da força"
        df["ocupado"] = (s_econ == 1).astype(int)
        df["desempregado"] = (s_econ == 2).astype(int)
        df["forca_trabalho"] = ((s_econ == 1) | (s_econ == 2)).astype(int)
    else:
        # Fallback caso C007B não exista
        df["ocupado"] = 0
        df["desempregado"] = 0
        df["forca_trabalho"] = 0

    # renda_total = soma de todas D*
    import re as _re
    d_cols = [c for c in df.columns if _re.match(r"^d\d+", c)]
    if d_cols:
        for c in d_cols:
            df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0.0)
        df["renda_total"] = df[d_cols].sum(axis=1)
    else:
        df["renda_total"] = 0.0

    # Seleção básica antes do rename
    keep = [c for c in cols_exist if c not in ["ano","mes"]]            + ["ano","mes","qualquer_sintoma","teste_positivo_any"]
    # econômicas a manter
    for k in ["ocupado","desempregado","forca_trabalho","renda_total"]:
        if k not in keep:
            keep.append(k)

    df = df[keep].copy()
    dfs.append(df)

# concatena meses
df_all = pd.concat(dfs, ignore_index=True)
df_all = df_all.loc[:, ~df_all.columns.duplicated()]

# renome amigável
df_all = df_all.rename(columns={c: rename_map[c] for c in df_all.columns if c in rename_map})

# renome da derivada de positivo
if "teste_positivo_any" in df_all.columns and "teste_positivo_qualquer" not in df_all.columns:
    df_all = df_all.rename(columns={"teste_positivo_any":"teste_positivo_qualquer"})

# enriquecimentos
map_uf = {
  11:"Rondônia",12:"Acre",13:"Amazonas",14:"Roraima",15:"Pará",16:"Amapá",17:"Tocantins",
  21:"Maranhão",22:"Piauí",23:"Ceará",24:"Rio Grande do Norte",25:"Paraíba",26:"Pernambuco",
  27:"Alagoas",28:"Sergipe",29:"Bahia",31:"Minas Gerais",32:"Espírito Santo",33:"Rio de Janeiro",
  35:"São Paulo",41:"Paraná",42:"Santa Catarina",43:"Rio Grande do Sul",50:"Mato Grosso do Sul",
  51:"Mato Grosso",52:"Goiás",53:"Distrito Federal"
}
map_sexo = {1:"Masculino", 2:"Feminino"}

if "uf_codigo" in df_all.columns:
    df_all["uf_nome"] = df_all["uf_codigo"].map(map_uf)
if "sexo" in df_all.columns:
    df_all["sexo_nome"] = df_all["sexo"].map(map_sexo)

df_all["mes_label"] = df_all["ano"].astype(str) + "-" + df_all["mes"].astype(str).str.zfill(2)

print("Shape pós-transform:", df_all.shape)
df_all.head()

Shape pós-transform: (1535717, 33)


Unnamed: 0,uf_codigo,idade,sexo,escolaridade,sint_febre_semana,sint_tosse_semana,sint_dor_garganta_semana,sint_falta_ar_semana,sint_dor_cabeca_semana,sint_nariz_escorrendo_semana,...,mes,qualquer_sintoma,teste_positivo_qualquer,ocupado,desempregado,forca_trabalho,renda_total,uf_nome,sexo_nome,mes_label
0,11,36,1,5,0,0,0,0,0,0,...,8,0,0,1,0,1,1213.0,Rondônia,Masculino,2020-08
1,11,30,2,7,0,0,0,0,0,0,...,8,0,0,0,0,0,1213.0,Rondônia,Feminino,2020-08
2,11,13,1,2,0,0,0,0,0,0,...,8,0,0,0,0,0,1213.0,Rondônia,Masculino,2020-08
3,11,11,1,2,0,0,0,0,0,0,...,8,0,0,0,0,0,1213.0,Rondônia,Masculino,2020-08
4,11,57,2,2,0,0,0,0,0,0,...,8,0,0,0,0,0,1058.0,Rondônia,Feminino,2020-08


## 3) **CAP de 20 variáveis no Silver**
Lista final (20):  
`uf_codigo, idade, sexo, escolaridade, plano_saude, fez_teste_covid, qualquer_sintoma, teste_positivo_qualquer, sint_febre_semana, sint_tosse_semana, sint_dor_garganta_semana, sint_falta_ar_semana, sint_perda_olfato_paladar_semana, ocupado, desempregado, forca_trabalho, renda_total, uf_nome, sexo_nome, mes_label`  
As partições `ano` e `mes` **não** contam no limite.

In [5]:
CANDIDATES_20 = [
    "uf_codigo","idade","sexo","escolaridade",
    "plano_saude","fez_teste_covid","qualquer_sintoma","teste_positivo_qualquer",
    "sint_febre_semana","sint_tosse_semana","sint_dor_garganta_semana",
    "sint_falta_ar_semana","sint_perda_olfato_paladar_semana",
    "ocupado","desempregado","forca_trabalho","renda_total",
    "uf_nome","sexo_nome","mes_label",
]
exist_20 = [c for c in CANDIDATES_20 if c in df_all.columns]
if len(exist_20) > 20:
    exist_20 = exist_20[:20]  # segurança

print(f"Colunas Silver ({len(exist_20)}):", exist_20)

# Mantém só as 20 + partições
df_all = df_all[exist_20 + ["ano","mes"]].copy()

Colunas Silver (20): ['uf_codigo', 'idade', 'sexo', 'escolaridade', 'plano_saude', 'fez_teste_covid', 'qualquer_sintoma', 'teste_positivo_qualquer', 'sint_febre_semana', 'sint_tosse_semana', 'sint_dor_garganta_semana', 'sint_falta_ar_semana', 'sint_perda_olfato_paladar_semana', 'ocupado', 'desempregado', 'forca_trabalho', 'renda_total', 'uf_nome', 'sexo_nome', 'mes_label']


## 4) Gravar **Silver** em Parquet (particionado por `ano/mes`)
Idempotente com `overwrite_partitions`.

In [6]:
# Tipagem básica
for c in ["ano","mes","qualquer_sintoma","fez_teste_covid","teste_positivo_qualquer",
          "ocupado","desempregado","forca_trabalho"]:
    if c in df_all.columns:
        df_all[c] = pd.to_numeric(df_all[c], errors="coerce").fillna(0).astype("int32")
if "renda_total" in df_all.columns:
    df_all["renda_total"] = pd.to_numeric(df_all["renda_total"], errors="coerce").astype("float64").fillna(0.0)

# Grava
wr.s3.to_parquet(
    df=df_all,
    path=SILVER,
    dataset=True,
    partition_cols=["ano","mes"],
    mode="overwrite_partitions",
    compression="snappy",
)
print("✅ Silver escrito em:", SILVER)

✅ Silver escrito em: s3://tc-pnad-covid-gabrielarodrigues/silver/pnad_covid/


## 5) Glue Catalog — DB/Tabela externa e partições
Evita duplicar `ano/mes` no schema (apenas como `PartitionKeys`).

In [7]:
import boto3
DB_NAME = "pnad_covid_tc"
TABLE_NAME = "microdados_silver"
glue = boto3.client("glue", region_name=AWS_REGION)

# DB
try:
    glue.get_database(Name=DB_NAME); print("DB existe:", DB_NAME)
except glue.exceptions.EntityNotFoundException:
    glue.create_database(DatabaseInput={"Name": DB_NAME}); print("DB criado:", DB_NAME)

# Remove tabela antiga (se houver)
try:
    glue.delete_table(DatabaseName=DB_NAME, Name=TABLE_NAME); print("Tabela antiga removida.")
except glue.exceptions.EntityNotFoundException:
    pass

# Tipos para o Glue (sem partições)
cols_types = {}
for c in df_all.columns:
    if c in ("ano","mes"): 
        continue
    d = str(df_all[c].dtype)
    if d.startswith(("int","Int")): cols_types[c] = "int"
    elif d.startswith(("float","Float")): cols_types[c] = "double"
    else: cols_types[c] = "string"

wr.catalog.create_parquet_table(
    database=DB_NAME,
    table=TABLE_NAME,
    path=SILVER,
    columns_types=cols_types,
    partitions_types={"ano":"int","mes":"int"},
    compression="snappy"
)

wr.athena.repair_table(database=DB_NAME, table=TABLE_NAME)
print(f"✅ Glue OK e partições reparadas: {DB_NAME}.{TABLE_NAME}")

DB existe: pnad_covid_tc
Tabela antiga removida.
✅ Glue OK e partições reparadas: pnad_covid_tc.microdados_silver


## 6) Athena — configurar saída e criar **views** (saúde, economia, full)
As views usam **apenas** as 20 colunas disponíveis no Silver.

In [8]:
ath = boto3.client("athena", region_name=AWS_REGION)

# Define diretório de saída do WorkGroup
try:
    ath.update_work_group(
        WorkGroup="primary",
        ConfigurationUpdates={"ResultConfigurationUpdates": {"OutputLocation": ATHENA_OUTPUT}}
    )
    print("Athena OutputLocation:", ATHENA_OUTPUT)
except Exception as e:
    print("Aviso (workgroup):", e)

def run(sql):
    ath.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": DB_NAME},
        WorkGroup="primary"
    )

# Base dinâmica conforme colunas existentes
cols = list(df_all.columns)
tem_uf = "uf_codigo" in cols

def base_sel(extra):
    base = ["ano","mes"] + (["uf_codigo"] if tem_uf else [])
    if "uf_nome" in cols: base.append("uf_nome")
    if "sexo_nome" in cols: base.append("sexo_nome")
    return ", ".join(base + extra)

# Views
sint_cols = [c for c in [
    "sint_febre_semana","sint_tosse_semana","sint_dor_garganta_semana",
    "sint_falta_ar_semana","sint_perda_olfato_paladar_semana","qualquer_sintoma"
] if c in cols]
teste_cols = [c for c in [
    "plano_saude","fez_teste_covid","teste_positivo_qualquer"
] if c in cols]

run(f"""
CREATE OR REPLACE VIEW vw_sintomas AS
SELECT {base_sel(sint_cols)}
FROM {TABLE_NAME}
""")

run(f"""
CREATE OR REPLACE VIEW vw_testes AS
SELECT {base_sel(teste_cols)}
FROM {TABLE_NAME}
""")

kpis = []
if "qualquer_sintoma" in cols:
    kpis.append("AVG(CASE WHEN qualquer_sintoma=1 THEN 1 ELSE 0 END) AS taxa_sintomas")
if "fez_teste_covid" in cols:
    kpis.append("AVG(CASE WHEN fez_teste_covid=1 THEN 1 ELSE 0 END) AS taxa_testou")
if "teste_positivo_qualquer" in cols:
    kpis.append("AVG(CASE WHEN teste_positivo_qualquer=1 THEN 1 ELSE 0 END) AS taxa_positivo_any")

gb_count = 2 + (1 if tem_uf else 0) + (1 if "uf_nome" in cols else 0) + (1 if "sexo_nome" in cols else 0)
group_by_idx = ", ".join(str(i) for i in range(1, gb_count+1))

run(f"""
CREATE OR REPLACE VIEW vw_kpis AS
SELECT {base_sel([])}, {", ".join(kpis)}
FROM {TABLE_NAME}
GROUP BY {group_by_idx}
""")

econ_cols = [c for c in ["ocupado","desempregado","forca_trabalho","renda_total"] if c in cols]
run(f"""
CREATE OR REPLACE VIEW vw_economia AS
SELECT {base_sel(econ_cols)}
FROM {TABLE_NAME}
""")

kpis_econ = [
    "AVG(CASE WHEN ocupado=1 THEN 1 ELSE 0 END) AS taxa_ocupacao",
    "CASE WHEN SUM(forca_trabalho) > 0 THEN CAST(SUM(desempregado) AS DOUBLE)/CAST(SUM(forca_trabalho) AS DOUBLE) ELSE NULL END AS taxa_desemprego",
    "AVG(NULLIF(CAST(renda_total AS DOUBLE), 0.0)) AS renda_media"
]
run(f"""
CREATE OR REPLACE VIEW vw_kpis_econ AS
SELECT {base_sel([])}, {", ".join(kpis_econ)}
FROM {TABLE_NAME}
GROUP BY {group_by_idx}
""")

run("""
CREATE OR REPLACE VIEW vw_kpis_full AS
SELECT
  COALESCE(s.ano, e.ano) AS ano,
  COALESCE(s.mes, e.mes) AS mes,
  COALESCE(s.uf_codigo, e.uf_codigo) AS uf_codigo,
  COALESCE(s.uf_nome, e.uf_nome) AS uf_nome,
  COALESCE(s.sexo_nome, e.sexo_nome) AS sexo_nome,
  s.taxa_sintomas, s.taxa_testou, s.taxa_positivo_any,
  e.taxa_ocupacao, e.taxa_desemprego, e.renda_media
FROM vw_kpis s
FULL OUTER JOIN vw_kpis_econ e
  ON s.ano=e.ano AND s.mes=e.mes
  AND ((s.uf_codigo IS NOT DISTINCT FROM e.uf_codigo) OR (s.uf_codigo IS NULL AND e.uf_codigo IS NULL))
  AND ((s.uf_nome IS NOT DISTINCT FROM e.uf_nome) OR (s.uf_nome IS NULL AND e.uf_nome IS NULL))
  AND ((s.sexo_nome IS NOT DISTINCT FROM e.sexo_nome) OR (s.sexo_nome IS NULL AND e.sexo_nome IS NULL))
""")
print("✅ Views criadas: vw_sintomas, vw_testes, vw_kpis, vw_economia, vw_kpis_econ, vw_kpis_full")

Athena OutputLocation: s3://tc-pnad-covid-gabrielarodrigues/athena-output/
✅ Views criadas: vw_sintomas, vw_testes, vw_kpis, vw_economia, vw_kpis_econ, vw_kpis_full


## 7) Validação rápida (Athena, sem CTAS)
Use `ATHENA_OUTPUT` como staging.

In [9]:
wr.athena.read_sql_query(
    "SELECT DISTINCT ano, mes FROM microdados_silver ORDER BY ano, mes",
    database="pnad_covid_tc", ctas_approach=False, s3_output=ATHENA_OUTPUT
)

wr.athena.read_sql_query(
    "SELECT * FROM vw_kpis ORDER BY ano, mes LIMIT 20",
    database="pnad_covid_tc", ctas_approach=False, s3_output=ATHENA_OUTPUT
)

wr.athena.read_sql_query(
    "SELECT * FROM vw_kpis_econ ORDER BY ano, mes LIMIT 20",
    database="pnad_covid_tc", ctas_approach=False, s3_output=ATHENA_OUTPUT
)

Unnamed: 0,ano,mes,uf_codigo,uf_nome,sexo_nome,taxa_ocupacao,taxa_desemprego,renda_media
0,2020,8,25,Paraíba,Feminino,0.058981,0.469983,1507.775148
1,2020,8,21,Maranhão,Masculino,0.069961,0.25817,1438.26618
2,2020,8,28,Sergipe,Masculino,0.118819,0.194872,1471.7833
3,2020,8,51,Mato Grosso,Feminino,0.138335,0.261181,1178.442639
4,2020,8,26,Pernambuco,Masculino,0.13058,0.170334,1381.248846
5,2020,8,13,Amazonas,Feminino,0.059307,0.377823,1380.008417
6,2020,8,41,Paraná,Feminino,0.155409,0.245932,1470.640318
7,2020,8,24,Rio Grande do Norte,Feminino,0.080319,0.349908,1623.39727
8,2020,8,32,Espírito Santo,Feminino,0.132658,0.228448,1377.985054
9,2020,8,14,Roraima,Masculino,0.096045,0.37037,1140.2742


## 8) GOLD — materialização de KPIs (opcional, recomendado)


In [10]:
grp_cols = [c for c in ["ano","mes","uf_codigo","uf_nome","sexo_nome"] if c in df_all.columns]

# GOLD (saúde)
need = [c for c in ["qualquer_sintoma","fez_teste_covid","teste_positivo_qualquer"] if c in df_all.columns]
gold = (df_all
        .groupby(grp_cols, dropna=False)[need]
        .mean()
        .reset_index()
        .rename(columns={
            "qualquer_sintoma":"taxa_sintomas",
            "fez_teste_covid":"taxa_testou",
            "teste_positivo_qualquer":"taxa_positivo_any"
        }))

wr.s3.to_parquet(
    df=gold,
    path=GOLD_PATH,
    dataset=True,
    partition_cols=["ano","mes"],
    mode="overwrite_partitions",
    compression="snappy"
)
print("✅ GOLD (saúde) escrito em:", GOLD_PATH)

# GOLD FULL (saúde + economia)
need_econ = [c for c in ["ocupado","desempregado","forca_trabalho","renda_total"] if c in df_all.columns]
gold_full = (
    df_all.groupby(grp_cols, dropna=False)[need + need_econ].mean().reset_index()
)
# ajustar taxa_desemprego (desemp/força) e renda_media em zeros
# para desemprego precisamos das somas; refazemos especificamente:
tmp = (df_all.groupby(grp_cols, dropna=False)[["desempregado","forca_trabalho"]]
       .sum().reset_index())
gold_full = gold_full.merge(tmp, on=grp_cols, suffixes=("","_sumfix"), how="left")

gold_full["taxa_sintomas"] = gold_full["qualquer_sintoma"]
gold_full["taxa_testou"] = gold_full["fez_teste_covid"]
gold_full["taxa_positivo_any"] = gold_full["teste_positivo_qualquer"]
gold_full["taxa_ocupacao"] = gold_full["ocupado"]
gold_full["taxa_desemprego"] = (gold_full["desempregado_sumfix"] / gold_full["forca_trabalho_sumfix"])    .where(gold_full["forca_trabalho_sumfix"] > 0)
gold_full["renda_media"] = gold_full["renda_total"].replace(0, pd.NA)

cols_keep = grp_cols + ["taxa_sintomas","taxa_testou","taxa_positivo_any","taxa_ocupacao","taxa_desemprego","renda_media"]
gold_full = gold_full[cols_keep]

wr.s3.to_parquet(
    df=gold_full,
    path=GOLD_FULL_PATH,
    dataset=True,
    partition_cols=["ano","mes"],
    mode="overwrite_partitions",
    compression="snappy"
)
print("✅ GOLD FULL escrito em:", GOLD_FULL_PATH)

✅ GOLD (saúde) escrito em: s3://tc-pnad-covid-gabrielarodrigues/gold/pnad_covid_kpis/
✅ GOLD FULL escrito em: s3://tc-pnad-covid-gabrielarodrigues/gold/pnad_covid_kpis_full/


## 9) Glue + Views para GOLD
Cria tabelas `kpis_gold` e `kpis_gold_full` e as views correspondentes.

In [11]:
glue = boto3.client("glue", region_name=AWS_REGION)
ath = boto3.client("athena", region_name=AWS_REGION)

for tbl, path, view in [
    ("kpis_gold", GOLD_PATH, "vw_kpis_gold"),
    ("kpis_gold_full", GOLD_FULL_PATH, "vw_kpis_gold_full")
]:
    try:
        glue.delete_table(DatabaseName="pnad_covid_tc", Name=tbl)
    except glue.exceptions.EntityNotFoundException:
        pass

    # infer types
    df_tmp = wr.s3.read_parquet(path, dataset=True).head(1)
    cols_types = {}
    for c in df_tmp.columns:
        if c in ("ano","mes"): continue
        d = str(df_tmp[c].dtype)
        if d.startswith(("int","Int")): cols_types[c] = "int"
        elif d.startswith(("float","Float")): cols_types[c] = "double"
        else: cols_types[c] = "string"

    wr.catalog.create_parquet_table(
        database="pnad_covid_tc",
        table=tbl,
        path=path,
        columns_types=cols_types,
        partitions_types={"ano":"int","mes":"int"},
        compression="snappy"
    )
    wr.athena.repair_table(database="pnad_covid_tc", table=tbl)

    ath.start_query_execution(
        QueryString=f"CREATE OR REPLACE VIEW {view} AS SELECT * FROM {tbl}",
        QueryExecutionContext={"Database":"pnad_covid_tc"},
        WorkGroup="primary"
    )

print("✅ Glue + Views GOLD criadas")

✅ Glue + Views GOLD criadas


---

# ⬇️ Export **final + entrevistados** para Power BI
*Adicionado em 2025-10-06 02:03.*


In [12]:

# ==== Export final + entrevistados (Parquet/CSV em S3 e local) ====
# Lê a view final (vw_kpis_gold_full -> fallback vw_kpis_full)
# e anexa contagens (entrevistados, testados, positivos) por ano/mes/UF/sexo.

# Config simples (usa defaults se não existirem)
try: DB_NAME
except NameError: DB_NAME = "pnad_covid_tc"
try: ATHENA_OUTPUT
except NameError: ATHENA_OUTPUT = "s3://tc-pnad-covid-gabrielarodrigues/athena-output/"
try: S3_BUCKET
except NameError: S3_BUCKET = "tc-pnad-covid-gabrielarodrigues"

S3_EXPORT_ROOT = f"s3://{S3_BUCKET}/export/powerbi/"
EXPORT_FORMAT   = "parquet"     # "parquet" (recomendado) ou "csv"
SAVE_LOCAL_COPY = True
LOCAL_FILE_BASENAME = "base_final_com_entrevistados"

sql_final_candidates = [
    """
    WITH k AS (
      SELECT ano, mes, uf_codigo, uf_nome, sexo_nome,
             taxa_sintomas, taxa_testou, taxa_positivo_any,
             taxa_ocupacao, taxa_desemprego, renda_media
      FROM vw_kpis_gold_full
    ),
    c AS (
      SELECT ano, mes, uf_codigo, uf_nome, sexo_nome,
             COUNT(1) AS entrevistados,
             SUM(CASE WHEN fez_teste_covid = 1 THEN 1 ELSE 0 END)        AS testados,
             SUM(CASE WHEN teste_positivo_qualquer = 1 THEN 1 ELSE 0 END) AS positivos
      FROM microdados_silver
      GROUP BY 1,2,3,4,5
    )
    SELECT
      k.ano, k.mes, k.uf_codigo, k.uf_nome, k.sexo_nome,
      k.taxa_sintomas, k.taxa_testou, k.taxa_positivo_any,
      k.taxa_ocupacao, k.taxa_desemprego, k.renda_media,
      c.entrevistados, c.testados, c.positivos
    FROM k
    LEFT JOIN c
      ON k.ano = c.ano AND k.mes = c.mes
     AND COALESCE(k.uf_codigo, -1) = COALESCE(c.uf_codigo, -1)
     AND COALESCE(k.uf_nome,   '') = COALESCE(c.uf_nome,   '')
     AND COALESCE(k.sexo_nome,'')  = COALESCE(c.sexo_nome,'')
    """,
    """
    WITH k AS (
      SELECT ano, mes, uf_codigo, uf_nome, sexo_nome,
             taxa_sintomas, taxa_testou, taxa_positivo_any,
             taxa_ocupacao, taxa_desemprego, renda_media
      FROM vw_kpis_full
    ),
    c AS (
      SELECT ano, mes, uf_codigo, uf_nome, sexo_nome,
             COUNT(1) AS entrevistados,
             SUM(CASE WHEN fez_teste_covid = 1 THEN 1 ELSE 0 END)        AS testados,
             SUM(CASE WHEN teste_positivo_qualquer = 1 THEN 1 ELSE 0 END) AS positivos
      FROM microdados_silver
      GROUP BY 1,2,3,4,5
    )
    SELECT
      k.ano, k.mes, k.uf_codigo, k.uf_nome, k.sexo_nome,
      k.taxa_sintomas, k.taxa_testou, k.taxa_positivo_any,
      k.taxa_ocupacao, k.taxa_desemprego, k.renda_media,
      c.entrevistados, c.testados, c.positivos
    FROM k
    LEFT JOIN c
      ON k.ano = c.ano AND k.mes = c.mes
     AND COALESCE(k.uf_codigo, -1) = COALESCE(c.uf_codigo, -1)
     AND COALESCE(k.uf_nome,   '') = COALESCE(c.uf_nome,   '')
     AND COALESCE(k.sexo_nome,'')  = COALESCE(c.sexo_nome,'')
    """
]

df_final, _err = None, None
for _sql in sql_final_candidates:
    try:
        df_final = wr.athena.read_sql_query(_sql, database=DB_NAME, ctas_approach=False, s3_output=ATHENA_OUTPUT)
        if not df_final.empty:
            break
    except Exception as ex:
        _err = ex
        df_final = None

if df_final is None or df_final.empty:
    raise RuntimeError(f"Falha ao montar a base final com entrevistados. Último erro: {_err!r}")

# Enriquecimento temporal amigável
df_final["mes_label"] = pd.to_datetime(
    df_final["ano"].astype(str) + "-" + df_final["mes"].astype(str).str.zfill(2) + "-01"
).dt.strftime("%Y-%m")

# Caminhos e gravação
s3_path    = f"{S3_EXPORT_ROOT}{LOCAL_FILE_BASENAME}.{'parquet' if EXPORT_FORMAT=='parquet' else 'csv'}"
local_path = f"{LOCAL_FILE_BASENAME}.{'parquet' if EXPORT_FORMAT=='parquet' else 'csv'}"

if EXPORT_FORMAT == "parquet":
    wr.s3.to_parquet(df=df_final, path=s3_path, dataset=False, compression="snappy")
    if SAVE_LOCAL_COPY:
        df_final.to_parquet(local_path, index=False)
else:
    wr.s3.to_csv(df=df_final, path=s3_path, index=False)
    if SAVE_LOCAL_COPY:
        df_final.to_csv(local_path, index=False, encoding="utf-8")

print(f"✅ Export OK — colunas: {list(df_final.columns)}")
print(f"Linhas: {len(df_final):,}")
print("S3   :", s3_path)
if SAVE_LOCAL_COPY: print("Local:", local_path)


✅ Export OK — colunas: ['ano', 'mes', 'uf_codigo', 'uf_nome', 'sexo_nome', 'taxa_sintomas', 'taxa_testou', 'taxa_positivo_any', 'taxa_ocupacao', 'taxa_desemprego', 'renda_media', 'entrevistados', 'testados', 'positivos', 'mes_label']
Linhas: 216
S3   : s3://tc-pnad-covid-gabrielarodrigues/export/powerbi/base_final_com_entrevistados.parquet
Local: base_final_com_entrevistados.parquet
