## Imports necessários

In [259]:
from deltalake.writer import write_deltalake
from deltalake import DeltaTable
import duckdb
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo


## Extensão

In [260]:
cn=duckdb.connect()
cn.sql("""from duckdb_extensions() """)

┌──────────────────┬─────────┬───────────┬─────────────────────────────────────────────────────────────────────────────────┬────────────────────────────────────────────────────────────────────────────────────┬───────────────────┬───────────────────┬───────────────────┬────────────────┐
│  extension_name  │ loaded  │ installed │                                  install_path                                   │                                    description                                     │      aliases      │ extension_version │   install_mode    │ installed_from │
│     varchar      │ boolean │  boolean  │                                     varchar                                     │                                      varchar                                       │     varchar[]     │      varchar      │      varchar      │    varchar     │
├──────────────────┼─────────┼───────────┼─────────────────────────────────────────────────────────────────────────────────┼───────────────

In [261]:
def duckdb_connection() -> duckdb.DuckDBPyConnection:    
    con = duckdb.connect()
    con.install_extension("httpfs")
    con.load_extension("httpfs")
    return con

In [262]:
def create_secret(cn):
    """Cria um segredo no DuckDB para acessar o MinIO."""
    try:
        cn.sql("""CREATE SECRET secret_minio  (
                TYPE S3,
                KEY_ID 'cursolab',
                SECRET 'cursolab',
                REGION 'us-east-1',
                ENDPOINT 'minio:9000',
                URL_STYLE 'path',
                USE_SSL false
                );"""
            )
        print("Secret criado com sucesso!")
    except Exception as e:
        print(f"Erro ao criar o segredo: {e}")



In [263]:
def get_path_bronze(source_base_path: str, process_date: datetime) -> str:   

    source_bucket = "bronze"   
    source_base_path = "topics/postgres.inventory.products"

    # Formata o caminho da partição
    partition_path = process_date.strftime("year=%Y/month=%m/day=%d/hour=%H")

    # Monta a chave S3
    path =  f"s3://{source_bucket}/{source_base_path}/{partition_path}/*"

    print(f"get_path_bronze: {path}")

    return path

In [264]:
def set_path_silver(source_base_path: str, process_date: datetime) -> str: 

    sink_bucket = "silver"   
    # Formata o caminho da partição
    partition_path = process_date.strftime("year=%Y/month=%m/day=%d/hour=%H")

    # Monta a chave S3
    path=  f"s3://{sink_bucket}/{source_base_path}/{partition_path}"

    print(f"set_path_silver: {path}")

    return path

In [316]:
def load_bronze(con, source_path) -> duckdb.DuckDBPyRelation: 
            
   con.execute(f"CREATE OR REPLACE TABLE products_bronze \
                      AS FROM read_json_auto('{source_path}', ignore_errors=true)")

    
   return con.table("products_bronze")

In [348]:
def read_delta(path: str) -> DeltaTable:     
    
     storage_options = {
            'AWS_ACCESS_KEY_ID': 'cursolab',
            'AWS_SECRET_ACCESS_KEY': 'cursolab',
            'AWS_ENDPOINT_URL': 'http://minio:9000',
            'AWS_ALLOW_HTTP': 'true',
            'aws_conditional_put': 'etag'
     }

     exists= DeltaTable.is_deltatable(path, storage_options)

     if exists:
     
         dt=DeltaTable(
            table_uri=path,
            storage_options=storage_options   
         )

         return dt
     else:
        return None
    

In [318]:
def clear_bronze(con, bronze_dataset) -> duckdb.DuckDBPyRelation:
    query = f"""        
        SELECT 
            data.payload.after.id::BIGINT AS id,
            data.payload.after.name::VARCHAR AS name,
            data.payload.after.description::VARCHAR AS description,
            data.payload.after.weight::DOUBLE AS weight,
            time,
            hour,
            month,
            year
        FROM '{bronze_dataset}'        
    """
    con.execute(f"CREATE OR REPLACE TABLE bronze_clean AS FROM ({query})")   
    return con.table("bronze_clean")

In [391]:
def write_delta_silver(df, path_silver: str):    

    if df is None:
       print("Nenhum dado foi carregado.")
       return None
    
    storage_options = {
            'AWS_ACCESS_KEY_ID': 'cursolab',
            'AWS_SECRET_ACCESS_KEY': 'cursolab',
            'AWS_ENDPOINT_URL': 'http://minio:9000',
            'AWS_ALLOW_HTTP': 'true',
            'aws_conditional_put': 'etag'
      }

    dt = read_delta(path_silver)

    if dt is None:
        write_deltalake(
        path_silver,
        df,
        mode="append",             
        storage_options= storage_options
        )
    else :
        dt.merge(source=df,
                 predicate='target.id=source.id',
                 source_alias='source',
                 target_alias='target').when_matched_update_all().when_not_matched_insert_all().execute()
        print(dt.to_pyarrow_table().to_pydict())
        print(df)
    print("Write deltalake com sucesso")

In [387]:
def init_duckdb() -> duckdb.DuckDBPyConnection:  
    con = duckdb_connection()
    create_secret(con)
    return con

In [388]:
def ingest_silver(con: duckdb.DuckDBPyConnection, process_date: datetime):   
    source_base_path = "topics/postgres.inventory.products"

    path_bronze = get_path_bronze(source_base_path, process_date)
    path_silver = set_path_silver(source_base_path, process_date)
    
    bronze = load_bronze(con, path_bronze)    
    bronze_clear= clear_bronze(con, bronze.alias)
    write_delta_silver(bronze_clear.to_df(),path_silver)
    

In [389]:
def main():

    con =init_duckdb()
    
    """Gera o caminho do arquivo no formato 'year=YYYY/month=MM/day=DD/hour=HH' para MinIO."""
    # Define o fuso horário de São Paulo
    saopaulo_tz = ZoneInfo("America/Sao_Paulo")

    now = datetime.now(saopaulo_tz)
    # Calcula a data de processamento (1 hora antes)
    process_date = now.replace(minute=0, second=0, microsecond=0) - timedelta(hours=0)
    
    ingest_silver(con, process_date)

    con.close()


In [390]:
main()

Secret criado com sucesso!
get_path_bronze: s3://bronze/topics/postgres.inventory.products/year=2025/month=03/day=11/hour=09/*
set_path_silver: s3://silver/topics/postgres.inventory.products/year=2025/month=03/day=11/hour=09
{'id': [159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159, 15

In [375]:
main()

Secret criado com sucesso!
get_path_bronze: s3://bronze/topics/postgres.inventory.products/year=2025/month=03/day=11/hour=09/*
set_path_silver: s3://silver/topics/postgres.inventory.products/year=2025/month=03/day=11/hour=09
{'id': [170, 171, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 159, 159, 159, 159, 159, 159, 159, 159, 159, 159], 'name': ['Lapis', 'Lapis', 'Lapis', 'Lapis', 'Lapis', 'Lapis', 'Lapis', 'Lapis', 'Lapis', 'Lapis', 'Lapis', 'Lapis', 'Lapis', 'O melhor lapis alterado', 'O melhor lapis alterado', 'O melhor lapis alterado', 'O melhor lapis alterado', 'O melhor lapis alterado', 'O melhor lapis alterado', 'O melhor lapis alterado', 'O melhor lapis alterado', 'O melhor lapis alterado', 'O melhor lapis alterado'], 'description': ['O melhor', 'O melhor', 'O melhor', 'O melhor', 'O melhor', 'O melhor', 'O melhor', 'O melhor', 'O melhor', 'O melhor', 'O melhor', 'O melhor', 'O melhor', 'O melhor', 'O melhor', 'O melhor', 'O melhor', 'O melhor', 'O melhor', 'O melhor'