In [1]:
import duckdb  # Importa o DuckDB para manipulação de dados e execução de SQL
import os  # Importa o módulo os para interagir com variáveis de ambiente do sistema
from dotenv import load_dotenv, find_dotenv  # Importa funções para carregar variáveis de ambiente de um arquivo .env

from deltalake import DeltaTable, write_deltalake
# Carrega as variáveis de ambiente definidas no arquivo .env
load_dotenv(find_dotenv())

# Define as credenciais de acesso ao MinIO a partir das variáveis de ambiente
AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY_MINIO")  # Chave de acesso do MinIO
AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY_MINIO")  # Chave secreta do MinIO
HOST_MINIO = os.getenv("HOST_MINIO")  # Host do MinIO

# Conecta ao DuckDB, criando uma instância de conexão
con = duckdb.connect()

# Cria uma secret no DuckDB para acessar o MinIO com as credenciais fornecidas
con.execute(f"""
    CREATE SECRET my_minio_secret (
        TYPE 'S3',
        KEY_ID '{AWS_ACCESS_KEY}',
        SECRET '{AWS_SECRET_KEY}',
        REGION 'us-east-1',
        ENDPOINT '{HOST_MINIO}:9000',
        URL_STYLE 'path',
        USE_SSL false
    );
""")

# Define o caminho de destino no MinIO para os arquivos Parquet
path_minio_landing = 's3://landing/comex'
path_minio_bronze = 's3://bronze/comex/uf_mun'

In [2]:
storage_options = {
    "AWS_ENDPOINT_URL": f"http://{HOST_MINIO}:9000",
    "AWS_REGION": "us-east-1",
    "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY,
    "AWS_SECRET_ACCESS_KEY": AWS_SECRET_KEY,
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
    "AWS_ALLOW_HTTP": "true"
}

In [3]:
con.sql(f"""
        SELECT 
        *
         from '{path_minio_landing}/UF_MUN.parquet';        
        """)

┌────────────┬──────────────────────────┬──────────────────────────┬─────────┐
│ CO_MUN_GEO │          NO_MUN          │        NO_MUN_MIN        │  SG_UF  │
│   int64    │         varchar          │         varchar          │ varchar │
├────────────┼──────────────────────────┼──────────────────────────┼─────────┤
│    1501758 │ BREJO GRANDE DO ARAGUAIA │ Brejo Grande do Araguaia │ PA      │
│    1501782 │ BREU BRANCO              │ Breu Branco              │ PA      │
│    1501808 │ BREVES                   │ Breves                   │ PA      │
│    1501907 │ BUJARU                   │ Bujaru                   │ PA      │
│    1501956 │ CACHOEIRA DO PIRIA       │ Cachoeira do Piriá       │ PA      │
│    1502004 │ CACHOEIRA DO ARARI       │ Cachoeira do Arari       │ PA      │
│    1502103 │ CAMETA                   │ Cametá                   │ PA      │
│    1502152 │ CANAA DOS CARAJAS        │ Canaã dos Carajás        │ PA      │
│    1502202 │ CAPANEMA                 │ Capanema  

In [4]:
df = con.sql(f"""
        SELECT 
        CAST( CO_MUN_GEO as int) AS cod_municipio,
        NO_MUN_MIN as nome_municipio,
        SG_UF as sigla_estado
        from '{path_minio_landing}/UF_MUN.parquet';        

        """).to_arrow_table()

In [5]:
df

pyarrow.Table
cod_municipio: int32
nome_municipio: string
sigla_estado: string
----
cod_municipio: [[1501758,1501782,1501808,1501907,1501956,...,4314209,4314308,4314407,4314423,4314456]]
nome_municipio: [["Brejo Grande do Araguaia","Breu Branco","Breves","Bujaru","Cachoeira do Piriá",...,"Pedro Osório","Pejuçara","Pelotas","Picada Café","Pinhal"]]
sigla_estado: [["PA","PA","PA","PA","PA",...,"RS","RS","RS","RS","RS"]]

In [6]:
write_deltalake(
        f'{path_minio_bronze}',
        df,
        storage_options=storage_options,
        
    )

FileExistsError: Delta table already exists, write mode set to error.

In [7]:
table_path = f'{path_minio_bronze}'

# Conecte à tabela Delta existente
table = DeltaTable(table_path, storage_options=storage_options)



In [8]:
table.merge(
    source=df,
    predicate='target.cod_municipio = source.cod_municipio',
    source_alias="source",
    target_alias="target",
).when_matched_update(
    {
        "nome_municipio": "source.nome_municipio",
        "sigla_estado": "source.sigla_estado"
    }
).when_not_matched_insert_all().execute()


{'num_source_rows': 5570,
 'num_target_rows_inserted': 2386,
 'num_target_rows_updated': 3184,
 'num_target_rows_deleted': 0,
 'num_target_rows_copied': 0,
 'num_output_rows': 5570,
 'num_target_files_added': 1,
 'num_target_files_removed': 1,
 'execution_time_ms': 613,
 'scan_time_ms': 0,
 'rewrite_time_ms': 585}

In [9]:
con.sql(f"""
        SELECT *
FROM delta_scan('{path_minio_bronze}')
        """)

┌───────────────┬────────────────────────────────┬──────────────┐
│ cod_municipio │         nome_municipio         │ sigla_estado │
│     int32     │            varchar             │   varchar    │
├───────────────┼────────────────────────────────┼──────────────┤
│       1502855 │ Curuá                          │ PA           │
│       1505304 │ Oriximiná                      │ PA           │
│       1505437 │ Ourilândia do Norte            │ PA           │
│       2110609 │ São Bernardo                   │ MA           │
│       2110708 │ São Domingos do Maranhão       │ MA           │
│       2111029 │ São João do Carú               │ MA           │
│       2111409 │ São Luís Gonzaga do Maranhão   │ MA           │
│       3162955 │ São José da Lapa               │ MG           │
│       3163003 │ São José da Safira             │ MG           │
│       3164431 │ São Sebastião da Vargem Alegre │ MG           │
│          ·    │    ·                           │ ·            │
│         

In [10]:
con.close()