# Data Ingestion: S3 → Supabase (PostgreSQL)

Este notebook faz parte de um projeto de engenharia de dados que simula um pipeline de ingestão batch
para um e-commerce.

## Objetivo
- Ler dados armazenados no S3 (Supabase Storage)
- Suportar múltiplos formatos (CSV e Parquet)
- Carregar os dados em um banco PostgreSQL (Supabase)
- Garantir um padrão de ingestão reutilizável

## Datasets
- clientes.csv
- produtos.csv
- vendas.csv
- preco_competidores.parquet

Este notebook foca **exclusivamente na ingestão dos dados**.
Transformações e modelagem analítica serão tratadas em etapas posteriores (dbt).


In [None]:
# =========================
# Instalação de dependências (Colab)
# =========================

!pip install boto3 sqlalchemy pyarrow


In [4]:
# =========================
# Setup & Imports
# =========================

import os
import io

import pandas as pd
import boto3
from sqlalchemy import create_engine


In [5]:
# =========================
# Configurações
# =========================

# S3 / Supabase Storage
S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL")
S3_REGION = os.getenv("S3_REGION", "us-east-1")
S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
S3_SECRET_KEY = os.getenv("S3_SECRET_KEY")
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")

# Banco de Dados (Supabase - PostgreSQL)
DATABASE_URL = os.getenv("DATABASE_URL")


In [6]:
# =========================
# Conexões
# =========================

# Client S3 (Supabase Storage via S3 API)
s3_client = boto3.client(
    "s3",
    endpoint_url=S3_ENDPOINT_URL,
    region_name=S3_REGION,
    aws_access_key_id=S3_ACCESS_KEY,
    aws_secret_access_key=S3_SECRET_KEY,
)

# Engine PostgreSQL (Supabase)
engine = create_engine(DATABASE_URL)


In [7]:
# =========================
# Funções de Ingestão
# =========================

def read_csv_from_s3(bucket: str, key: str) -> pd.DataFrame:
    """
    Lê um arquivo CSV do S3 e retorna um DataFrame.
    """
    response = s3_client.get_object(Bucket=bucket, Key=key)
    return pd.read_csv(response["Body"])


def read_parquet_from_s3(bucket: str, key: str) -> pd.DataFrame:
    """
    Lê um arquivo Parquet do S3 e retorna um DataFrame.
    """
    response = s3_client.get_object(Bucket=bucket, Key=key)
    return pd.read_parquet(io.BytesIO(response["Body"].read()))


def write_to_postgres(df: pd.DataFrame, table_name: str):
    """
    Grava um DataFrame no PostgreSQL (Supabase).
    """
    df.to_sql(
        name=table_name,
        con=engine,
        if_exists="replace",
        index=False
    )


In [8]:
# =========================
# Ingestão dos Datasets
# =========================

datasets = [
    {
        "type": "csv",
        "key": "clientes.csv",
        "table": "clientes"
    },
    {
        "type": "csv",
        "key": "produtos.csv",
        "table": "produtos"
    },
    {
        "type": "csv",
        "key": "vendas.csv",
        "table": "vendas"
    },
    {
        "type": "parquet",
        "key": "preco_competidores.parquet",
        "table": "preco_competidores"
    },
]

for dataset in datasets:
    if dataset["type"] == "csv":
        df = read_csv_from_s3(S3_BUCKET_NAME, dataset["key"])
    else:
        df = read_parquet_from_s3(S3_BUCKET_NAME, dataset["key"])

    write_to_postgres(df, dataset["table"])

    print(f"{dataset['table']} carregada ({df.shape[0]} linhas)")


clientes carregada (50 linhas)
produtos carregada (215 linhas)
vendas carregada (3020 linhas)
preco_competidores carregada (728 linhas)


In [9]:
# =========================
# Validação da Ingestão
# =========================

tables = ["clientes", "produtos", "vendas", "preco_competidores"]

for table in tables:
    df_check = pd.read_sql(f"SELECT COUNT(*) AS total FROM {table}", engine)
    total = df_check.loc[0, "total"]
    print(f"Tabela '{table}': {total} registros")


Tabela 'clientes': 50 registros
Tabela 'produtos': 215 registros
Tabela 'vendas': 3020 registros
Tabela 'preco_competidores': 728 registros
