### imports

In [1]:
# Importar biblioteca completa
import requests
import zipfile
import io
import pandas as pd
import os
import boto3
import sys
import time 

# Importar algo especifico de uma biblioteca
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from botocore.exceptions import BotoCoreError, ClientError
from psycopg2 import sql

### Testar a conexão de dados

In [2]:
# Testar a conexão ao banco de dados
def test_connection(engine):

    try:
        with engine.connect() as connection:
            
            # Testar a versão do PostgreSQL
            result = connection.execute(text("SELECT version();"))
            versao = result.fetchone()
            print("✅ Conectado com sucesso:", versao[0])

            # Listar as tabelas no schema público
            result = connection.execute(text("""
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = 'public';
            """))
            tabelas = result.fetchall()
            print("📄 Tabelas no banco:")
            for tabela in tabelas:
                print("-", tabela[0])

    except Exception as e:
        print("❌ Erro ao executar comandos:", e)
        sys.exit()


### Variáveis

In [None]:
# Nome do bucket S3 e subpastas
s3_bucket = 'postech-covid19-gp81'
s3_bronze = 'bronze'
s3_silver = 'silver'
s3_gold = 'gold'

silver_prefix = f's3://{s3_bucket}/{s3_silver}/'
gold_prefix = f's3://{s3_bucket}/{s3_gold}/'

# Caminho do Github com dados do PNAD
api_url = 'https://api.github.com/repos/Pedr012/TECH-CHALLENGE-FASE-3---COVID19/contents/BASE_PNAD_COVID'


### Configuração AWS e Banco de Dados

In [4]:
# Carregar as credencias do .env
load_dotenv()

# Configuração storage_options
storage_options = {
    "key": os.getenv('AWS_ACCESS_KEY_ID'),
    "secret": os.getenv('AWS_SECRET_ACCESS_KEY'),
    "token": os.getenv('AWS_SESSION_TOKEN')
}

# Configuração S3
s3_client = boto3.client(
    's3',
    aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
    aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
    aws_session_token=os.getenv('AWS_SESSION_TOKEN'),
    region_name=os.getenv('AWS_REGION')
)

# Credenciais do PostgreSQL
usuario_pg = os.getenv("POSTGRES_USER")
senha_pg = os.getenv("POSTGRES_PASSWORD")
host_pg = os.getenv("POSTGRES_HOST")
porta_pg = os.getenv("POSTGRES_PORT")
banco_pg = os.getenv("POSTGRES_DATABASE")

### Validar e testar as conexões

In [5]:
# Validar conexão com a AWS através do .env
load_dotenv()

try:
    sts_client = boto3.client(
        'sts',
        aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
        aws_session_token=os.getenv('AWS_SESSION_TOKEN'),
        region_name=os.getenv('AWS_REGION')
    )
    
    identity = sts_client.get_caller_identity()
    print("✅ Conectado à conta\n")
    print("UserId:", identity["UserId"])
    print("Account:", identity["Account"])
    print("Arn:", identity["Arn"])

except (BotoCoreError, ClientError) as e:
    print("❌ Erro ao conectar à AWS. Verifique suas credenciais e tente novamente.")
    print("Detalhes do erro:", e)

✅ Conectado à conta

UserId: AROA6GBMGAURQRLE7O4BA:user1578413=Pedro_Henrique_Rocha_Farias
Account: 975050245411
Arn: arn:aws:sts::975050245411:assumed-role/voclabs/user1578413=Pedro_Henrique_Rocha_Farias


In [6]:
# Verificar conexão com o bucket S3
try:
    s3_client.head_bucket(Bucket=s3_bucket)
    print(f"✅ Bucket '{s3_bucket}' encontrado e acessível")
    
    # Listar objetos no bucket para mostrar o conteúdo
    response = s3_client.list_objects_v2(Bucket=s3_bucket)
    
    if 'Contents' in response:
        print(f"\n📁 Conteúdo do bucket '{s3_bucket}':")
        for obj in response['Contents']:
            print(f"  - {obj['Key']} ({obj['Size']} bytes)")
    else:
        print(f"\n📁 Bucket '{s3_bucket}' está vazio")
        
except ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == '404':
        print(f"❌ Bucket '{s3_bucket}' não encontrado")
    elif error_code == '403':
        print(f"⚠️ Bucket '{s3_bucket}' existe, mas você não tem permissão para acessá-lo")
        print("Verifique suas credenciais AWS ou as políticas de permissão do bucket")
    else:
        print(f"❌ Erro ao verificar bucket '{s3_bucket}': {e}")
    sys.exit()
except Exception as e:
    print(f"❌ Erro inesperado ao verificar bucket '{s3_bucket}': {e}")
    sys.exit()

✅ Bucket 'postech-covid19-gp81' encontrado e acessível

📁 Conteúdo do bucket 'postech-covid19-gp81':
  - bronze/.keep (0 bytes)
  - bronze/PNAD_COVID_082020.csv (115229404 bytes)
  - bronze/PNAD_COVID_092020.csv (115445896 bytes)
  - bronze/PNAD_COVID_102020.csv (113515233 bytes)
  - bronze/PNAD_COVID_112020.csv (115141700 bytes)
  - gold/.keep (0 bytes)
  - silver/.keep (0 bytes)


In [7]:
# Criar estrutura de subpastas no bucket S3
subpastas = [s3_bronze, s3_silver, s3_gold]

for subpasta in subpastas:
    try:
        # Verificar se a subpasta já existe
        key = f"{subpasta}/.keep"
        
        try:
            s3_client.head_object(Bucket=s3_bucket, Key=key)
            print(f"ℹ️ Subpasta '{subpasta}' já existe")
        except ClientError as e:
            if e.response['Error']['Code'] == '404':
                # Subpasta não existe, criar
                s3_client.put_object(
                    Bucket=s3_bucket,
                    Key=key,
                    Body=b''
                )
                print(f"✅ Subpasta '{subpasta}' criada com sucesso")
            else:
                raise e
        
    except Exception as e:
        print(f"❌ Erro ao processar subpasta '{subpasta}': {e}")

ℹ️ Subpasta 'bronze' já existe
ℹ️ Subpasta 'silver' já existe
ℹ️ Subpasta 'gold' já existe


### Ingesta dos dados na camada bronze

In [8]:
# Primeiro, obter a lista de arquivos da API do GitHub
try:
    print("🔄 Obtendo lista de arquivos do repositório GitHub...")
    response = requests.get(api_url)
    response.raise_for_status()
    arquivos = response.json()
    
    print(f"✅ Encontrados {len(arquivos)} arquivos no repositório")
    
    # Filtrar apenas arquivos .zip
    arquivos_zip = [arquivo for arquivo in arquivos if arquivo['name'].endswith('.zip')]
    print(f"📦 Encontrados {len(arquivos_zip)} arquivos ZIP para ingestão")
    
    # Obter lista de arquivos já existentes na camada bronze
    try:
        response_s3 = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=f"{s3_bronze}/")
        arquivos_existentes = []
        if 'Contents' in response_s3:
            arquivos_existentes = [obj['Key'].replace(f"{s3_bronze}/", "") for obj in response_s3['Contents'] if obj['Key'].endswith('.csv')]
        print(f"📁 Encontrados {len(arquivos_existentes)} arquivos já existentes na camada bronze")
    except Exception as e:
        print(f"⚠️ Erro ao verificar arquivos existentes: {e}")
        arquivos_existentes = []
    
    bases_ingeridas = []
    bases_ignoradas = []
    
    for arquivo in arquivos_zip:
        nome_arquivo = arquivo['name']
        download_url = arquivo['download_url']
        
        try:
            print(f"🔄 Processando arquivo: {nome_arquivo}")
            
            # Baixar o arquivo ZIP
            zip_response = requests.get(download_url)
            zip_response.raise_for_status()
            
            # Extrair arquivos CSV do ZIP
            with zipfile.ZipFile(io.BytesIO(zip_response.content), 'r') as zip_file:
                # Listar arquivos no ZIP
                arquivos_no_zip = zip_file.namelist()
                arquivos_csv_no_zip = [f for f in arquivos_no_zip if f.endswith('.csv')]
                
                print(f"📊 Encontrados {len(arquivos_csv_no_zip)} arquivos CSV no ZIP")
                
                for arquivo_csv in arquivos_csv_no_zip:
                    nome_arquivo_csv = os.path.basename(arquivo_csv)
                    
                    # Verificar se o arquivo já existe na camada bronze
                    if nome_arquivo_csv in arquivos_existentes:
                        print(f"⏭️ {nome_arquivo_csv} já existe na camada bronze - ignorando")
                        bases_ignoradas.append(nome_arquivo_csv)
                        continue
                    
                    # Extrair conteúdo do arquivo CSV
                    conteudo_csv = zip_file.read(arquivo_csv)
                    
                    # Fazer upload para S3 bronze
                    s3_key = f"{s3_bronze}/{nome_arquivo_csv}"
                    
                    s3_client.put_object(
                        Bucket=s3_bucket,
                        Key=s3_key,
                        Body=conteudo_csv,
                        ContentType='text/csv'
                    )
                    
                    # Verificar se o arquivo foi carregado com sucesso
                    try:
                        s3_client.head_object(Bucket=s3_bucket, Key=s3_key)
                        print(f"✅ {nome_arquivo_csv} extraído e ingerido com sucesso na camada bronze")
                        bases_ingeridas.append(nome_arquivo_csv)
                    except ClientError:
                        print(f"❌ Falha na verificação do arquivo {nome_arquivo_csv}")
                        
        except Exception as e:
            print(f"❌ Erro ao processar {nome_arquivo}: {e}")
    
    # Mostrar resumo final
    print("\n" + "="*50)
    print("📊 RESUMO DA INGESTÃO:")
    print(f"✅ Bases ingeridas: {len(bases_ingeridas)}")
    for base in bases_ingeridas:
        print(f"  📁 {base}")
    
    if bases_ignoradas:
        print(f"⏭️ Bases ignoradas (já existentes): {len(bases_ignoradas)}")
        for base in bases_ignoradas:
            print(f"  📁 {base}")
    print("="*50)
    
except Exception as e:
    print(f"❌ Erro geral na ingestão: {e}")


🔄 Obtendo lista de arquivos do repositório GitHub...
✅ Encontrados 5 arquivos no repositório
📦 Encontrados 4 arquivos ZIP para ingestão
📁 Encontrados 4 arquivos já existentes na camada bronze
🔄 Processando arquivo: PNAD_COVID_082020.zip
📊 Encontrados 1 arquivos CSV no ZIP
⏭️ PNAD_COVID_082020.csv já existe na camada bronze - ignorando
🔄 Processando arquivo: PNAD_COVID_092020.zip
📊 Encontrados 1 arquivos CSV no ZIP
⏭️ PNAD_COVID_092020.csv já existe na camada bronze - ignorando
🔄 Processando arquivo: PNAD_COVID_102020.zip
📊 Encontrados 1 arquivos CSV no ZIP
⏭️ PNAD_COVID_102020.csv já existe na camada bronze - ignorando
🔄 Processando arquivo: PNAD_COVID_112020.zip
📊 Encontrados 1 arquivos CSV no ZIP
⏭️ PNAD_COVID_112020.csv já existe na camada bronze - ignorando

📊 RESUMO DA INGESTÃO:
✅ Bases ingeridas: 0
⏭️ Bases ignoradas (já existentes): 4
  📁 PNAD_COVID_082020.csv
  📁 PNAD_COVID_092020.csv
  📁 PNAD_COVID_102020.csv
  📁 PNAD_COVID_112020.csv


## Teste Conexão DB

In [9]:
# Criar engine de conexão usando as variáveis do .env
engine = create_engine(f"postgresql+psycopg2://{usuario_pg}:{senha_pg}@{host_pg}:{porta_pg}/{banco_pg}")

In [10]:
def test_connection(engine):
    try:
        with engine.connect() as connection:
            # Testa a versão do PostgreSQL
            result = connection.execute(text("SELECT version();"))
            versao = result.fetchone()
            print("✅ Conectado com sucesso:", versao[0])

            # Lista as tabelas no schema público
            result = connection.execute(text("""
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = 'public';
            """))
            tabelas = result.fetchall()
            print("📄 Tabelas no banco:")
            for tabela in tabelas:
                print("  -", tabela[0])

    except Exception as e:
        print("❌ Erro ao executar comandos:", e)

In [11]:
test_connection(engine)

✅ Conectado com sucesso: PostgreSQL 17.4 on aarch64-unknown-linux-gnu, compiled by gcc (GCC) 12.4.0, 64-bit
📄 Tabelas no banco:
  - pnad_covid_092020
  - pnad_covid_102020
  - pnad_covid_112020


In [20]:
# Criar conexão psycopg2 pura a partir do engine SQLAlchemy
conn = engine.raw_connection()

In [12]:
with engine.connect() as connection:
    # Verificar se o banco de dados já existe
    result = connection.execute(text("""
        SELECT 1 FROM pg_database WHERE datname = 'postechcovid';
    """))
    
    if result.fetchone():
        print("ℹ️ Banco de dados 'postechcovid' já existe")
    else:
        connection.execute(text("COMMIT"))  # Necessário para criar banco fora de transação
        connection.execute(text("CREATE DATABASE postechcovid;"))
        print("✅ Banco de dados 'postechcovid' criado com sucesso!")

ℹ️ Banco de dados 'postechcovid' já existe


### Criar as tabelas no Banco de Dados


In [13]:
sql_files = [
    r"E:\Documentos\Pós Tech\Fase 3 - Big Data\Tech Challenge\SQL\postgres_bronze_092020.sql",
    r"E:\Documentos\Pós Tech\Fase 3 - Big Data\Tech Challenge\SQL\postgres_bronze_102020.sql",
    r"E:\Documentos\Pós Tech\Fase 3 - Big Data\Tech Challenge\SQL\postgres_bronze_112020.sql",
]

# Mapear arquivos SQL para seus nomes de tabelas correspondentes
table_names = {
    "postgres_bronze_092020.sql": "pnad_covid_092020",
    "postgres_bronze_102020.sql": "pnad_covid_102020", 
    "postgres_bronze_112020.sql": "pnad_covid_112020"
}

# Executar cada arquivo SQL com validação
with engine.begin() as conn:
    for file_path in sql_files:
        file_name = file_path.split('\\')[-1]  # Extrair nome do arquivo
        table_name = table_names.get(file_name)
        
        if table_name:
            # Verificar se a tabela já existe
            result = conn.execute(text("""
                SELECT EXISTS (
                    SELECT FROM information_schema.tables 
                    WHERE table_schema = 'public' 
                    AND table_name = :table_name
                );
            """), {"table_name": table_name})
            
            table_exists = result.scalar()
            
            if table_exists:
                print(f"ℹ️ Tabela '{table_name}' já existe - pulando criação")
                continue
        
        # Executar o arquivo SQL para criar a tabela
        with open(file_path, "r", encoding="utf-8") as file:
            sql_content = file.read()
            conn.execute(text(sql_content))
            print(f"✅ Tabela criada: {file_name}")

ℹ️ Tabela 'pnad_covid_092020' já existe - pulando criação
ℹ️ Tabela 'pnad_covid_102020' já existe - pulando criação
ℹ️ Tabela 'pnad_covid_112020' já existe - pulando criação


In [21]:
# Ingestão otimizada: COPY direto do CSV do S3 para o PostgreSQL usando conexão já existente
import io

if 'conn' not in globals():
    raise RuntimeError("A conexão 'conn' com o banco de dados precisa estar criada antes de rodar esta célula.")

print("🔄 Iniciando ingestão otimizada via COPY (CSV -> PostgreSQL)...")

mapeamento_csv_tabela = {
    'PNAD_COVID_092020.csv': 'pnad_covid_092020',
    'PNAD_COVID_102020.csv': 'pnad_covid_102020',
    'PNAD_COVID_112020.csv': 'pnad_covid_112020'
}

for arquivo_csv, nome_tabela in mapeamento_csv_tabela.items():
    try:
        print(f"\n📊 Processando {arquivo_csv} -> {nome_tabela}...")
        s3_key_csv = f"{s3_bronze}/{arquivo_csv}"
        # Baixar CSV do S3 para memória
        csv_obj = s3_client.get_object(Bucket=s3_bucket, Key=s3_key_csv)
        csv_bytes = csv_obj['Body'].read()
        csv_buffer = io.BytesIO(csv_bytes)

        # Usar conexão já existente (conn)
        cur = conn.cursor()
        cur.execute(f"TRUNCATE TABLE {nome_tabela};")
        conn.commit()
        cur.copy_expert(f"COPY {nome_tabela} FROM STDIN WITH (FORMAT CSV, HEADER TRUE, DELIMITER ',')", csv_buffer)
        conn.commit()
        print(f"✅ Dados inseridos em {nome_tabela} via COPY!")
        cur.close()
    except Exception as e:
        print(f"❌ Erro ao processar {arquivo_csv} via COPY: {e}")

print("\n" + "="*50)
print("📊 RESUMO DA CARGA OTIMIZADA NO POSTGRESQL (COPY):")
with engine.connect() as conn_check:
    for arquivo_csv, nome_tabela in mapeamento_csv_tabela.items():
        try:
            result = conn_check.execute(text(f"SELECT COUNT(*) FROM {nome_tabela};"))
            count = result.scalar()
            print(f"📋 {nome_tabela}: {count:,} registros")
        except Exception as e:
            print(f"❌ Erro ao verificar {nome_tabela}: {e}")
print("="*50)


🔄 Iniciando ingestão otimizada via COPY (CSV -> PostgreSQL)...

📊 Processando PNAD_COVID_092020.csv -> pnad_covid_092020...
✅ Dados inseridos em pnad_covid_092020 via COPY!

📊 Processando PNAD_COVID_102020.csv -> pnad_covid_102020...
✅ Dados inseridos em pnad_covid_092020 via COPY!

📊 Processando PNAD_COVID_102020.csv -> pnad_covid_102020...
✅ Dados inseridos em pnad_covid_102020 via COPY!

📊 Processando PNAD_COVID_112020.csv -> pnad_covid_112020...
✅ Dados inseridos em pnad_covid_102020 via COPY!

📊 Processando PNAD_COVID_112020.csv -> pnad_covid_112020...
✅ Dados inseridos em pnad_covid_112020 via COPY!

📊 RESUMO DA CARGA OTIMIZADA NO POSTGRESQL (COPY):
✅ Dados inseridos em pnad_covid_112020 via COPY!

📊 RESUMO DA CARGA OTIMIZADA NO POSTGRESQL (COPY):
📋 pnad_covid_092020: 387,298 registros
📋 pnad_covid_092020: 387,298 registros
📋 pnad_covid_102020: 380,461 registros
📋 pnad_covid_102020: 380,461 registros
📋 pnad_covid_112020: 381,438 registros
📋 pnad_covid_112020: 381,438 registros


O método COPY do PostgreSQL é uma das formas mais rápidas e eficientes de inserir grandes volumes de dados em uma tabela. Ele permite importar dados diretamente de um arquivo (ou stream) no formato CSV, TXT, etc., para dentro do banco, minimizando a sobrecarga de transações e parsing linha a linha.

No seu notebook, a ingestão está sendo realizada assim:

Para cada arquivo CSV no S3, o arquivo é baixado para a memória.
Uma conexão nativa psycopg2 é usada para acessar métodos avançados do PostgreSQL.
Antes de inserir, a tabela de destino é truncada (limpa).
O método cur.copy_expert executa o comando COPY, lendo o CSV diretamente da memória e inserindo em lote na tabela, com suporte a cabeçalho e delimitador.
Após cada carga, um resumo é impresso mostrando o total de registros em cada tabela.
Esse processo garante ingestão rápida, eficiente e segura, aproveitando o máximo desempenho do PostgreSQL para cargas de dados em massa.