In [1]:
# importando libs
import pandas as pd
from sqlalchemy import create_engine, text
import psycopg2
import boto3
from io import StringIO
import os
from botocore.exceptions import ClientError
import datetime

In [13]:
df = pd.read_excel("Data\pnad_covid19_202011_saude_BR_GR_UF.xlsx")

In [14]:
df.head(20)

Unnamed: 0.1,Unnamed: 0,Unnamed: 1,Unnamed: 2,Unnamed: 3,Unnamed: 4,Unnamed: 5,Unnamed: 6,Unnamed: 7,Unnamed: 8,Unnamed: 9,Unnamed: 10,Unnamed: 11,Unnamed: 12,Unnamed: 13,Unnamed: 14,Unnamed: 15,Unnamed: 16,Unnamed: 17,Unnamed: 18,Unnamed: 19
0,PNAD Covid,,,,,,,,,,,,,,,,,,,
1,Periodicidade da divulgação: Mensal,,,,,,,,,,,,,,,,,,,
2,Nota 1: os indicadores de saúde são para as pe...,,,,,,,,,,,,,,,,,,,
3,Nota 2: os valores totais incluem as pessoas q...,,,,,,,,,,,,,,,,,,,
4,Nota 3: Considera-se que apresentou sintomas c...,,,,,,,,,,,,,,,,,,,
5,Nota 4: Inclusive pessoas de cor amarela ou in...,,,,,,,,,,,,,,,,,,,
6,"Nota 5: O sintoma ""Diarreia"" não foi considera...",,,,,,,,,,,,,,,,,,,
7,Indicador,Nível Territorial,Abertura Territorial,Variável de abertura 1,Categoria de abertura 1,Variável de abertura 2,Categoria de abertura 2,Maio,Junho,Situação Maio para Junho,Julho,Situação Junho para Julho,Agosto,Situação Julho para Agosto,Setembro,Situação Agosto para Setembro,Outubro,Situação Setembro para Outubro,Novembro,Situação Outubro para Novembro
8,,,,,,,,,,,,,,,,,,,,
9,Pessoas que apresentaram algum dos sintoma(s) ...,País,Brasil,Sexo,Total,-,-,24012.308055,15506.473518,Queda,13793.338769,Queda,12135.650022,Queda,9237.127768,Queda,7810.718422,Queda,7956.19569,Estável


In [None]:
# postgreSQL
pg_config = {
  "host": "bd-relacional.cjeco8miq291.us-east-1.rds.amazonaws.com",
  "database":"postgres",
  "user": os.getenv("POSTGRES_USER"),
  "password": os.getenv("POSTGRES_PASS"),
  "port":5432
}


# AWS S3
bucket_name = "323317787666"
s3_prefix = "raw/"

# nome das tabelas
tabelas = ['dim_cliente', 'dim_produto', 'fato_pedidos']

In [None]:
# conexão com o postgress
conn = psycopg2.connect(**pg_config)

In [None]:
# conexão com S3
s3 = boto3.client('s3')
region = s3.meta.region_name or "us-east-1"

In [None]:
# verifica se já existe o bucket criado, se não, realiza a criação
try:
  s3.head_bucket(Bucket=bucket_name)
  print(f"Bucket {bucket_name} já existe")
except ClientError as e:
  error_code = int(e.response['Error']['Code'])
  if error_code == 404:
    print(f"Bucket {bucket_name} não existe, criando...")
    if region == "us-east-1":
      s3.create_bucket(Bucket=bucket_name)
    else:
      s3.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={'LocationConstraint': region}
      )
    print(f"Bucket {bucket_name} criado com sucesso")
  else:
    raise


In [None]:
# exportação das tabelas
for tabela in tabelas:
  print(f"Exportando tabela: {tabela}")
  df = pd.read_sql(f"SELECT * FROM {tabela};", conn)

  # salvar como csv em memória
  csv_buffer = StringIO()
  df.to_csv(csv_buffer, index=False)

  # Enviar para S3
  s3_key = f"{s3_prefix}{tabela}.csv"
  s3.put_object(Bucket=bucket_name, Key=s3_key, Body=csv_buffer.getvalue())
  print(f"{tabela} salva no S3 em: s3://{bucket_name}/{s3_key}")

# fecha conexão 
conn.close
print("Exportação finalizada")

In [None]:
RAW_PREFIX = f's3://{bucket_name}/raw/'
SILVER_PREFIX = f's3://{bucket_name}/silver/' 
GOLD_PREFIX = f's3://{bucket_name}/gold' 

In [None]:
# Camada Silver
clientes = pd.read_csv(RAW_PREFIX + 'dim_cliente.csv')
produto = pd.read_csv(RAW_PREFIX + 'dim_produto.csv')
pedidos = pd.read_csv(RAW_PREFIX + 'fato_pedidos.csv')

In [None]:
# padronizar colunas para lower case
for df in [clientes, produto, pedidos]:
  df.columns = [col.lower() for col in df.columns]

In [None]:
# limpeza básica
clientes = clientes.drop_duplicates().dropna(subset=['id_cliente'])
produto = produto.drop_duplicates().dropna(subset=['id_produto'])
pedidos = pedidos.drop_duplicates().dropna(subset=['id_cliente', 'id_produto'])

In [None]:
# padronização das colunas datas
if 'data_pedido' in pedidos.columns:
  pedidos['data_pedido'] = pd.to_datetime(pedidos['data_pedido'], errors='coerce')

In [None]:
# salvar camada silver em parquet no S3
clientes.to_parquet(SILVER_PREFIX + 'clientes/', index=False)
produto.to_parquet(SILVER_PREFIX + 'produto/', index=False)
pedidos.to_parquet(SILVER_PREFIX + 'pedidos/', index=False)