## Import Libs

In [29]:
import requests
import zipfile
import io
import pandas as pd
import os
import boto3
import sys
import time 
import duckdb

from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from botocore.exceptions import BotoCoreError, ClientError
from psycopg2 import sql
from pyspark.sql import SparkSession

## Funções

In [4]:
# Testar a conexão ao banco de dados
def test_connection(engine):

    try:
        with engine.connect() as connection:
            
            # Testar a versão do PostgreSQL
            result = connection.execute(text("SELECT version();"))
            versao = result.fetchone()
            print("✅ Conectado com sucesso:", versao[0])

            # Listar as tabelas no schema público
            result = connection.execute(text("""
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = 'public';
            """))
            tabelas = result.fetchall()
            print("📄 Tabelas no banco:")
            for tabela in tabelas:
                print("-", tabela[0])

    except Exception as e:
        print("❌ Erro ao executar comandos:", e)
        sys.exit()

## Configuração e conexão s3

In [5]:
# Carregar as credencias do .env
load_dotenv()

# Configuração storage_options
storage_options = {
    "key": os.getenv('aws_access_key_id'),
    "secret": os.getenv('aws_secret_access_key'),
    "token": os.getenv('aws_session_token')
}

# S3
s3_client = boto3.client(
    's3',
    aws_access_key_id=os.getenv('aws_access_key_id'),
    aws_secret_access_key=os.getenv('aws_secret_access_key'),
    aws_session_token=os.getenv('aws_session_token'),
    region_name=os.getenv('region')
)

# PostgreSQL
usuario_pg = os.getenv("POSTGRES_USER_PNAD")
senha_pg = os.getenv("POSTGRES_PASSWORD_PNAD")
host_pg = os.getenv("POSTGRES_HOST_PNAD")
porta_pg = os.getenv("POSTGRES_PORT_PNAD")
banco_pg = os.getenv("POSTGRES_DB_PNAD")


In [6]:
# Validação conexão com a AWS através do .env
load_dotenv()

try:
    sts_client = boto3.client(
        'sts',
        aws_access_key_id=os.getenv('aws_access_key_id'),
        aws_secret_access_key=os.getenv('aws_secret_access_key'),
        aws_session_token=os.getenv('aws_session_token'),
        region_name=os.getenv('region')
    )
    
    identity = sts_client.get_caller_identity()
    print("✅ Conectado à conta\n")
    print("UserId:", identity["UserId"])
    print("Account:", identity["Account"])
    print("Arn:", identity["Arn"])

except (BotoCoreError, ClientError) as e:
    print("❌ Erro ao conectar à AWS. Verifique suas credenciais e tente novamente.")
    print("Detalhes do erro:", e)

✅ Conectado à conta

UserId: AROA6ODU7HW5AICBR3YUC:user4401657=geovanaferreira47@gmail.com
Account: 992382762426
Arn: arn:aws:sts::992382762426:assumed-role/voclabs/user4401657=geovanaferreira47@gmail.com


## Definição dos caminhos S3

In [34]:
# Bucket s3 e camadas
s3_bucket = 'fiaptechchallengefase3'
s3_bronze = 'bronze'
s3_silver = 'silver'
s3_gold = 'gold'

silver_prefix = f's3://{s3_bucket}/{s3_silver}/'
gold_prefix = f's3://{s3_bucket}/{s3_gold}/'

# Nome arquivo .parquet
nome_arquivo_gold = 'pnad_final.delta'

# Caminho de saida e entrada camada silver
caminho_saida_silver = silver_prefix + 'pnad.delta'
caminho_entrada_silver = silver_prefix + 'pnad.delta'
caminho_saida_gold = gold_prefix + 'pnad_final.delta'
caminho_completo_gold = gold_prefix + nome_arquivo_gold

# Caminho do Github com dados do PNAD
api_url = 'https://api.github.com/repos/geoferreira1/fiap_tech_challenge_fase_3/contents/covid/microdados'

# Caminho do Github com dados do código IBGE UF
link_codigo_uf = 'https://raw.githubusercontent.com/geoferreira1/fiap_tech_challenge_fase_3/refs/heads/main/covid/codigo_uf.csv'

# Nome da tabela no PostgreSQL
nome_tabela = 'questionario_covid'

In [35]:
# Criar engine com banco 
engine = create_engine(f"postgresql+psycopg2://{usuario_pg}:{senha_pg}@{host_pg}:{porta_pg}/{banco_pg}")

# Testar a conexão
test_connection(engine)

✅ Conectado com sucesso: PostgreSQL 17.4 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 12.4.0, 64-bit
📄 Tabelas no banco:
- questionario_covid


In [36]:
# Criar Bucket e subpastas (camadas medalhões)

# Lista com camadas no s3
camadas = [s3_bronze, s3_silver, s3_gold]

# Armazenar a região da conexão com a AWS
aws_region = s3_client.meta.region_name

print(f"\nIniciando Processo de Validação e Criação no S3")

print(f"\nValidando o bucket '{s3_bucket}'")
bucket_pronto = False

# Validar se o Bucket esta criado
try:
    s3_client.head_bucket(Bucket=s3_bucket)
    print(f"➡️  Bucket '{s3_bucket}' já existe")
    bucket_pronto = True

except ClientError as e:
    
    if e.response['Error']['Code'] == '404':
        print(f"Bucket '{s3_bucket}' não encontrado. Tentando criar")

        try:
            if aws_region == "us-east-1":
                s3_client.create_bucket(Bucket=s3_bucket)

            else:
                location = {'LocationConstraint': aws_region}
                s3_client.create_bucket(
                    Bucket=s3_bucket,
                    CreateBucketConfiguration=location
                )
            print(f"✅ Bucket '{s3_bucket}' criado com sucesso!")
            bucket_pronto = True

        except Exception as create_e:
            print(f"❌ Falha ao tentar criar o bucket: {create_e}")

    else:
        print(f"❌ Erro de permissão ou outro problema ao verificar o bucket: {e}")

# Validar se as subpastas estão criadas
if bucket_pronto:
    print(f"\nValidando as subpastas no bucket '{s3_bucket}'")

    for nome_pasta in camadas:
        chave_pasta = nome_pasta if nome_pasta.endswith('/') else nome_pasta + '/'
        
        try:
            s3_client.head_object(Bucket=s3_bucket, Key=chave_pasta)
            print(f"➡️ Pasta '{chave_pasta}' já existe")

        except ClientError as e:

            if e.response['Error']['Code'] == '404':
                try:
                    s3_client.put_object(
                        Bucket=s3_bucket,
                        Key=chave_pasta,
                        Body=''
                    )
                    print(f"✅ Pasta '{chave_pasta}' criada com sucesso")

                except Exception as create_e:
                    print(f"❌ Falha ao TENTAR CRIAR a pasta '{chave_pasta}': {create_e}")

            else:
                print(f"❌ Erro ao verificar a pasta '{chave_pasta}': {e}")
else:
    print("\nCriação das pastas abortada, pois houve um problema com o bucket")

print("\nProcesso Finalizado")


Iniciando Processo de Validação e Criação no S3

Validando o bucket 'fiaptechchallengefase3'
➡️  Bucket 'fiaptechchallengefase3' já existe

Validando as subpastas no bucket 'fiaptechchallengefase3'
➡️ Pasta 'bronze/' já existe
➡️ Pasta 'silver/' já existe
➡️ Pasta 'gold/' já existe

Processo Finalizado


## Carregamento de dados para as camadas

In [37]:
# Carregar os dados para camada Bronze

# Realizar a consulta a URL
print(f"Buscando lista de arquivos em: {api_url}")

response = requests.get(api_url) # Fazer requisição HTTP
response.raise_for_status() # Verificar se a requisição foi bem feita
files = response.json() # Transformar em uma lista de dicionario

# Itera sobre cada item no diretório do GitHub
for file_info in files:

    if file_info['name'].endswith('.zip'):

        zip_name = file_info['name']
        zip_url = file_info['download_url']

        print(f"\nProcessando: {zip_name}")
        
        try:
            r_zip = requests.get(zip_url)
            r_zip.raise_for_status()
            zip_content = io.BytesIO(r_zip.content) # Arquivo temporario salvo em memoria

            with zipfile.ZipFile(zip_content) as z:
                csv_filename = [f for f in z.namelist() if f.endswith('.csv')][0]

                print(f"Arquivo CSV encontrado: {csv_filename}")

                csv_content_bytes = z.read(csv_filename)
                caminho_completo_s3 = f"{s3_bronze}/{csv_filename}"

                print(f"Enviando para o S3 em: '{caminho_completo_s3}'")

                # Enviar arquivo .csv para o S3
                s3_client.put_object(
                    Bucket=s3_bucket, # Nome do bucket
                    Key=caminho_completo_s3, # Caminho completo do arquivo
                    Body=csv_content_bytes # Arquivo que vai ser carregado
                )

                print(f"✅ Sucesso! Arquivo enviado para s3://{s3_bucket}/{caminho_completo_s3}")
        
        # Caso ocorra algum erro
        except Exception as e:
            print(f"Erro: Ocorreu um erro inesperado ao processar {zip_name}. Erro: {e}")
            sys.exit()
            
print("\nProcesso para o S3 concluído!")

Buscando lista de arquivos em: https://api.github.com/repos/geoferreira1/fiap_tech_challenge_fase_3/contents/covid/microdados

Processando: PNAD_COVID_052020.zip
Arquivo CSV encontrado: PNAD_COVID_052020.csv
Enviando para o S3 em: 'bronze/PNAD_COVID_052020.csv'
✅ Sucesso! Arquivo enviado para s3://fiaptechchallengefase3/bronze/PNAD_COVID_052020.csv

Processando: PNAD_COVID_062020.zip
Arquivo CSV encontrado: PNAD_COVID_062020.csv
Enviando para o S3 em: 'bronze/PNAD_COVID_062020.csv'
✅ Sucesso! Arquivo enviado para s3://fiaptechchallengefase3/bronze/PNAD_COVID_062020.csv

Processando: PNAD_COVID_072020.zip
Arquivo CSV encontrado: PNAD_COVID_072020.csv
Enviando para o S3 em: 'bronze/PNAD_COVID_072020.csv'
✅ Sucesso! Arquivo enviado para s3://fiaptechchallengefase3/bronze/PNAD_COVID_072020.csv

Processando: PNAD_COVID_082020.zip
Arquivo CSV encontrado: PNAD_COVID_082020.csv
Enviando para o S3 em: 'bronze/PNAD_COVID_082020.csv'
✅ Sucesso! Arquivo enviado para s3://fiaptechchallengefase3/bro

In [38]:
# Ler arquivos CSV Codigo IBGE e gerar Data Frame
print(f"\nLendo tabela com os códigos do IBGE de: {link_codigo_uf}")
df_uf = pd.read_csv(link_codigo_uf, sep=",")
print("Data Frame criado com sucesso")


Lendo tabela com os códigos do IBGE de: https://raw.githubusercontent.com/geoferreira1/fiap_tech_challenge_fase_3/refs/heads/main/covid/codigo_uf.csv
Data Frame criado com sucesso


In [39]:
# Carregar dados para camada silver

# Ler arquivos CSV PNAD COVID do S3, consolidar e gerar Data Frame
response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=f'{s3_bronze}/') # Listar todos os objetos (arquivos) dentro do S3
bronze_files = [obj['Key'] for obj in response.get('Contents', []) if obj['Key'].endswith('.csv')] # Pegar apenas arquivos .csv
lista_de_dataframes = []

print("Lendo arquivos da camada Bronze no S3")

# Iterar sobre os arquivos
for file_key in bronze_files:

    file_path = f"s3://{s3_bucket}/{file_key}"
    print(f"Lendo {file_path}")
    df_temp = pd.read_csv(file_path, storage_options=storage_options, sep=',')
    lista_de_dataframes.append(df_temp)

# Consolidar os data frame
df_consolidado = pd.concat(lista_de_dataframes, ignore_index=True)
print(f"\nConsolidação concluída e {len(df_consolidado)} linhas lidas da camada Bronze")

# Relacionar os Data Frame
print("\nIniciando enriquecimento dos dados com merge com o Data Frame df_uf")
df_silver = pd.merge(
    df_consolidado,
    df_uf,
    how='left',
    left_on='UF', 
    right_on='Código'
)

# Remover colunas não necessarias e ajustar o nome das colunas
df_silver = df_silver.drop(columns=["Código"])
df_silver = df_silver.rename(columns={"UF_x": "UF", "UF_y": "UF_Nome", "Região": "Regiao"})
print("Enriquecimento concluído")

# Ver o resultado final
print("\nData Frame com os primeiros registros")
display(df_silver.head(5))

print("Data Frame com os ultimos registros")
display(df_silver.tail(5))

Lendo arquivos da camada Bronze no S3
Lendo s3://fiaptechchallengefase3/bronze/PNAD_COVID_052020.csv
Lendo s3://fiaptechchallengefase3/bronze/PNAD_COVID_062020.csv
Lendo s3://fiaptechchallengefase3/bronze/PNAD_COVID_072020.csv
Lendo s3://fiaptechchallengefase3/bronze/PNAD_COVID_082020.csv
Lendo s3://fiaptechchallengefase3/bronze/PNAD_COVID_092020.csv
Lendo s3://fiaptechchallengefase3/bronze/PNAD_COVID_102020.csv
Lendo s3://fiaptechchallengefase3/bronze/PNAD_COVID_112020.csv

Consolidação concluída e 2650459 linhas lidas da camada Bronze

Iniciando enriquecimento dos dados com merge com o Data Frame df_uf
Enriquecimento concluído

Data Frame com os primeiros registros


Unnamed: 0,Ano,UF,CAPITAL,RM_RIDE,V1008,V1012,V1013,V1016,Estrato,UPA,...,F002A2,F002A3,F002A4,F002A5,A006A,A006B,A007A,UF_Nome,Sigla,Regiao
0,2020,11,11.0,,1,4,5,1,1110011,110015970,...,,,,,,,,Rondônia,RO,Norte
1,2020,11,11.0,,1,4,5,1,1110011,110015970,...,,,,,,,,Rondônia,RO,Norte
2,2020,11,11.0,,1,4,5,1,1110011,110015970,...,,,,,,,,Rondônia,RO,Norte
3,2020,11,11.0,,1,4,5,1,1110011,110015970,...,,,,,,,,Rondônia,RO,Norte
4,2020,11,11.0,,3,2,5,1,1110011,110015970,...,,,,,,,,Rondônia,RO,Norte


Data Frame com os ultimos registros


Unnamed: 0,Ano,UF,CAPITAL,RM_RIDE,V1008,V1012,V1013,V1016,Estrato,UPA,...,F002A2,F002A3,F002A4,F002A5,A006A,A006B,A007A,UF_Nome,Sigla,Regiao
2650454,2020,53,53.0,,6,3,11,7,5310220,530009738,...,1.0,1.0,2.0,1.0,,,,Distrito Federal,DF,Centro-Oeste
2650455,2020,53,53.0,,6,3,11,7,5310220,530009738,...,1.0,1.0,2.0,1.0,,,,Distrito Federal,DF,Centro-Oeste
2650456,2020,53,53.0,,6,3,11,7,5310220,530009738,...,1.0,1.0,2.0,1.0,1.0,4.0,,Distrito Federal,DF,Centro-Oeste
2650457,2020,53,53.0,,10,2,11,7,5310220,530009738,...,1.0,1.0,1.0,1.0,,,,Distrito Federal,DF,Centro-Oeste
2650458,2020,53,53.0,,10,2,11,7,5310220,530009738,...,1.0,1.0,1.0,1.0,,,,Distrito Federal,DF,Centro-Oeste


In [40]:
# Salvar Data Frame df_silver na camada silver
print(f"\nSalvando dados da camada Silver em: {caminho_saida_silver}")

df_silver.to_parquet(
    caminho_saida_silver,
    index=False,
    storage_options=storage_options
)

print("✅ Sucesso! Camada Silver criada e salva no S3")


Salvando dados da camada Silver em: s3://fiaptechchallengefase3/silver/pnad.delta
✅ Sucesso! Camada Silver criada e salva no S3


In [41]:
# Tratar os dados para a camada Gold
print(f"Lendo dados da camada Silver de: {caminho_entrada_silver}")

try:
    df_silver = pd.read_parquet(caminho_entrada_silver, storage_options=storage_options)
    print(f"Leitura concluída onde o dataFrame contem {len(df_silver)} linhas e {len(df_silver.columns)} colunas")

except Exception as e:
    print(f"❌ Erro ao ler o arquivo da camada Silver. Verifique o caminho e as permissões. Detalhes: {e}")
    sys.exit()

# Copiar e padronizar colunas para minúsculas
df_estruturado = df_silver.copy()
df_estruturado.columns = df_estruturado.columns.str.lower()

# Colunas fixas 
colunas_fixas = [
    'ano',       
    'v1013',     
    'v1012',     
    'uf',        
    'capital',   
    'rm_ride',   
    'uf_nome',   
    'sigla',     
    'regiao',    
]
# Colunas desejadas 
colunas_desejadas = [
    'a002',
    'a003', 
    'a004',  
    'a005', 
    'a006a',  
    'b002',  
    'b005',  
    'b006',  
    'b007',   
    'b008',   
    'b009b',  
    'b009d',  
    'b009f',  
    'b0011',  
    'b0012',  
    'b0013',  
    'b0014',  
    'b0015',  
    'b0016',  
    'b0017',  
    'b0018', 
    'b0019', 
    'b00110', 
    'b00111', 
    'b00112', 
    'b00113', 
    'b0101',  
    'b0102', 
    'b0103',  
    'b0104', 
    'b0105', 
    'b0106', 
    'c007b', 
    'f001'
]

# Unir listas mantendo a ordem e sem duplicar
todas_colunas = list(dict.fromkeys([*colunas_fixas, *colunas_desejadas]))

# Manter apenas as colunas que existem no DataFrame após padronização
colunas_existentes = [c for c in todas_colunas if c in df_estruturado.columns]

# Filtro com os 3 últimos valores de v1013
if 'v1013' in df_estruturado.columns:
    # Coletar valores únicos (excluindo NaN), ordenar e pegar os 3 últimos
    ultimos_3 = sorted(pd.unique(df_estruturado['v1013'].dropna()))[-3:]

    # Filtrar registros pertencentes aos 3 últimos meses e selecionar colunas
    estrutura_final = (
        df_estruturado[df_estruturado['v1013'].isin(ultimos_3)][colunas_existentes]
        .reset_index(drop=True)
    )
else:
    # Se não houver v1013, apenas seleciona as colunas existentes
    estrutura_final = df_estruturado[colunas_existentes].reset_index(drop=True)

# Avisos sobre colunas ausentes
faltantes = [c for c in todas_colunas if c not in df_estruturado.columns]
if faltantes:
    print(f"Atenção: estas colunas não foram encontradas e ficaram de fora: {faltantes}")

# Ver o resultado final
print("\nData Frame com os primeiros registros")
display(estrutura_final.head(5))

print("Data Frame com os ultimos registros")
display(estrutura_final.tail(5))

Lendo dados da camada Silver de: s3://fiaptechchallengefase3/silver/pnad.delta
Leitura concluída onde o dataFrame contem 2650459 linhas e 151 colunas

Data Frame com os primeiros registros


Unnamed: 0,ano,v1013,v1012,uf,capital,rm_ride,uf_nome,sigla,regiao,a002,...,b00112,b00113,b0101,b0102,b0103,b0104,b0105,b0106,c007b,f001
0,2020,9,4,11,11.0,,Rondônia,RO,Norte,36,...,2,2.0,2.0,2.0,2.0,2.0,2.0,2.0,1.0,1
1,2020,9,4,11,11.0,,Rondônia,RO,Norte,30,...,2,2.0,2.0,2.0,2.0,2.0,2.0,2.0,,1
2,2020,9,4,11,11.0,,Rondônia,RO,Norte,13,...,2,2.0,2.0,2.0,2.0,2.0,2.0,2.0,,1
3,2020,9,4,11,11.0,,Rondônia,RO,Norte,11,...,2,2.0,2.0,2.0,2.0,2.0,2.0,2.0,,1
4,2020,9,1,11,11.0,,Rondônia,RO,Norte,57,...,2,2.0,2.0,2.0,2.0,2.0,2.0,2.0,,1


Data Frame com os ultimos registros


Unnamed: 0,ano,v1013,v1012,uf,capital,rm_ride,uf_nome,sigla,regiao,a002,...,b00112,b00113,b0101,b0102,b0103,b0104,b0105,b0106,c007b,f001
1149192,2020,11,3,53,53.0,,Distrito Federal,DF,Centro-Oeste,45,...,2,2.0,2.0,1.0,2.0,2.0,2.0,2.0,1.0,1
1149193,2020,11,3,53,53.0,,Distrito Federal,DF,Centro-Oeste,22,...,2,2.0,2.0,2.0,2.0,2.0,2.0,2.0,,1
1149194,2020,11,3,53,53.0,,Distrito Federal,DF,Centro-Oeste,16,...,2,2.0,2.0,2.0,2.0,2.0,1.0,2.0,,1
1149195,2020,11,2,53,53.0,,Distrito Federal,DF,Centro-Oeste,83,...,2,2.0,1.0,1.0,2.0,2.0,2.0,2.0,,5
1149196,2020,11,2,53,53.0,,Distrito Federal,DF,Centro-Oeste,75,...,2,2.0,2.0,1.0,2.0,2.0,2.0,2.0,,5


In [42]:
# Carregar os dados para a Gold
print(f"\nSalvando dados da camada Gold em: {caminho_saida_gold}")

# Salvar o data frame como .parquet 
estrutura_final.to_parquet(
    caminho_saida_gold,
    index=False,
    storage_options=storage_options # Parametro necessario para conseguir conectar o Pandas ao AWS
)

print(f"✅ Sucesso! Camada Gold criada com {len(estrutura_final)} linhas e salva no S3.")


Salvando dados da camada Gold em: s3://fiaptechchallengefase3/gold/pnad_final.delta
✅ Sucesso! Camada Gold criada com 1149197 linhas e salva no S3.


In [43]:
# Criar a tabela no banco de dados PostgreSQL
print("Iniciando a criação do esquema da tabela")

try:
    df_schema = estrutura_final.head(0)
    df_schema.to_sql(
        nome_tabela, 
        con=engine, 
        if_exists='replace', # Dropar a tabela
        index=False)
    
    print(f"✅ Esquema da tabela '{nome_tabela}' criado com sucesso no PostgreSQL!")

    # Adicionar colunas padrão de carga com valores DEFAULT
    print("\nAdicionando colunas de metadados (data_hora_carga, usuario_carga)")

    with engine.connect() as connection:
        comando_sql_data = text(f"""
            ALTER TABLE {nome_tabela}
            ADD COLUMN data_hora_carga TIMESTAMPTZ DEFAULT NOW();
        """)
        
        comando_sql_usuario = text(f"""
            ALTER TABLE {nome_tabela}
            ADD COLUMN usuario_carga VARCHAR(255) DEFAULT CURRENT_USER;
        """)

        connection.execute(comando_sql_data)
        connection.execute(comando_sql_usuario)
        connection.commit()

        print(f"✅ Colunas de metadados adicionadas com sucesso à tabela '{nome_tabela}'.")

except Exception as e:
    print(f"❌ Erro durante a criação ou alteração da tabela: {e}")
    sys.exit()

Iniciando a criação do esquema da tabela
✅ Esquema da tabela 'questionario_covid' criado com sucesso no PostgreSQL!

Adicionando colunas de metadados (data_hora_carga, usuario_carga)
✅ Colunas de metadados adicionadas com sucesso à tabela 'questionario_covid'.


In [None]:
# Carregar dados da Gold para PostgreSQL
with engine.connect() as connection:
    try:
        print(f"Lendo dados de: {caminho_completo_gold}")
        df_gold = pd.read_parquet(caminho_completo_gold, storage_options=storage_options)
        print(f"Leitura concluída. DataFrame com {len(df_gold)} linhas.")

        # Obtemos a conexão do psycopg2 a partir da conexão da engine
        raw_conn = connection.connection
        cursor = raw_conn.cursor()

        # Truncar a tabela
        print(f"Limpando a tabela de destino '{nome_tabela}'")
        truncate_query = sql.SQL("TRUNCATE TABLE {}").format(sql.Identifier(nome_tabela))
        cursor.execute(truncate_query)
        
        # Carregar os dados via COPY
        buffer = io.StringIO()
        df_gold.to_csv(buffer, index=False, header=False, sep='\t')
        buffer.seek(0)
        print(f"Iniciando a carga de {len(df_gold)} linhas via COPY")
        colunas_df = list(df_gold.columns)
        cursor.copy_from(buffer, nome_tabela, sep='\t', null='', columns=colunas_df)
        print("Carga via COPY concluída")
        raw_conn.commit()
        cursor.close()
        print(f"✅ Sucesso! Transação efetivada na tabela '{nome_tabela}'")

        # Salvar os dados da tabela do PostgreSQL em um .csv
        print("\nIniciando a exportação dos dados da tabela para .csv")
        sql_query = f"""SELECT 
                          ano,
                          v1013 AS mes_pesquisa,
                          v1012 AS semana_mes,
                          uf_nome,
                          sigla AS uf,
                          regiao,
                          a002 AS idade,
                          CASE
                              WHEN CAST(A003 AS INTEGER) = 1 THEN 'Masculino'
                              WHEN CAST(A003 AS INTEGER) = 2 THEN 'Feminino'
                              ELSE 'Desconhecido'
                          END AS sexo,
                          CASE
                              WHEN CAST(A004 AS INTEGER) = 1 THEN 'Branca'
                              WHEN CAST(A004 AS INTEGER) = 2 THEN 'Preta'
                              WHEN CAST(A004 AS INTEGER) = 3 THEN 'Amarela'
                              WHEN CAST(A004 AS INTEGER) = 4 THEN 'Parda'
                              WHEN CAST(A004 AS INTEGER) = 5 THEN 'Indígena'
                              ELSE 'Ignorada'
                          END AS cor,
                          CASE
                              WHEN CAST(A005 AS INTEGER) = 1 THEN 'Sem instrução'
                              WHEN CAST(A005 AS INTEGER) = 2 THEN 'Ensino Fundamental incompleto'
                              WHEN CAST(A005 AS INTEGER) = 3 THEN 'Ensino Fundamental completo'
                              WHEN CAST(A005 AS INTEGER) = 4 THEN 'Ensino Médio incompleto'
                              WHEN CAST(A005 AS INTEGER) = 5 THEN 'Ensino Médio completo'
                              WHEN CAST(A005 AS INTEGER) = 6 THEN 'Ensino Superior incompleto'
                              WHEN CAST(A005 AS INTEGER) = 7 THEN 'Ensino Superior completo'
                              WHEN CAST(A005 AS INTEGER) = 8 THEN 'Pós graduação, mestrado ou doutorado'
                              ELSE 'Desconhecido'
                          END AS escolaridade,
                          CASE
                              WHEN CAST(a006a AS INTEGER) = 1 THEN 'Pública'
                              WHEN CAST(a006a AS INTEGER) = 2 THEN 'Privada'
                              ELSE 'Desconhecido'
                          END AS tipo_instituicao,
                          CASE
                              WHEN CAST(b002 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b002 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b002 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS buscou_auxilio_medico,
                          CASE
                              WHEN CAST(b005 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b005 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b005 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS precisou_de_internacao,
                          CASE
                              WHEN CAST(b006 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b006 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b006 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS precisou_de_sedacao,
                          CASE
                              WHEN CAST(b007 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b007 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b007 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS plano_de_saude,
                          CASE
                              WHEN CAST(b008 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b008 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b008 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS realizou_teste_covid,
                          CASE
                              WHEN CAST(b009b AS INTEGER) = 1 THEN 'Positivo'
                              WHEN CAST(b009b AS INTEGER) = 2 THEN 'Negativo'
                              WHEN CAST(b009b AS INTEGER) = 3 THEN 'Inconclusivo'
                              WHEN CAST(b009b AS INTEGER) = 4 THEN 'Aguardando resultado'
                              WHEN CAST(b009b AS INTEGER) = 9 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS resultado_teste_swab,
                          CASE
                              WHEN CAST(b009d AS INTEGER) = 1 THEN 'Positivo'
                              WHEN CAST(b009d AS INTEGER) = 2 THEN 'Negativo'
                              WHEN CAST(b009d AS INTEGER) = 3 THEN 'Inconclusivo'
                              WHEN CAST(b009d AS INTEGER) = 4 THEN 'Aguardando resultado'
                              WHEN CAST(b009d AS INTEGER) = 9 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS resultado_teste_dedo,
                          CASE
                              WHEN CAST(b009f AS INTEGER) = 1 THEN 'Positivo'
                              WHEN CAST(b009f AS INTEGER) = 2 THEN 'Negativo'
                              WHEN CAST(b009f AS INTEGER) = 3 THEN 'Inconclusivo'
                              WHEN CAST(b009f AS INTEGER) = 4 THEN 'Aguardando resultado'
                              WHEN CAST(b009f AS INTEGER) = 9 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS resultado_teste_veia,
                          CASE
                              WHEN CAST(b0011 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b0011 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b0011 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS frebre_semana_anterior,
                          CASE
                              WHEN CAST(b0012 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b0012 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b0012 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS tosse_semana_anterior,
                          CASE
                              WHEN CAST(b0013 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b0013 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b0013 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS dor_de_garganta_semana_anterior,
                          CASE
                              WHEN CAST(b0014 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b0014 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b0014 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS dificuldade_de_respirar_semana_anterior,
                          CASE
                              WHEN CAST(b0015 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b0015 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b0015 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS dor_de_cabeca_semana_anterior,
                          CASE
                              WHEN CAST(b0016 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b0016 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b0016 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS dor_no_peito_semana_anterior,
                          CASE
                              WHEN CAST(b0017 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b0017 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b0017 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS nausea_semana_anterior,
                          CASE
                              WHEN CAST(b0018 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b0018 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b0018 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS nariz_constipado_semana_anterior,
                          CASE
                              WHEN CAST(b0019 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b0019 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b0019 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS fadiga_semana_anterior,
                          CASE
                              WHEN CAST(b00110 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b00110 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b00110 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS dor_nos_olhos_semana_anterior,
                          CASE
                              WHEN CAST(b00111 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b00111 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b00111 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS perda_olfato_paladar_semana_anterior,
                          CASE
                              WHEN CAST(b00112 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b00112 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b00112 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS dor_muscular_semana_anterior,
                          CASE
                              WHEN CAST(b00113 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b00113 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b00113 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS diarreia_semana_anterior,
                          CASE
                              WHEN CAST(b0101 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b0101 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b0101 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS diabetes,
                          CASE
                              WHEN CAST(b0102 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b0102 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b0102 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS hipertensao,
                          CASE
                              WHEN CAST(b0103 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b0103 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b0103 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS doenca_respiratoria,
                          CASE
                              WHEN CAST(b0104 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b0104 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b0104 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS doencas_cardiacas,
                          CASE
                              WHEN CAST(b0105 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b0105 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b0105 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS depressao,
                          CASE
                              WHEN CAST(b0106 AS INTEGER) = 1 THEN 'Sim'
                              WHEN CAST(b0106 AS INTEGER) = 2 THEN 'Não'
                              WHEN CAST(b0106 AS INTEGER) = 3 THEN 'Ignorado'
                              ELSE 'Desconhecido'
                          END AS cancer,
                          CASE
                              WHEN CAST(c007b AS INTEGER) = 1 THEN 'Sim, carteira assinada'
                              WHEN CAST(c007b AS INTEGER) = 2 THEN 'Sim, servidor público'
                              WHEN CAST(c007b AS INTEGER) = 3 THEN 'Não'
                              ELSE 'Desconhecido'
                          END AS trabalha_atualmente,
                          CASE
                              WHEN CAST(f001 AS INTEGER) = 1 THEN 'Própria'
                              WHEN CAST(f001 AS INTEGER) = 2 THEN 'Própria'
                              WHEN CAST(f001 AS INTEGER) = 3 THEN 'Aluguel'
                              WHEN CAST(f001 AS INTEGER) = 4 THEN 'Cedido'
                              WHEN CAST(f001 AS INTEGER) = 5 THEN 'Cedido'
                              WHEN CAST(f001 AS INTEGER) = 6 THEN 'Cedido'
                              WHEN CAST(f001 AS INTEGER) = 7 THEN 'Outras'
                              ELSE 'Desconhecido'
                          END AS modaria
                    FROM {nome_tabela};"""
        df_exportar = pd.read_sql_query(sql_query, connection)
        diretorio_script = os.getcwd() 
        nome_arquivo_csv = f"{nome_tabela}_exportado.csv"
        caminho_csv_exportado = os.path.join(diretorio_script, nome_arquivo_csv)
        df_exportar.to_csv(caminho_csv_exportado, index=False, sep=';', encoding='utf-8-sig')
        print("✅ Dados exportados com sucesso!")
        print(f"Localização: {caminho_csv_exportado}")
        
        print("\nProcesso concluído com sucesso!")

    except Exception as e:
        print(f"❌ Ocorreu um erro durante o processo: {e}")
        if 'raw_conn' in locals() and not raw_conn.closed:
             raw_conn.rollback()
             print("Transação revertida (rollback).")

Lendo dados de: s3://fiaptechchallengefase3/gold/pnad_final.delta
Leitura concluída. DataFrame com 1149197 linhas.
Limpando a tabela de destino 'questionario_covid'
Iniciando a carga de 1149197 linhas via COPY
Carga via COPY concluída
✅ Sucesso! Transação efetivada na tabela 'questionario_covid'

Iniciando a exportação dos dados da tabela para .csv
✅ Dados exportados com sucesso!
Localização: /Users/geo/Documents/fiap_tech_challenge_fase_3/questionario_covid_exportado.csv

Processo concluído com sucesso!
