In [188]:
import os
from contextlib import closing
from datetime import datetime, timezone
from io import BytesIO

import boto3
import polars as pl
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType

# -------------------
# connection settings
# -------------------
# Read from the environment so credentials are not hard-coded in the
# notebook; the defaults are the values previously inlined here. Defined once
# so the Iceberg FileIO and the boto3 client can never drift apart.
ICEBERG_REST_URI = os.environ.get("ICEBERG_REST_URI", "http://rest:8181")
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin")

# -------------------
# catalog (FileIO pointed at MinIO)
# -------------------
catalog = load_catalog(
    "dev",
    uri=ICEBERG_REST_URI,
    **{
        "s3.endpoint": MINIO_ENDPOINT,
        "s3.access-key-id": MINIO_ACCESS_KEY,
        "s3.secret-access-key": MINIO_SECRET_KEY,
        "s3.path-style-access": "true",
    },
)

# -------------------
# s3 / minio (used to download raw files)
# -------------------
s3 = boto3.client(
    "s3",
    endpoint_url=MINIO_ENDPOINT,
    aws_access_key_id=MINIO_ACCESS_KEY,
    aws_secret_access_key=MINIO_SECRET_KEY,
)


In [183]:
# Helper para criação de tabelas
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.transforms import IdentityTransform
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField
from pyiceberg.io.pyarrow import pyarrow_to_schema


def build_partition_spec(schema, partition_cols):
    """
    Build an identity-transform PartitionSpec for ``partition_cols``.

    Parameters
    ----------
    schema : pyiceberg.schema.Schema
        Iceberg schema whose field IDs the partition fields will reference.
        A ``pyarrow.Schema`` is not accepted when partitioning is requested:
        it has no ``find_field`` and carries no Iceberg field IDs.
    partition_cols : list[str] | None
        Column names to partition by, in order. Empty or None means
        unpartitioned (any ``schema`` is accepted in that case).

    Returns
    -------
    PartitionSpec
        One identity partition field per column, named after the column.

    Raises
    ------
    TypeError
        If partitioning is requested and ``schema`` is not an Iceberg schema.
    ValueError
        If a column in ``partition_cols`` does not exist in ``schema``.
    """
    # The spec's field class is PartitionField; PartitionSpec has no `.Field`.
    from pyiceberg.partitioning import PartitionField

    # no partitioning requested
    if not partition_cols:
        return PartitionSpec()

    if not hasattr(schema, "find_field"):
        raise TypeError(
            "build_partition_spec requires a pyiceberg Schema (with field IDs); "
            f"got {type(schema).__name__}"
        )

    fields = []

    # Partition field IDs conventionally start at 1000 in Iceberg.
    for partition_field_id, col in enumerate(partition_cols, start=1000):
        # pyiceberg's Schema.find_field raises ValueError for unknown names;
        # the None check also covers any schema type that returns None instead.
        try:
            source_field = schema.find_field(col)
        except ValueError:
            source_field = None

        if source_field is None:
            raise ValueError(f"Coluna '{col}' não existe no schema")

        fields.append(
            PartitionField(
                source_id=source_field.field_id,
                field_id=partition_field_id,
                transform=IdentityTransform(),
                name=col,
            )
        )

    return PartitionSpec(*fields)

def update_schema(table, arrow_schema):
    """
    Evolve an Iceberg table's schema to include any new DataFrame columns.

    Uses ``UpdateSchema.union_by_name``: columns are matched by name and
    columns of ``arrow_schema`` missing from the table are added (see the
    pyiceberg docs for type-widening rules).

    Parameters
    ----------
    table : pyiceberg Table
        Table to check/update. The schema change is committed on exit of the
        ``update_schema()`` context manager.
    arrow_schema : pyarrow.Schema
        Schema of the DataFrame that is about to be written.

    Returns
    -------
    bool
        True if the table's current schema changed, False otherwise.
    """
    # When union_by_name has nothing to add, the current schema id is left
    # unchanged, so comparing ids tells us whether the schema actually evolved.
    schema_id_before = table.schema().schema_id

    with table.update_schema() as update:
        update.union_by_name(arrow_schema)

    changed = table.schema().schema_id != schema_id_before
    if changed:
        print("✔ schema atualizado")
    return changed


def _create_iceberg_table(catalog, table_name, arrow_schema, partition_cols):
    """Create ``table_name`` from an Arrow schema, then add identity partitions."""
    partition_cols = list(partition_cols or [])

    # Validate before creating so a bad column never leaves a half-built table.
    for col in partition_cols:
        if col not in arrow_schema.names:
            raise ValueError(f"Coluna '{col}' não existe no schema")

    print(f"⚙ criando tabela '{table_name}'")

    table = catalog.create_table(
        identifier=table_name,
        schema=arrow_schema,
        properties={
            "write.format.default": "parquet",
            "write.parquet.compression-codec": "zstd",
            # "schema.name-mapping.default" intentionally not set: it must hold
            # a JSON name mapping, and the previous value "true" is not one.
        },
    )

    # An Arrow schema has no Iceberg field IDs, so the partition spec is added
    # after creation (field IDs now exist) via partition-spec evolution.
    if partition_cols:
        with table.update_spec() as spec:
            for col in partition_cols:
                spec.add_identity(col)

    return table


def set_table(
    catalog,
    table_name: str,
    df,
    partition_cols=None,
    mode="append",
):
    """
    Create or load an Iceberg table and write a polars DataFrame into it.

    If the table exists, its schema is first evolved with ``update_schema``
    (union by column name). Otherwise it is created from the DataFrame's
    Arrow schema, identity-partitioned by ``partition_cols`` if given.

    Parameters
    ----------
    catalog : pyiceberg catalog
        Catalog used to load/create the table.
    table_name : str
        Table identifier, e.g. ``"bronze.raw_excel"``.
    df : polars.DataFrame
        Data to write. Column names are stripped of surrounding whitespace.
    partition_cols : list[str] | None
        Columns to identity-partition by. Only applied when the table is
        created here; an existing table's partition spec is left untouched.
    mode : {'append', 'overwrite', 'reset'}
        'append' adds the rows; 'overwrite' calls ``Table.overwrite``;
        'reset' deletes every row and appends ``df`` in one transaction.

    Returns
    -------
    pyiceberg Table
        The loaded or newly created table after the write.

    Raises
    ------
    ValueError
        If ``mode`` is not supported, or a partition column is not in ``df``.
    """
    from pyiceberg.exceptions import NoSuchTableError

    valid_modes = ("append", "overwrite", "reset")
    # Checked up front: an unknown mode must not create a table nor report
    # success without writing anything.
    if mode not in valid_modes:
        raise ValueError(
            f"mode inválido '{mode}'; use um de {valid_modes}"
        )

    # guarantee clean column names (stray spaces are common in Excel headers)
    df = df.rename(lambda c: c.strip())

    arrow_table = df.to_arrow()
    arrow_schema = arrow_table.schema

    # -------------------
    # load or create
    # -------------------
    # Only load_table is guarded: any other error (auth, network, schema
    # evolution) propagates instead of being mistaken for "table missing".
    try:
        table = catalog.load_table(table_name)
    except NoSuchTableError:
        table = None

    if table is not None:
        update_schema(table, arrow_schema)
        print(f"✔ tabela '{table_name}' carregada")
    else:
        table = _create_iceberg_table(
            catalog, table_name, arrow_schema, partition_cols
        )

    # -------------------
    # write
    # -------------------
    if mode == "overwrite":
        table.overwrite(arrow_table)
    elif mode == "append":
        table.append(arrow_table)
    else:  # mode == "reset"
        print("⚠ full reset da tabela")
        # delete-all + append committed together in a single transaction
        with table.transaction() as tx:
            tx.delete("true")
            tx.append(arrow_table)

    print(f"✔ dados gravados ({mode})")

    return table

In [185]:
## RAW
# Land the spreadsheet in bronze as-is: every cell as text plus lineage columns.

bucket = "raw"
key = "carros_astro/ENTRADAS E SAÍDAS ASTRO AUTOMÓVEIS.xlsx"

# download into memory, closing the S3 response stream afterwards
response = s3.get_object(Bucket=bucket, Key=key)
with closing(response["Body"]) as body:
    buffer = BytesIO(body.read())

# -------------------
# excel read
# -------------------
df = pl.read_excel(buffer, has_header=False)

# cast every column to string (tolerant RAW layer)
df = df.with_columns(pl.all().cast(pl.Utf8))

# Naive UTC ISO-8601 text, identical in format to datetime.utcnow().isoformat(),
# without using the deprecated utcnow() (Python 3.12+).
ingestion_time = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

# ingestion metadata
df = df.with_columns(
    pl.lit(key).alias("_source_file"),
    pl.lit(ingestion_time).alias("_ingestion_time"),
)

# set_table does its own Arrow conversion; no separate to_arrow() needed here
set_table(catalog, "bronze.raw_excel", df, mode="reset")


print("✔ RAW ingestão concluída")


Could not determine dtype for column 9, falling back to string


✔ schema atualizado
✔ tabela 'bronze.raw_excel' carregada
⚠ full reset da tabela
✔ dados gravados (reset)
✔ RAW ingestão concluída


In [189]:
table = catalog.load_table("bronze.raw_excel")

# NOTE(review): everything below depends on rows arriving in spreadsheet order
# (row 0 = header labels; forward_fill carries label values downward).
# Presumably true while bronze is rewritten as one snapshot with mode="reset"
# — TODO confirm if bronze ever receives appends.
df = pl.from_arrow(table.scan().to_arrow())

# Integers parsed from column_4 are treated as years. Series.unique() does not
# preserve order by default, so maintain_order=True is required for
# `anos_validos[0]` to be the first year in row order.
anos = (
    df["column_4"]
    .str.to_integer(strict=False)
    .unique(maintain_order=True)
    .to_list()
)
anos_validos = [a for a in anos if a is not None]
if not anos_validos:
    raise ValueError("nenhum ano encontrado em 'column_4' de bronze.raw_excel")
# rows above the first year label are attributed to the year before it
first_ano = anos_validos[0] - 1

df = df.with_columns(
    # last integer seen in column_4 so far; leading rows get first_ano
    ano_raw = (
        pl.col("column_4")
        .str.to_integer(strict=False)
        .forward_fill().fill_null(first_ano)
    ),
    # label rows: a value in column_4 but no sequence number in column_1
    is_label = (
        pl.col("column_4").is_not_null() &
        pl.col("column_1").is_null()
    )
)

# text of the most recent label row's column_4, carried down to the rows below
df = df.with_columns(
    mes_raw = pl.when(
        pl.col("is_label")
    ).then(pl.col('column_4')).otherwise(None)
    .forward_fill()
)

# row 0 holds the header text: map positional column_N -> stripped header label
column_labels = list(df.row(0))
columns = df.columns
mapping = { col: column_labels[idx].strip()
           for idx,col in enumerate(columns)
           if col.startswith('column') 
           and column_labels[idx] is not None}

def cast_date(column_name):
    """
    Build a polars expression parsing ``column_name`` into ``pl.Date``.

    Two encodings are tried and the first non-null result wins:

    1. a numeric day count from 1899-12-30 (the conventional Excel serial-date
       epoch), e.g. ``"45000"`` or ``"45000.0"``; any fractional part is
       truncated by the Float64 -> Int64 cast;
    2. a ``dd/mm/YYYY`` text date.

    Both conversions are non-strict, so values matching neither become null.
    The resulting expression keeps the original column name.
    """
    source = pl.col(column_name)

    whole_days = source.cast(pl.Float64, strict=False).cast(pl.Int64, strict=False)
    from_excel_serial = pl.date(1899, 12, 30) + pl.duration(days=whole_days)

    from_ddmmyyyy = source.str.strptime(pl.Date, "%d/%m/%Y", strict=False)

    return pl.coalesce([from_excel_serial, from_ddmmyyyy]).alias(column_name)

def cast_currency(col_name):
    """
    Build an expression casting ``col_name`` to ``pl.Float32``, same output name.

    The cast is strict (polars default), so a non-numeric value raises when
    the expression is evaluated.

    NOTE(review): not used in the cells visible here — VALOR ENTRADA / VALOR
    VENDA are cast inline with ``strict=False``. Float32 keeps only ~7
    significant digits, which may round larger amounts with cents.
    """
    as_float32 = pl.col(col_name).cast(pl.Float32)
    return as_float32.alias(col_name)

# replace positional column_N names with the header labels
df = df.rename(mapping)
# rows with a null column_1 (header row, label rows, blanks) are not data rows
df = df.drop_nulls(subset=["column_1"])

df = df.with_columns(
    cast_date("DATA SAÍDA"),
    cast_date("DATA ENTRADA"),
    pl.col("column_1").cast(pl.Int64).alias("sequencia"),
    pl.col("VALOR ENTRADA").cast(pl.Float32, strict=False).alias("vl_entrada"),
    pl.col("VALOR VENDA").cast(pl.Float32, strict=False).alias("vl_saida"),
)

# Drop every column still named column_N: column_1 (now `sequencia`) and any
# column without a header label (e.g. column_10). Computed rather than
# hard-coded so a sheet without column_10 does not raise.
df = df.drop([c for c in df.columns if c.startswith("column")])

df.columns


['DATA ENTRADA',
 'NOTA COMPRA',
 'VEÍCULO',
 'PLACA',
 'VALOR ENTRADA',
 'NOTA SAÍDA',
 'DATA SAÍDA',
 'VALOR VENDA',
 '_source_file',
 '_ingestion_time',
 'ano_raw',
 'is_label',
 'mes_raw',
 'sequencia',
 'vl_entrada',
 'vl_saida']

In [187]:
## Save to silver

# Silver keeps the typed business columns plus the lineage metadata from bronze.
silver_df = df.select(
    "sequencia",
    "VEÍCULO",
    "PLACA",
    "DATA ENTRADA",
    "DATA SAÍDA",
    "NOTA SAÍDA",
    "NOTA COMPRA",
    "vl_entrada",
    "vl_saida",
    "ano_raw",
    "mes_raw",
    "_source_file",
    "_ingestion_time",
)

# Full reset: the silver table is rebuilt from the current `df`. Left as the
# cell's last expression so the returned table is displayed.
set_table(catalog, "silver.carros_astro", silver_df, mode="reset")


✔ schema atualizado
✔ tabela 'silver.carros_astro' carregada
⚠ full reset da tabela
✔ dados gravados (reset)


carros_astro(
  1: sequencia: optional long,
  2: VEÍCULO: optional string,
  3: PLACA: optional string,
  4: DATA ENTRADA: optional date,
  5: DATA SAÍDA: optional date,
  6: vl_entrada: optional float,
  7: vl_saida: optional float,
  8: ano_raw: optional long,
  9: mes_raw: optional string,
  10: _source_file: optional string,
  11: _ingestion_time: optional string,
  12: NOTA SAÍDA: optional string,
  13: NOTA COMPRA: optional string
),
partition by: [],
sort order: [],
snapshot: Operation.APPEND: id=6070894803919287373, parent_id=2979075849402979294, schema_id=1

In [192]:
tbl = catalog.load_table("silver.nubank_clean")

df = pl.from_arrow(tbl.scan().to_arrow())

# Rich display instead of print(). NOTE(review): this output contains personal
# transaction data (merchants, amounts) — clear it before sharing/committing.
df.head(100)


shape: (100, 4)
┌────────────────┬──────────┬─────────────────────────────────┬─────────────────────────────────┐
│ data_transacao ┆ valor    ┆ descricao                       ┆ source_file                     │
│ ---            ┆ ---      ┆ ---                             ┆ ---                             │
│ date           ┆ f64      ┆ str                             ┆ str                             │
╞════════════════╪══════════╪═════════════════════════════════╪═════════════════════════════════╡
│ 2025-10-02     ┆ 79.79    ┆ Portal da Bahia                 ┆ s3a://raw/nubank/Nubank_2025-1… │
│ 2025-10-02     ┆ 29.9     ┆ Pizza Hut Shopping Par          ┆ s3a://raw/nubank/Nubank_2025-1… │
│ 2025-10-02     ┆ 6.99     ┆ Ponto Cultural                  ┆ s3a://raw/nubank/Nubank_2025-1… │
│ 2025-10-01     ┆ 111.78   ┆ Hotel At Booking.Com            ┆ s3a://raw/nubank/Nubank_2025-1… │
│ 2025-10-01     ┆ 12.9     ┆ Kfc Shop Paralela Ba            ┆ s3a://raw/nubank/Nubank_2025-1… │
│ … 