<a href="https://colab.research.google.com/github/Dan2912/projeto_fort_tech/blob/main/pepiline_s3_bronze.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Colab IP

In [None]:
import requests
print("Colab IP:", requests.get('https://ifconfig.me').text)


Colab IP: 35.221.229.245


## Installing the libraries

In [None]:
%pip install boto3 psycopg2-binary pandas pyarrow s3fs


Collecting boto3
  Downloading boto3-1.38.41-py3-none-any.whl.metadata (6.6 kB)
Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Collecting s3fs
  Downloading s3fs-2025.5.1-py3-none-any.whl.metadata (1.9 kB)
Collecting botocore<1.39.0,>=1.38.41 (from boto3)
  Downloading botocore-1.38.41-py3-none-any.whl.metadata (5.7 kB)
Collecting jmespath<2.0.0,>=0.7.1 (from boto3)
  Downloading jmespath-1.0.1-py3-none-any.whl.metadata (7.6 kB)
Collecting s3transfer<0.14.0,>=0.13.0 (from boto3)
  Downloading s3transfer-0.13.0-py3-none-any.whl.metadata (1.7 kB)
Collecting aiobotocore<3.0.0,>=2.5.4 (from s3fs)
  Downloading aiobotocore-2.23.0-py3-none-any.whl.metadata (24 kB)
Collecting fsspec==2025.5.1 (from s3fs)
  Downloading fsspec-2025.5.1-py3-none-any.whl.metadata (11 kB)
Collecting aioitertools<1.0.0,>=0.5.1 (from aiobotocore<3.0.0,>=2.5.4->s3fs)
  Downloading aioitertools-0.12.0-py3-none-any.whl.metadat

## Code (RDS + reading the Parquet file from S3)

In [None]:
import pandas as pd
import psycopg2
import boto3
import s3fs
from getpass import getpass
from datetime import date

# === RDS ACCESS ===
host = input("RDS host: ")
database = input("Database name: ")
user = input("RDS user: ")
password = getpass("RDS password: ")

# Connect to RDS
conn = psycopg2.connect(
    host=host,
    database=database,
    user=user,
    password=password,
    port='5432'
)

# Read the vendas table (the whole table here; the query could be narrowed to the last few days).
# pandas warns that raw DBAPI connections other than sqlite3 are untested, but the query still runs.
query = "SELECT * FROM vendas"
try:
    df_rds = pd.read_sql(query, conn)
finally:
    conn.close()  # release the connection even if the read fails

# === S3 ACCESS ===
aws_access_key = getpass("AWS Access Key ID: ")
aws_secret_key = getpass("AWS Secret Access Key: ")
bucket = input("S3 bucket: ").strip()
caminho_parquet = "bronze/base_vendas/carga_inicial/vendas_01.parquet"

# Create the S3 filesystem with s3fs
fs = s3fs.S3FileSystem(
    key=aws_access_key,
    secret=aws_secret_key
)

# Read the Parquet file directly from S3
df_s3 = pd.read_parquet(f"s3://{bucket}/{caminho_parquet}", filesystem=fs)

print(f"🔎 Records in RDS: {len(df_rds)}")
print(f"📦 Records in the S3 Parquet file: {len(df_s3)}")


RDS host: fort-tech.c6l8w0a8uczb.us-east-1.rds.amazonaws.com
Database name: postgres
RDS user: postgres
RDS password: ··········


  df_rds = pd.read_sql(query, conn)


AWS Access Key ID: ··········
AWS Secret Access Key: ··········
S3 bucket: data-lake-eletro-fort
🔎 Records in RDS: 100
📦 Records in the S3 Parquet file: 50

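The comment on the query notes that the read can cover the whole table or only the last few days. Here is a minimal sketch of the second option. It assumes a hypothetical date column named `data_venda`, since the notebook never shows the schema of `vendas`. An in-memory SQLite table stands in for RDS so the example runs anywhere. With psycopg2 the placeholder would be `%s` instead of `?`.

```python
import sqlite3
from datetime import date, timedelta


def cutoff_date(today: date, days_back: int) -> str:
    """Return the ISO date that lies `days_back` days before `today`."""
    return (today - timedelta(days=days_back)).isoformat()


# Stand-in for the RDS table; `data_venda` is a hypothetical column name.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE vendas (id_venda INTEGER PRIMARY KEY, data_venda TEXT)")
conn.executemany(
    "INSERT INTO vendas VALUES (?, ?)",
    [(1, "2025-06-01"), (2, "2025-06-19"), (3, "2025-06-21")],
)

# Parameterized query: the cutoff travels as a bound value, not as formatted SQL.
cutoff = cutoff_date(date(2025, 6, 21), days_back=3)
recent_rows = conn.execute(
    "SELECT id_venda, data_venda FROM vendas WHERE data_venda >= ? ORDER BY id_venda",
    (cutoff,),
).fetchall()
conn.close()

print(recent_rows)
```

Narrowing the read this way keeps the transfer small as the table grows, at the cost of missing rows that were inserted late with an older date.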

## 🔍 Step 3: Compare the id_venda values and keep only the new ones

In [None]:
# Collect the IDs that already exist in the Parquet file
ids_existentes = set(df_s3['id_venda'])

# Keep only the RDS rows whose id_venda is not in the Parquet file yet
df_novos = df_rds[~df_rds['id_venda'].isin(ids_existentes)]

print(f"🆕 New records found: {len(df_novos)}")


🆕 New records found: 50

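The filter in this step is an anti-join on `id_venda`. Here is a small sketch of the same pattern on toy data, so the behaviour is easy to check by hand. The `_demo` names and the `valor` column are hypothetical and exist only for illustration.

```python
import pandas as pd

# Toy stand-ins for df_s3 (already in the lake) and df_rds (source table).
df_s3_demo = pd.DataFrame({"id_venda": [1, 2, 3]})
df_rds_demo = pd.DataFrame(
    {"id_venda": [1, 2, 3, 4, 5], "valor": [10.0, 20.0, 30.0, 40.0, 50.0]}
)

# Same pattern as the cell above: keep rows whose id is absent from the Parquet side.
ids_existentes_demo = set(df_s3_demo["id_venda"])
df_novos_demo = df_rds_demo[~df_rds_demo["id_venda"].isin(ids_existentes_demo)]

print(df_novos_demo["id_venda"].tolist())
```

The comparison only works if `id_venda` has the same dtype on both sides. If one side holds strings and the other integers, `isin` matches nothing and every row is treated as new.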

## 💾 Step 4: Save the new data to S3

In [None]:
if not df_novos.empty:
    hoje = date.today().isoformat()
    novo_arquivo = f"bronze/base_vendas/data={hoje}/vendas_{hoje}.parquet"

    df_novos.to_parquet(
        f"s3://{bucket}/{novo_arquivo}",
        index=False,
        engine='pyarrow',
        filesystem=fs
    )
    print(f"✅ New data saved to: s3://{bucket}/{novo_arquivo}")
else:
    print("✅ No new records to save.")


✅ New data saved to: s3://data-lake-eletro-fort/bronze/base_vendas/data=2025-06-21/vendas_2025-06-21.parquet

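The object key written in this step uses one `data=<date>` folder per load date. Here is a minimal sketch of building such a key and recovering the date from it. The helper names `partition_key` and `partition_date` are hypothetical and not part of the notebook.

```python
from datetime import date


def partition_key(prefix: str, day: date) -> str:
    """Build a key shaped like <prefix>/data=<ISO date>/vendas_<ISO date>.parquet."""
    iso = day.isoformat()
    return f"{prefix}/data={iso}/vendas_{iso}.parquet"


def partition_date(key: str) -> date:
    """Recover the load date from the `data=` segment of a key."""
    for segment in key.split("/"):
        if segment.startswith("data="):
            return date.fromisoformat(segment[len("data="):])
    raise ValueError(f"no data= segment in {key!r}")


key = partition_key("bronze/base_vendas", date(2025, 6, 21))
print(key)
print(partition_date(key))
```

Because the key depends only on the date, a second run on the same day writes to the same key and replaces the earlier object.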