## Importações do Pipeline Iceberg

Neste bloco importamos todas as dependências necessárias para o pipeline:

- **Pandas** para manipulação de dados  
- **PyIceberg (Schemas e Tipos)** para definição do schema Iceberg  
- **Módulos locais** responsáveis por:
  - Configuração do ambiente Iceberg e S3 (`config_setup`)
  - Conexão com MinIO e catálogo Iceberg (`s3_iceberg_connections`)
  - Leitura e transformação dos dados RAW (`data_processor`)
  - Criação e gerenciamento de tabelas Iceberg (`iceberg_table_manager`)
  - Escrita final no formato Iceberg (`iceberg_writer`)

Abaixo estão todos os imports necessários para a execução completa do pipeline.


In [1]:
import pandas as pd
from pyiceberg.types import StringType, LongType, DoubleType, TimestampType, NestedField

from config_setup import configure_iceberg
from s3_iceberg_connections import setup_connections
from data_processor import read_raw_csv_from_s3, apply_data_cleaning_and_typing
from iceberg_table_manager import create_iceberg_schema, manage_table_lifecycle
from iceberg_writer import write_dataframe_to_iceberg

## Configuração do Pipeline

Nesta etapa:

- Definimos os **buckets** de origem (`RAW_BUCKET`) e destino (`ICEBERG_BUCKET`)  
- Definimos prefixo do CSV (`CSV_PREFIX`), banco (`DB_NAME`) e tabela (`TABLE_NAME`)  
- Criamos o **schema da tabela** (`pedido_fields`) para o Iceberg  
- Configuramos o **catálogo Iceberg** (`catalog_properties`) e as conexões S3/Hive (`s3_client`, `catalog`)  
- Garantimos que a tabela exista ou seja criada no bucket de destino (`ICEBERG_BUCKET`)  

Após essa etapa, o ambiente está pronto para ler dados da RAW e escrever no Iceberg.


In [2]:
RAW_BUCKET = "raw"
ICEBERG_BUCKET = "pyiceberg"

CSV_PREFIX = "dataset.csv"
DB_NAME = "sales"
TABLE_NAME = "pedidos"

In [3]:
pedido_fields = [
    NestedField(1, "ID_Pedido", StringType(), required=False),
    NestedField(2, "Data_Pedido", TimestampType(), required=False),
    NestedField(3, "ID_Cliente", StringType(), required=False),
    NestedField(4, "Segmento", StringType(), required=False),
    NestedField(5, "Regiao", StringType(), required=False),
    NestedField(6, "Pais", StringType(), required=False),
    NestedField(7, "Product_ID", StringType(), required=False),
    NestedField(8, "Categoria", StringType(), required=False),
    NestedField(9, "SubCategoria", StringType(), required=False),
    NestedField(10, "Total_Vendas", DoubleType(), required=False),
    NestedField(11, "Quantidade", LongType(), required=False),
    NestedField(12, "Desconto", DoubleType(), required=False),
    NestedField(13, "Lucro", DoubleType(), required=False),
    NestedField(14, "Prioridade", StringType(), required=False),
]

In [4]:
catalog_properties = configure_iceberg()
s3_client, catalog = setup_connections(catalog_properties)
print("Conexões e ambiente configurados.")

Conexões e ambiente configurados.


In [5]:
iceberg_schema = create_iceberg_schema(pedido_fields)

table = manage_table_lifecycle(
    catalog,
    DB_NAME,
    TABLE_NAME,
    ICEBERG_BUCKET,    
    iceberg_schema
)

Tabela ('sales', 'pedidos') antiga deletada.


Unable to resolve region for bucket pyiceberg


Tabela criada com o novo schema em: <bound method Table.location of pedidos(
  1: ID_Pedido: optional string,
  2: Data_Pedido: optional timestamp,
  3: ID_Cliente: optional string,
  4: Segmento: optional string,
  5: Regiao: optional string,
  6: Pais: optional string,
  7: Product_ID: optional string,
  8: Categoria: optional string,
  9: SubCategoria: optional string,
  10: Total_Vendas: optional double,
  11: Quantidade: optional long,
  12: Desconto: optional double,
  13: Lucro: optional double,
  14: Prioridade: optional string
),
partition by: [],
sort order: [],
snapshot: null>


## Leitura e Processamento dos Dados

Nesta etapa:

- Lemos os dados brutos do bucket `RAW_BUCKET` usando a função `read_raw_csv_from_s3`  
- Aplicamos a função de limpeza e conversão de tipos `clean_and_cast_pedido_data` usando `apply_data_cleaning_and_typing`  
- Corrigimos nomes de colunas, formatos numéricos e datas para compatibilidade com o schema Iceberg  
- O DataFrame final `df_final` está pronto para ser escrito no Iceberg


In [9]:
def clean_and_cast_pedido_data(df: pd.DataFrame) -> pd.DataFrame:
    df = df.rename(columns={'Product ID': 'Product_ID'})
    
    cols_numericas = ["Total_Vendas", "Desconto", "Lucro"]
    for col in cols_numericas:
        df[col] = df[col].astype(str).str.replace(",", ".", regex=False).astype(float)

    df["Quantidade"] = (
        df["Quantidade"]
        .astype(str).str.replace(",", ".", regex=False)
        .astype(float).astype('int64')
    )
    
    df["Data_Pedido"] = (
        pd.to_datetime(df["Data_Pedido"], format="%d-%m-%Y")
        .astype('datetime64[us]')
    )
    
    return df

In [7]:
df_raw = read_raw_csv_from_s3(s3_client, RAW_BUCKET, CSV_PREFIX)
df_final = apply_data_cleaning_and_typing(df_raw, clean_and_cast_pedido_data)
print(f"DataFrame processado. Linhas: {len(df_final)}")

DataFrame processado. Linhas: 51290


In [8]:
write_dataframe_to_iceberg(table, df_final, catalog_properties)

Dados escritos e commitados com sucesso no Iceberg!
