## Import libs

In [1]:
import requests
import zipfile
import io
import pandas as pd
import json 
import os
import boto3
import sys
import time 
import duckdb
import datetime
import uuid
import requests
import pandas as pd
from io import BytesIO
from zipfile import ZipFile


from dotenv import load_dotenv
from pyathena import connect
from sqlalchemy import create_engine, text
from botocore.exceptions import BotoCoreError, ClientError
from psycopg2 import sql
from pyspark.sql import SparkSession
from io import BytesIO

pd.set_option('display.max_columns', None) 


## Fun√ß√µes

In [2]:
# Testar a conex√£o ao banco de dados
def test_connection(engine):

    try:
        with engine.connect() as connection:
            
            # Testar a vers√£o do PostgreSQL
            result = connection.execute(text("SELECT version();"))
            versao = result.fetchone()
            print("‚úÖ Conectado com sucesso:", versao[0])

            # Listar as tabelas no schema p√∫blico
            result = connection.execute(text("""
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = 'public';
            """))
            tabelas = result.fetchall()
            print("üìÑ Tabelas no banco:")
            for tabela in tabelas:
                print("-", tabela[0])

    except Exception as e:
        print("‚ùå Erro ao executar comandos:", e)
        sys.exit()

In [3]:
# Fun√ß√£o para upload Parquet para S3
def upload_parquet_to_s3(df: pd.DataFrame, layer_name: str, file_name: str = None):
    bio = BytesIO()
    df.to_parquet(bio, index=False, engine='pyarrow', compression='snappy')
    bio.seek(0)

    if not file_name:
        ts = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
        uid = uuid.uuid4().hex[:8]
        file_name = f"pnad_raw_{ts}_{uid}.parquet"

    # Construa a chave S3 corretamente
    s3_key = f"{layer_name}/{file_name}"
    s3_client.upload_fileobj(bio, s3_bucket, s3_key)
    print(f"‚úÖ RAW: s3://{s3_bucket}/{s3_key}")

In [4]:
# L√™ todos os arquivos Parquet de um prefixo S3 e concatena em um DataFrame.
def read_parquet_from_s3(bucket, prefix):
    all_dfs = []
    try:
        response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
        if 'Contents' not in response:
            print(f"‚ùå Nenhum arquivo encontrado em s3://{bucket}/{prefix}")
            return pd.DataFrame()

        for obj in response['Contents']:
            # Pula se o objeto for uma pasta (termina com '/')
            if obj['Key'].endswith('/'):
                continue

            print(f"Lendo arquivo: s3://{bucket}/{obj['Key']}")
            obj_data = s3_client.get_object(Bucket=bucket, Key=obj['Key'])
            df = pd.read_parquet(BytesIO(obj_data['Body'].read()))
            all_dfs.append(df)

        if all_dfs:
            return pd.concat(all_dfs, ignore_index=True)
        else:
            return pd.DataFrame()

    except Exception as e:
        print(f"‚ùå Erro ao ler arquivos do S3: {e}")
        return pd.DataFrame()



In [35]:
# Salva um DataFrame em um arquivo Parquet e faz o upload para uma camada S3.
def upload_to_s3_layer(df: pd.DataFrame, layer_name: str, file_name: str = None):
    bio = BytesIO()
    df.to_parquet(bio, index=False, engine='pyarrow', compression='snappy')
    bio.seek(0)

    if not file_name:
        ts = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
        uid = uuid.uuid4().hex[:8]
        file_name = f"pnad_{layer_name}_{ts}_{uid}.parquet"

    s3_key = f"{layer_name}/{file_name}"
    s3_client.upload_fileobj(bio, s3_bucket, s3_key)
    print(f"‚úÖ Salvo na camada {layer_name}: s3://{s3_bucket}/{s3_key}")

In [5]:
# Cria banco de dados no glue
def create_glue_database(database_name, description=None, location_uri=None):
    try:
        # Verifica se o banco j√° existe
        existing_dbs = glue_client.get_databases()
        db_names = [db['Name'] for db in existing_dbs['DatabaseList']]

        if database_name in db_names:
            print(f"‚ö†Ô∏è O banco de dados '{database_name}' j√° existe no Glue.")
            return

        # Cria o banco
        params = {
            'DatabaseInput': {
                'Name': database_name,
                'Description': description or f'Banco {database_name} criado via script Python.'
            }
        }

        if location_uri:
            params['DatabaseInput']['LocationUri'] = location_uri

        glue_client.create_database(**params)
        print(f"‚úÖ Banco de dados '{database_name}' criado com sucesso!\n")

    except ClientError as e:
        print(f"‚ùå Erro ao criar banco de dados: {e}")

In [33]:
# Fun√ßao para uploard de arquivos via glue crawler
def create_or_update_crawler(crawler_name, role_arn, database_name, s3_target_path):
    try:
        # Verifica se o crawler j√° existe
        existing_crawlers = glue_client.get_crawlers()
        crawler_names = [c['Name'] for c in existing_crawlers['Crawlers']]

        if crawler_name in crawler_names:
            print(f"‚ö†Ô∏è O crawler '{crawler_name}' j√° existe. Atualizando...")
            glue_client.update_crawler(
                Name=crawler_name,
                Role=role_arn,
                DatabaseName=database_name,
                Targets={'S3Targets': [{'Path': s3_target_path}]}
            )
            print(f"‚úÖ Crawler '{crawler_name}' atualizado com sucesso!\n")
        else:
            print(f"üöÄ Criando crawler '{crawler_name}'...")
            glue_client.create_crawler(
                Name=crawler_name,
                Role=role_arn,
                DatabaseName=database_name,
                Description=f"Crawler para detectar tabelas na camada Gold em {s3_target_path}",
                Targets={'S3Targets': [{'Path': s3_target_path}]},
                TablePrefix="",
                SchemaChangePolicy={
                    'UpdateBehavior': 'UPDATE_IN_DATABASE',
                    'DeleteBehavior': 'LOG'
                }
            )
            print(f"‚úÖ Crawler '{crawler_name}' criado com sucesso!\n")

    except ClientError as e:
        print(f"‚ùå Erro ao criar/atualizar crawler: {e}")

In [34]:
# Cria job de execu√ß√£o do crawler
def run_crawler_and_wait(crawler_name):
    try:
        print(f"üöÄ Iniciando execu√ß√£o do crawler '{crawler_name}'...")
        glue_client.start_crawler(Name=crawler_name)

        while True:
            status = glue_client.get_crawler(Name=crawler_name)['Crawler']['State']
            if status == 'READY':
                print(f"‚úÖ Crawler '{crawler_name}' finalizado com sucesso!\n")
                break
            else:
                print("‚è≥ Crawler em execu√ß√£o...")
                time.sleep(180)

    except ClientError as e:
        print(f"‚ùå Erro ao executar o crawler: {e}")

## Configura√ß√£o AWS

In [None]:
# Carregar as credencias do .env
load_dotenv()

# Configura√ß√£o storage_options
storage_options = {
    "key": os.getenv('aws_access_key_id'),
    "secret": os.getenv('aws_secret_access_key'),
    "token": os.getenv('aws_session_token')
}

# S3
s3_client = boto3.client(
    's3',
    aws_access_key_id=os.getenv('aws_access_key_id'),
    aws_secret_access_key=os.getenv('aws_secret_access_key'),
    aws_session_token=os.getenv('aws_session_token'),
    region_name=os.getenv('region')
)

#glue
glue_client = boto3.client(
    'glue',
    aws_access_key_id=os.getenv('aws_access_key_id'),
    aws_secret_access_key=os.getenv('aws_secret_access_key'),
    aws_session_token=os.getenv('aws_session_token'),
    region_name=os.getenv('region')
)

# PostgreSQL
usuario_pg = os.getenv("POSTGRES_USER_PNAD")
senha_pg = os.getenv("POSTGRES_PASSWORD_PNAD")
host_pg = os.getenv("POSTGRES_HOST_PNAD")
porta_pg = os.getenv("POSTGRES_PORT_PNAD")
banco_pg = os.getenv("POSTGRES_DB_PNAD")


# Cria os clientes para os servi√ßos Glue e Athena
iam_client = boto3.client('iam', region_name=os.getenv('region'))
#glue_client = boto3.client('glue', region_name=os.getenv('region'))
athena_client = boto3.client('athena', region_name=os.getenv('region'))
role_arn = 'arn:aws:iam::992382762426:role/LabRole'


In [16]:
# ------------------- Configura√ß√µes S3 e camadas -------------------
s3_bucket = 'fiaptechchallengefase3'

# Camadas
s3_raw = 'raw'
s3_bronze = 'bronze'
s3_silver = 'silver'
s3_gold = 'gold'

# Prefixos S3
raw_prefix = f"s3://{s3_bucket}/{s3_raw}/"
bronze_prefix = f"s3://{s3_bucket}/{s3_bronze}/"
silver_prefix = f"s3://{s3_bucket}/{s3_silver}/"
gold_prefix = f"s3://{s3_bucket}/{s3_gold}/"

# Nome arquivos Parquet
nome_arquivo_gold = 'pnad_final.parquet'
caminho_saida_silver = silver_prefix + 'pnad.parquet'
caminho_entrada_silver = silver_prefix + 'pnad.parquet'
caminho_saida_gold = gold_prefix + nome_arquivo_gold
caminho_completo_gold = gold_prefix + nome_arquivo_gold

# ------------------- Configura√ß√µes RDS -------------------
nome_tabela_inicial = "pnad_covid"
nome_tabela_questionario = 'questionario_covid'
nome_tabela_codigo_uf = 'codigo_uf'

# ------------------- Configura√ß√µes de pipeline -------------------
CHUNKSIZE = 100000  # N√∫mero de linhas por chunk para leitura do RDS


# ------------------- Configura√ß√µes Glue e Athena -------------------
database_name = 'db_fiap_challenge_glue'
iam_role_name = 'FiapTechChallengeGlueRole'
crawler_name = 'pnad_covid_crawler'

## Valida√ß√£o de conex√£o

In [38]:
# Valida√ß√£o conex√£o com a AWS atrav√©s do .env
load_dotenv()

try:
    sts_client = boto3.client(
        'sts',
        aws_access_key_id=os.getenv('aws_access_key_id'),
        aws_secret_access_key=os.getenv('aws_secret_access_key'),
        aws_session_token=os.getenv('aws_session_token'),
        region_name=os.getenv('region')
    )
    
    identity = sts_client.get_caller_identity()
    print("‚úÖ Conectado √† conta\n")
    print("UserId:", identity["UserId"])
    print("Account:", identity["Account"])
    print("Arn Completo:", identity["Arn"])


except (BotoCoreError, ClientError) as e:
    print("‚ùå Erro ao conectar √† AWS. Verifique suas credenciais e tente novamente.")
    print("Detalhes do erro:", e)

‚úÖ Conectado √† conta

UserId: AROA6ODU7HW5AICBR3YUC:user4401657=geovanaferreira47@gmail.com
Account: 992382762426
Arn Completo: arn:aws:sts::992382762426:assumed-role/voclabs/user4401657=geovanaferreira47@gmail.com


In [39]:
# Criar engine com banco 
engine = create_engine(f"postgresql+psycopg2://{usuario_pg}:{senha_pg}@{host_pg}:{porta_pg}/{banco_pg}")

# Testar a conex√£o
test_connection(engine)

‚úÖ Conectado com sucesso: PostgreSQL 17.4 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 12.4.0, 64-bit
üìÑ Tabelas no banco:
- questionario_covid
- codigo_uf
- pnad_covid


## Leitura de arquivos do git

In [None]:

# URL da API que lista os arquivos da pasta
api_url = "https://api.github.com/repos/geoferreira1/fiap_tech_challenge_fase_3_novo/contents/covid/microdados"

# 1Ô∏è‚É£ Pegar o JSON da pasta
response = requests.get(api_url)
if response.status_code != 200:
    raise Exception(f"Erro ao acessar a API: {response.status_code}")

arquivos_json = response.json()

# 2Ô∏è‚É£ Inicializar lista para armazenar todos os DataFrames
dfs = []
colunas_comuns = None  # para armazenar as colunas que existem em todos os CSVs

# 3Ô∏è‚É£ Iterar sobre cada arquivo listado
for arquivo in arquivos_json:
    if arquivo['name'].endswith('.zip'):
        zip_url = arquivo['download_url']
        print(f"Baixando: {arquivo['name']}")

        # Baixar o arquivo ZIP em mem√≥ria
        r = requests.get(zip_url)
        if r.status_code != 200:
            print(f"‚ùå Erro ao baixar {arquivo['name']}")
            continue
        
        zip_bytes = BytesIO(r.content)
        
        # Abrir ZIP e ler todos os CSVs dentro
        with ZipFile(zip_bytes) as zip_file:
            for csv_name in zip_file.namelist():
                if csv_name.endswith('.csv'):
                    with zip_file.open(csv_name) as f:
                        df = pd.read_csv(f)
                        
                        # Atualizar colunas comuns
                        #if colunas_comuns is None:
                        #    colunas_comuns = set(df.columns)
                        #else:
                        #    colunas_comuns &= set(df.columns)  # interse√ß√£o das colunas
                        
                        dfs.append(df)
                        print(f"‚úÖ Lido {csv_name} do ZIP {arquivo['name']}")

# 4Ô∏è‚É£ Filtrar apenas as colunas comuns antes de concatenar
#dfs_filtrados = [df[list(colunas_comuns)] for df in dfs]

# 5Ô∏è‚É£ Concatenar todos os DataFrames em um √∫nico
df_completo = pd.concat(dfs, ignore_index=True)

# 6Ô∏è‚É£ Mostrar as primeiras linhas
df_completo.head()

# 7Ô∏è‚É£ Filtra os √∫ltimos 3 meses
df_final1 = df_completo[df_completo['V1013'].isin([9, 8, 7])]


Baixando: PNAD_COVID_052020.zip
‚úÖ Lido PNAD_COVID_052020.csv do ZIP PNAD_COVID_052020.zip
Baixando: PNAD_COVID_062020.zip
‚úÖ Lido PNAD_COVID_062020.csv do ZIP PNAD_COVID_062020.zip
Baixando: PNAD_COVID_072020.zip
‚úÖ Lido PNAD_COVID_072020.csv do ZIP PNAD_COVID_072020.zip
Baixando: PNAD_COVID_082020.zip
‚úÖ Lido PNAD_COVID_082020.csv do ZIP PNAD_COVID_082020.zip
Baixando: PNAD_COVID_092020.zip
‚úÖ Lido PNAD_COVID_092020.csv do ZIP PNAD_COVID_092020.zip
Baixando: PNAD_COVID_102020.zip
‚úÖ Lido PNAD_COVID_102020.csv do ZIP PNAD_COVID_102020.zip
Baixando: PNAD_COVID_112020.zip
‚úÖ Lido PNAD_COVID_112020.csv do ZIP PNAD_COVID_112020.zip


In [13]:

# Caminho do Github com dados do c√≥digo IBGE UF
link_codigo_uf = 'https://raw.githubusercontent.com/geoferreira1/fiap_tech_challenge_fase_3/refs/heads/main/covid/codigo_uf.csv'

# Ler arquivos CSV Codigo IBGE e gerar Data Frame
print(f"\nLendo tabela com os c√≥digos do IBGE de: {link_codigo_uf}")
df_uf = pd.read_csv(link_codigo_uf, sep=",")
print("Data Frame criado com sucesso")


Lendo tabela com os c√≥digos do IBGE de: https://raw.githubusercontent.com/geoferreira1/fiap_tech_challenge_fase_3/refs/heads/main/covid/codigo_uf.csv
Data Frame criado com sucesso


## Cria pastas no s3

In [14]:
# Criar Bucket e subpastas

# Lista com camadas no s3
camadas = [s3_raw, s3_bronze, s3_silver, s3_gold]

# Armazenar a regi√£o da conex√£o com a AWS
aws_region = s3_client.meta.region_name

print(f"\nIniciando Processo de Valida√ß√£o e Cria√ß√£o no S3")

print(f"\nValidando o bucket '{s3_bucket}'")
bucket_pronto = False

# Validar se o Bucket esta criado
try:
    s3_client.head_bucket(Bucket=s3_bucket)
    print(f"‚û°Ô∏è  Bucket '{s3_bucket}' j√° existe")
    bucket_pronto = True

except ClientError as e:
    
    if e.response['Error']['Code'] == '404':
        print(f"Bucket '{s3_bucket}' n√£o encontrado. Tentando criar")

        try:
            if aws_region == "us-east-1":
                s3_client.create_bucket(Bucket=s3_bucket)

            else:
                location = {'LocationConstraint': aws_region}
                s3_client.create_bucket(
                    Bucket=s3_bucket,
                    CreateBucketConfiguration=location
                )
            print(f"‚úÖ Bucket '{s3_bucket}' criado com sucesso!")
            bucket_pronto = True

        except Exception as create_e:
            print(f"‚ùå Falha ao tentar criar o bucket: {create_e}")

    else:
        print(f"‚ùå Erro de permiss√£o ou outro problema ao verificar o bucket: {e}")

# Validar se as subpastas est√£o criadas
if bucket_pronto:
    print(f"\nValidando as subpastas no bucket '{s3_bucket}'")

    for nome_pasta in camadas:
        chave_pasta = nome_pasta if nome_pasta.endswith('/') else nome_pasta + '/'
        
        try:
            s3_client.head_object(Bucket=s3_bucket, Key=chave_pasta)
            print(f"‚û°Ô∏è Pasta '{chave_pasta}' j√° existe")

        except ClientError as e:

            if e.response['Error']['Code'] == '404':
                try:
                    s3_client.put_object(
                        Bucket=s3_bucket,
                        Key=chave_pasta,
                        Body=''
                    )
                    print(f"‚úÖ Pasta '{chave_pasta}' criada com sucesso")

                except Exception as create_e:
                    print(f"‚ùå Falha ao TENTAR CRIAR a pasta '{chave_pasta}': {create_e}")

            else:
                print(f"‚ùå Erro ao verificar a pasta '{chave_pasta}': {e}")
else:
    print("\nCria√ß√£o das pastas abortada, pois houve um problema com o bucket")

print("\nProcesso Finalizado")


Iniciando Processo de Valida√ß√£o e Cria√ß√£o no S3

Validando o bucket 'fiaptechchallengefase3'
‚û°Ô∏è  Bucket 'fiaptechchallengefase3' j√° existe

Validando as subpastas no bucket 'fiaptechchallengefase3'
‚û°Ô∏è Pasta 'raw/' j√° existe
‚û°Ô∏è Pasta 'bronze/' j√° existe
‚û°Ô∏è Pasta 'silver/' j√° existe
‚û°Ô∏è Pasta 'gold/' j√° existe

Processo Finalizado


## Insere dados no RDS

In [25]:
# Inserir no RDS (se a tabela j√° existir, substitui)
df_uf.to_sql(
    nome_tabela_codigo_uf,
    con=engine,
    if_exists='replace',  # 'replace' = substitui a tabela se j√° existir
    index=False
)

print(f"‚úÖ DataFrame inserido com sucesso na tabela '{nome_tabela_codigo_uf}' do RDS!")


‚úÖ DataFrame inserido com sucesso na tabela 'codigo_uf' do RDS!


In [39]:
# Inserir no RDS (se a tabela j√° existir, substitui)
df_final.to_sql(
    nome_tabela_inicial,
    con=engine,
    if_exists='replace',  # 'replace' = substitui a tabela se j√° existir
    index=False
)

print(f"‚úÖ DataFrame inserido com sucesso na tabela '{nome_tabela_inicial}' do RDS!")


‚úÖ DataFrame inserido com sucesso na tabela 'pnad_covid' do RDS!


## Importa arquivos do RDS para s3 (Bronze -> Silver -> Gold)

In [28]:
sql = f"SELECT * FROM {nome_tabela_inicial}"
chunk_iter = pd.read_sql_query(text(sql), engine, chunksize=CHUNKSIZE)
for idx, chunk in enumerate(chunk_iter, start=1):
    print(f"üì¶ Upload chunk #{idx} com {len(chunk)} linhas")
    upload_parquet_to_s3(chunk, s3_raw) # Passe a vari√°vel `s3_raw` que cont√©m a string "raw"

üì¶ Upload chunk #1 com 100000 linhas


  ts = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")


‚úÖ RAW: s3://fiaptechchallengefase3/raw/pnad_raw_20251004T012038Z_39a1b874.parquet
üì¶ Upload chunk #2 com 100000 linhas
‚úÖ RAW: s3://fiaptechchallengefase3/raw/pnad_raw_20251004T012042Z_85d1257f.parquet
üì¶ Upload chunk #3 com 100000 linhas
‚úÖ RAW: s3://fiaptechchallengefase3/raw/pnad_raw_20251004T012045Z_1ab76f0c.parquet
üì¶ Upload chunk #4 com 100000 linhas
‚úÖ RAW: s3://fiaptechchallengefase3/raw/pnad_raw_20251004T012048Z_9c824e44.parquet
üì¶ Upload chunk #5 com 100000 linhas
‚úÖ RAW: s3://fiaptechchallengefase3/raw/pnad_raw_20251004T012052Z_238e9b1c.parquet
üì¶ Upload chunk #6 com 100000 linhas
‚úÖ RAW: s3://fiaptechchallengefase3/raw/pnad_raw_20251004T012055Z_a330b021.parquet
üì¶ Upload chunk #7 com 100000 linhas
‚úÖ RAW: s3://fiaptechchallengefase3/raw/pnad_raw_20251004T012057Z_c23f875d.parquet
üì¶ Upload chunk #8 com 100000 linhas
‚úÖ RAW: s3://fiaptechchallengefase3/raw/pnad_raw_20251004T012100Z_a0099620.parquet
üì¶ Upload chunk #9 com 100000 linhas
‚úÖ RAW: s3://fi

In [None]:
# --- Execu√ß√£o do Script ---
print("Iniciando a transforma√ß√£o da camada Raw para a camada Bronze...")

# 1. L√™ os dados da camada Raw
df_raw = read_parquet_from_s3(s3_bucket, s3_raw)

if not df_raw.empty:
    print("Dados brutos lidos. Iniciando a limpeza para a camada Bronze...")

    # 2. Realiza a transforma√ß√£o e limpeza (Bronze)
    # Exemplo de limpeza: Remover linhas duplicadas
    df_bronze = df_raw.drop_duplicates()
    print(f"‚úÖ Removidas {len(df_raw) - len(df_bronze)} linhas duplicadas.")

    # Exemplo de tratamento de valores nulos (substituindo NaN por -1)
    df_bronze = df_bronze.fillna(-1)
    print("‚úÖ Valores nulos tratados.")

    # Exemplo de convers√£o de tipos (garantindo que V1008 seja int)
    df_bronze['V1008'] = pd.to_numeric(df_bronze['V1008'], errors='coerce').astype('Int64')
    print("‚úÖ Tipos de dados padronizados.")


    # 3. Salva os dados processados na camada Bronze
    upload_to_s3_layer(df_bronze, s3_bronze)
    print("‚úÖ Processo de transforma√ß√£o de Raw para Bronze conclu√≠do.")
else:
    print("‚ùå Nenhum dado para processar. Verifique a camada Raw.")

Iniciando a transforma√ß√£o da camada Raw para a camada Bronze...
Lendo arquivo: s3://fiaptechchallengefase3/raw/pnad_raw_20251004T012038Z_39a1b874.parquet
Lendo arquivo: s3://fiaptechchallengefase3/raw/pnad_raw_20251004T012042Z_85d1257f.parquet
Lendo arquivo: s3://fiaptechchallengefase3/raw/pnad_raw_20251004T012045Z_1ab76f0c.parquet
Lendo arquivo: s3://fiaptechchallengefase3/raw/pnad_raw_20251004T012048Z_9c824e44.parquet
Lendo arquivo: s3://fiaptechchallengefase3/raw/pnad_raw_20251004T012052Z_238e9b1c.parquet
Lendo arquivo: s3://fiaptechchallengefase3/raw/pnad_raw_20251004T012055Z_a330b021.parquet
Lendo arquivo: s3://fiaptechchallengefase3/raw/pnad_raw_20251004T012057Z_c23f875d.parquet
Lendo arquivo: s3://fiaptechchallengefase3/raw/pnad_raw_20251004T012100Z_a0099620.parquet
Lendo arquivo: s3://fiaptechchallengefase3/raw/pnad_raw_20251004T012103Z_e59aebab.parquet
Lendo arquivo: s3://fiaptechchallengefase3/raw/pnad_raw_20251004T012106Z_5091ae07.parquet
Lendo arquivo: s3://fiaptechchalle

  return pd.concat(all_dfs, ignore_index=True)


Dados brutos lidos. Iniciando a limpeza para a camada Bronze...
‚úÖ Removidas 0 linhas duplicadas.
‚úÖ Valores nulos tratados.
‚úÖ Tipos de dados padronizados.


  ts = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")


‚úÖ Salvo na camada bronze: s3://fiaptechchallengefase3/bronze/pnad_bronze_20251004T012157Z_6e1de45b.parquet
‚úÖ Processo de transforma√ß√£o de Raw para Bronze conclu√≠do.


In [15]:
# --- Execu√ß√£o do Script ---
print("Iniciando a transforma√ß√£o da camada Bronze para a camada Silver...")

# 1. L√™ os dados da camada Bronze
df_bronze = read_parquet_from_s3(s3_bucket, s3_bronze)

if not df_bronze.empty:
    print("Dados brutos lidos. Iniciando os filtros para a camada Silver...")

    # 2. Realiza a filtros (Silver)
    # Relacionar os Data Frame
    print("\nIniciando enriquecimento dos dados com merge com o Data Frame dicionadio de uf")
    df_silver = pd.merge(
        df_bronze,
        df_uf,
        how='left',
        left_on='UF', \
        right_on='C√≥digo'
    )

    # Remover colunas n√£o necessarias e ajustar o nome das colunas
    df_silver = df_silver.drop(columns=["C√≥digo"])
    df_silver = df_silver.rename(columns={"UF_x": "UF", "UF_y": "Estado", "Regi√£o": "Regiao"})
    print(f"‚úÖ Enriquecimento feito com sucesso!")

    # Copiar e padronizar colunas para min√∫sculas
    df_silver_normalizado = df_silver.copy()
    df_silver_normalizado.columns = df_silver_normalizado.columns.str.lower()
    print(f"‚úÖ Normaliza√ß√£o das colunas feita com sucesso!")

    # Manter colunas:
    colunas_manter = [
        'ano','v1013','v1012','uf','capital','rm_ride','estado','sigla','regiao','a002','a003','a004','a005',
        'a006','b002','b005','b006','b007','b008','b009b','b009d','b009f','b0011','b0012','b0013','b0014','b0015',
        'b0016','b0017','b0018','b0019','b00110','b00111','b00112','b00113','b0101','b0102','b0103','b0104','b0105','b0106','c007b','f001','b011','d0051', 'c01011',
    ]

    df_silver_normalizado = df_silver_normalizado[colunas_manter]
    print(f"‚úÖ Sele√ß√£o de colunas feito com sucesso!")

    # 3. Salva os dados processados na camada silver
    upload_to_s3_layer(df_silver_normalizado, s3_silver)
    print("‚úÖ Processo de transforma√ß√£o de Bronze para Silver conclu√≠do.")
else:
    print("‚ùå Nenhum dado para processar. Verifique a camada Silver.")

Iniciando a transforma√ß√£o da camada Bronze para a camada Silver...
Lendo arquivo: s3://fiaptechchallengefase3/bronze/pnad_bronze_20251004T012157Z_6e1de45b.parquet
Dados brutos lidos. Iniciando os filtros para a camada Silver...

Iniciando enriquecimento dos dados com merge com o Data Frame dicionadio de uf
‚úÖ Enriquecimento feito com sucesso!
‚úÖ Normaliza√ß√£o das colunas feita com sucesso!
‚úÖ Sele√ß√£o de colunas feito com sucesso!


  ts = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")


‚úÖ Salvo na camada silver: s3://fiaptechchallengefase3/silver/pnad_silver_20251005T170937Z_88538373.parquet
‚úÖ Processo de transforma√ß√£o de Bronze para Silver conclu√≠do.


In [16]:
# --- Execu√ß√£o do Script ---
print("Iniciando a transforma√ß√£o da camada Silver para a camada Gold...")

# 1. L√™ os dados da camada Silver
df_silver = read_parquet_from_s3(s3_bucket, s3_silver)

if not df_silver.empty:
    print("Iniciando o ETL para a camada Gold...")

    # 2. Faz as tradu√ß√µes de c√≥digos para texto e renomeia as colunas
    
    # Dicion√°rios de mapeamento
    mapeamento_sexo = {1: 'Masculino', 2: 'Feminino'}
    mapeamento_raca_cor = {1: 'Branca', 2: 'Preta', 3: 'Amarela', 4: 'Parda', 5: 'Ind√≠gena'}
    mapeamento_escolaridade = {1: 'Sem instru√ß√£o', 2: 'Ensino Fundamental incompleto', 3: 'Ensino Fundamental completo', 4: 'Ensino M√©dio incompleto', 5: 'Ensino M√©dio completo', 6: 'Ensino Superior incompleto', 7: 'Ensino Superior completo', 8: 'P√≥s gradua√ß√£o, mestrado ou doutorado'}
    mapeamento_tipo_instituicao = {1: 'P√∫blica', 2: 'Privada'}
    mapeamento_sim_nao_ignorado = {1: 'Sim', 2: 'N√£o'}
    mapeamento_teste_resultado = {1: 'Positivo', 2: 'Negativo', 3: 'Inconclusivo', 4: 'Aguardando resultado'}
    mapeamento_trabalha_atualmente = {1: 'Sim, carteira assinada', 2: 'Sim, servidor p√∫blico', 3: 'N√£o'}
    mapeamento_moradia = {1: 'Pr√≥pria', 2: 'Pr√≥pria', 3: 'Aluguel', 4: 'Cedido', 5: 'Cedido', 6: 'Cedido'}
    mapeamento_isolamento = {1: 'N√£o fez restri√ß√£o, levou vida normal como antes da pandemia', 2: 'Reduziu o contato com as pessoas, mas continuou saindo de casa para trabalho ou atividades n√£o essenciais e/ou recebendo visitas', 3: 'Ficou em casa e s√≥ saiu em caso de necessidade b√°sica', 4: 'Ficou rigorosamente em casa'}
    mapeamento_auxilio_social = {1: 'Sim', 2: 'N√£o'}
    mapeamento_faixa_salarial = {0:'0-100', 1: '101 - 300', 2: '301 - 600', 3: '601 - 800', 4: '801 - 1.600', 5: '1.601 - 3.000', 6: '3.001 - 10.000', 7:'Acima de 10.000',8:'Acima de 10.000',9:'Acima de 10.000'}
    print("‚úÖ Processo de mapeamento de dicion√°rios conclu√≠do.")

    # Dicion√°rio de renomea√ß√£o de colunas
    col_mapping = {
        'v1013': 'mes_pesquisa', 'v1012': 'semana_mes',
        'a002': 'idade', 'a003': 'sexo', 'a004': 'cor', 'a005': 'escolaridade', 
        'b002': 'buscou_auxilio_medico', 'b005': 'precisou_de_internacao', 'b006': 'precisou_de_sedacao',
        'b007': 'plano_de_saude', 'b008': 'realizou_teste_covid', 'b009b': 'resultado_teste_swab',
        'b009d': 'resultado_teste_dedo', 'b009f': 'resultado_teste_veia', 'b0011': 'frebre_semana_anterior',
        'b0012': 'tosse_semana_anterior', 'b0013': 'dor_de_garganta_semana_anterior',
        'b0014': 'dificuldade_de_respirar_semana_anterior', 'b0015': 'dor_de_cabeca_semana_anterior',
        'b0016': 'dor_no_peito_semana_anterior', 'b0017': 'nausea_semana_anterior',
        'b0018': 'nariz_constipado_semana_anterior', 'b0019': 'fadiga_semana_anterior',
        'b00110': 'dor_nos_olhos_semana_anterior', 'b00111': 'perda_olfato_paladar_semana_anterior',
        'b00112': 'dor_muscular_semana_anterior', 'b00113': 'diarreia_semana_anterior',
        'b0101': 'diabetes', 'b0102': 'hipertensao', 'b0103': 'doenca_respiratoria',
        'b0104': 'doencas_cardiacas', 'b0105': 'depressao', 'b0106': 'cancer',
        'c007b': 'trabalha_atualmente', 'f001': 'modaria','b011':'isolamento_social','d0051': 'auxilio_emergencial','c01011':'faixa_salarial'
    }
    
    df_gold = df_silver.copy()
    
    # Renomear as colunas
    df_gold.rename(columns=col_mapping, inplace=True)
    print("‚úÖ Processo de renomear as colunas conclu√≠do.")

    # Aplicar as tradu√ß√µes
    df_gold['sexo'] = df_gold['sexo'].map(mapeamento_sexo).fillna('Desconhecido')
    df_gold['cor'] = df_gold['cor'].map(mapeamento_raca_cor).fillna('Desconhecido')
    df_gold['escolaridade'] = df_gold['escolaridade'].map(mapeamento_escolaridade).fillna('Desconhecido')
    df_gold['buscou_auxilio_medico'] = df_gold['buscou_auxilio_medico'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['precisou_de_internacao'] = df_gold['precisou_de_internacao'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['precisou_de_sedacao'] = df_gold['precisou_de_sedacao'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['plano_de_saude'] = df_gold['plano_de_saude'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['realizou_teste_covid'] = df_gold['realizou_teste_covid'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['resultado_teste_swab'] = df_gold['resultado_teste_swab'].map(mapeamento_teste_resultado).fillna('Desconhecido')
    df_gold['resultado_teste_dedo'] = df_gold['resultado_teste_dedo'].map(mapeamento_teste_resultado).fillna('Desconhecido')
    df_gold['resultado_teste_veia'] = df_gold['resultado_teste_veia'].map(mapeamento_teste_resultado).fillna('Desconhecido')
    df_gold['frebre_semana_anterior'] = df_gold['frebre_semana_anterior'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['tosse_semana_anterior'] = df_gold['tosse_semana_anterior'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['dor_de_garganta_semana_anterior'] = df_gold['dor_de_garganta_semana_anterior'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['dificuldade_de_respirar_semana_anterior'] = df_gold['dificuldade_de_respirar_semana_anterior'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['dor_de_cabeca_semana_anterior'] = df_gold['dor_de_cabeca_semana_anterior'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['dor_no_peito_semana_anterior'] = df_gold['dor_no_peito_semana_anterior'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['nausea_semana_anterior'] = df_gold['nausea_semana_anterior'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['nariz_constipado_semana_anterior'] = df_gold['nariz_constipado_semana_anterior'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['fadiga_semana_anterior'] = df_gold['fadiga_semana_anterior'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['dor_nos_olhos_semana_anterior'] = df_gold['dor_nos_olhos_semana_anterior'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['perda_olfato_paladar_semana_anterior'] = df_gold['perda_olfato_paladar_semana_anterior'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['dor_muscular_semana_anterior'] = df_gold['dor_muscular_semana_anterior'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['diarreia_semana_anterior'] = df_gold['diarreia_semana_anterior'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['diabetes'] = df_gold['diabetes'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['hipertensao'] = df_gold['hipertensao'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['doenca_respiratoria'] = df_gold['doenca_respiratoria'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['doencas_cardiacas'] = df_gold['doencas_cardiacas'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['depressao'] = df_gold['depressao'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['cancer'] = df_gold['cancer'].map(mapeamento_sim_nao_ignorado).fillna('Desconhecido')
    df_gold['trabalha_atualmente'] = df_gold['trabalha_atualmente'].map(mapeamento_trabalha_atualmente).fillna('Desconhecido')
    df_gold['isolamento_social'] = df_gold['isolamento_social'].map(mapeamento_isolamento).fillna('Desconhecido')
    df_gold['auxilio_emergencial'] = df_gold['auxilio_emergencial'].map(mapeamento_auxilio_social).fillna('Desconhecido')
    df_gold['faixa_salarial'] = df_gold['faixa_salarial'].map(mapeamento_faixa_salarial).fillna('Desconhecido')
    print("‚úÖ Processo de classificar as colunas conclu√≠do.")

    # 3. Salva os dados processados na camada silver
    upload_to_s3_layer(df_gold, s3_gold)
    print("‚úÖ Processo de transforma√ß√£o de Silver para Gold conclu√≠do.")
else:
    print("‚ùå Nenhum dado para processar. Verifique a camada Silver.")



Iniciando a transforma√ß√£o da camada Silver para a camada Gold...
Lendo arquivo: s3://fiaptechchallengefase3/silver/pnad_silver_20251005T170937Z_88538373.parquet
Iniciando o ETL para a camada Gold...
‚úÖ Processo de mapeamento de dicion√°rios conclu√≠do.
‚úÖ Processo de renomear as colunas conclu√≠do.
‚úÖ Processo de classificar as colunas conclu√≠do.


  ts = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")


‚úÖ Salvo na camada gold: s3://fiaptechchallengefase3/gold/pnad_gold_20251005T171304Z_e289285d.parquet
‚úÖ Processo de transforma√ß√£o de Silver para Gold conclu√≠do.


## Importa tabela da Gold para banco (An√°lise dos dados por RDS)

In [40]:
# Inserir no RDS (se a tabela j√° existir, substitui)

# 1. L√™ os dados da camada Silver
df_gold = read_parquet_from_s3(s3_bucket, s3_gold)

df_gold.to_sql(
    nome_tabela_questionario,
    con=engine,
    if_exists='replace',  # 'replace' = substitui a tabela se j√° existir
    index=False
)

print(f"‚úÖ DataFrame inserido com sucesso na tabela '{nome_tabela_questionario}' do RDS!")


Lendo arquivo: s3://fiaptechchallengefase3/gold/pnad_gold_20251005T171304Z_e289285d.parquet
‚úÖ DataFrame inserido com sucesso na tabela 'questionario_covid' do RDS!


## Importa tabela da Gold para Glue (An√°lise dos dados por Athena)

In [25]:
db_name = database_name
db_description = "Este √© um banco de dados para o Athena consumir os dados da gold do s3."
create_glue_database(db_name, db_description)

‚úÖ Banco de dados 'db_fiap_challenge_glue' criado com sucesso!



In [36]:
# Cria ou atualiza o crawler
create_or_update_crawler(
    crawler_name=crawler_name,
    role_arn=role_arn,
    database_name=database_name,
    s3_target_path=gold_prefix
)
# Executa o crawler e aguarda finaliza√ß√£o
run_crawler_and_wait(crawler_name)

üöÄ Criando crawler 'pnad_covid_crawler'...
‚úÖ Crawler 'pnad_covid_crawler' criado com sucesso!

üöÄ Iniciando execu√ß√£o do crawler 'pnad_covid_crawler'...
‚è≥ Crawler em execu√ß√£o...
‚úÖ Crawler 'pnad_covid_crawler' finalizado com sucesso!

