## testa conexão

In [1]:
import json
import logging
from collections import Counter
import boto3
from resources.utils.utils import load_config, print_header


class IngestionValidation:
    config = load_config()
    _minio_endpoint_url = config["minio_dev"]["endpoint_url"]
    _minio_access_key = config["minio_dev"]["access_key"]
    _minio_secret_key = config["minio_dev"]["secret_key"]
    _storage_brew_bucket = config["storages"]["brew_bucket"]["name"]
    _storage_brew_landing = config["storages"]["brew_landing_key"]["lading"]
    
    _expected_schema = {
        "id": str,
        "name": str,
        "brewery_type": str,
        "address_1": (str, type(None)),
        "address_2": (str, type(None)),
        "address_3": (str, type(None)),
        "city": str,
        "state_province": str,
        "postal_code": str,
        "country": str,
        "longitude": (str, type(None)),
        "latitude": (str, type(None)),
        "phone": (str, type(None)),
        "website_url": (str, type(None)),
        "state": str,
        "street": (str, type(None))
    }



    def __init__(self):
        # Inicializa o cliente S3 (ou MinIO) com as credenciais e endpoint
        self.s3_client = boto3.client(
            's3',
            endpoint_url=self._minio_endpoint_url,
            aws_access_key_id=self._minio_access_key,
            aws_secret_access_key=self._minio_secret_key
        )

    def _list_files_in_bronze(self):
        """Lista todos os arquivos no bucket S3 na pasta bronze."""
        try:
            response = self.s3_client.list_objects_v2(
                Bucket=self._storage_brew_bucket,
                Prefix=self._storage_brew_landing
            )
            files = [content['Key'] for content in response.get('Contents', [])]
            # print("Arquivos encontrados:", files)
            return files
        except Exception as e:
            logging.error(f"Erro ao listar arquivos no bucket: {e}")
            return []

    def _read_s3_file(self, key):
        """Lê o conteúdo de um arquivo JSON do S3."""
        try:
            response = self.s3_client.get_object(Bucket=self._storage_brew_bucket, Key=key)
            content = response['Body'].read().decode('utf-8')
            return json.loads(content)
        except Exception as e:
            logging.error(f"Erro ao ler o arquivo {key}: {e}")
            return None


    def execute_test(self):
        msg = "INICIANDO VALIDAÇÃO DE CONEXÂO COM MINIO"
        print_header(msg)
        files = self._list_files_in_bronze()
        print(f"\nArquivos encontrados\n: {files}")
        for file in files:
            data = self._read_s3_file(file)
            print(f"\nArquivo {file} conteúdo")



if __name__ == "__main__":
    ingestion_validation = IngestionValidation()
    ingestion_validation.execute_test()

************************************************************
*                                                          *
*     >>> INICIANDO VALIDAÇÃO DE CONEXÂO COM MINIO <<<     *
*                                                          *
************************************************************

Arquivos encontrados
: ['bronze/PART_1_AT_2024_11_08.json', 'bronze/PART_2_AT_2024_11_08.json', 'bronze/PART_3_AT_2024_11_08.json', 'bronze/PART_4_AT_2024_11_08.json', 'bronze/PART_5_AT_2024_11_08.json', 'bronze/PART_6_AT_2024_11_08.json', 'bronze/PART_7_AT_2024_11_08.json', 'bronze/PART_8_AT_2024_11_08.json', 'bronze/PART_9_AT_2024_11_08.json']

Arquivo bronze/PART_1_AT_2024_11_08.json conteúdo

Arquivo bronze/PART_2_AT_2024_11_08.json conteúdo

Arquivo bronze/PART_3_AT_2024_11_08.json conteúdo

Arquivo bronze/PART_4_AT_2024_11_08.json conteúdo

Arquivo bronze/PART_5_AT_2024_11_08.json conteúdo

Arquivo bronze/PART_6_AT_2024_11_08.json conteúdo

Arquivo bronze/PART_7_AT_2024_11_08.jso

## Duck direto no minio



In [1]:
import duckdb
from resources.utils.utils import load_config, print_header

def create_duckdb_connection(
    minio_endpoint: str, minio_access_key: str, minio_secret_key: str
) -> duckdb.DuckDBPyConnection:
    """
    Estabelece uma conexão com o DuckDB e configura para usar as credenciais do MinIO.

    Esta função conecta ao DuckDB, instala e carrega as extensões HTTPFS e Delta, e cria um segredo S3 para configurar o
    acesso ao MinIO.

    Args:
        minio_endpoint (str): Endpoint do MinIO.
        minio_access_key (str): Chave de acesso do MinIO.
        minio_secret_key (str): Chave secreta do MinIO.

    Returns:
        duckdb.DuckDBPyConnection: Objeto de conexão do DuckDB se a conexão for bem-sucedida; caso contrário, None.

    Raises:
        Exception: Se houver um erro ao conectar ao DuckDB ou configurar o MinIO.
    """

    try:
        print("Conectando ao DuckDB")
        conn = duckdb.connect(database=':memory:')
        
        # Instala e carrega as extensões necessárias
        print("Instalando e carregando extensões HTTPFS e Delta")
        conn.execute("INSTALL httpfs;")
        conn.execute("LOAD httpfs;")
        conn.execute("INSTALL delta;")
        conn.execute("LOAD delta;")
        
        # Criação do segredo S3 para acessar o MinIO
        print("Criando e configurando o segredo S3")
        conn.execute(f"""
            CREATE SECRET delta_s3 (
                TYPE S3,
                KEY_ID '{minio_access_key}',  -- Chave de acesso do MinIO
                SECRET '{minio_secret_key}',  -- Chave secreta do MinIO
                REGION '',  -- Região vazia para MinIO
                ENDPOINT '{minio_endpoint}',  -- Endpoint do MinIO
                URL_STYLE 'path',  -- Acesso via estilo de caminho
                USE_SSL 'false'  -- Sem SSL para MinIO local
            );
        """)
        
        print("Conexão com DuckDB estabelecida e segredo configurado")
        return conn
    except Exception as e:
        print(f"Erro ao conectar ao DuckDB: {e}")
        return None


def load_silver_data(conn: duckdb.DuckDBPyConnection, silver_path: str):
    """
    Carrega os dados da camada Silver no DuckDB a partir do MinIO.

    Args:
        conn (duckdb.DuckDBPyConnection): Conexão DuckDB.
        silver_path (str): Caminho da camada Silver no MinIO.

    Returns:
        None
    """

    try:
        print(f"Tentando carregar dados do caminho {silver_path}...")
        conn.execute(f"CREATE TABLE silver_data AS SELECT * FROM delta_scan('{silver_path}')")
        print("Tabela 'silver_data' criada e dados carregados com sucesso.")
        
        # Consultas de exemplo
        total_records = conn.execute("SELECT COUNT(*) AS total_records FROM silver_data;").fetchdf()
        print("\nTotal de registros:", total_records)

        distinct_countries = conn.execute("SELECT DISTINCT country FROM silver_data;").fetchdf()
        print("\nPaíses distintos:", distinct_countries)

    except Exception as e:
        print(f"Erro ao carregar dados do MinIO com DuckDB: {e}")



config = load_config()

# Configurações do MinIO
minio_endpoint = config['minio_dev']['endpoint_url_duckdb']
minio_access_key = config['minio_dev']['access_key']
minio_secret_key = config["minio_dev"]["secret_key"]
minio_silver_path = config["storages"]["brew_paths_duckdb"]["silver"]

# Conectar ao DuckDB e carregar dados
conn = create_duckdb_connection(minio_endpoint, minio_access_key, minio_secret_key)
if conn:
    load_silver_data(conn, minio_silver_path)
    conn.close()


Conectando ao DuckDB
Instalando e carregando extensões HTTPFS e Delta
Criando e configurando o segredo S3
Conexão com DuckDB estabelecida e segredo configurado
Tentando carregar dados do caminho s3://lakehouse/silver/...
Tabela 'silver_data' criada e dados carregados com sucesso.

Total de registros:    total_records
0           8355

Países distintos:           country
0          france
1         england
2          poland
3        portugal
4        scotland
5     south korea
6          sweden
7     isle of man
8   united states
9         austria
10      singapore
11        germany
12        ireland


In [2]:
import duckdb
from resources.utils.utils import load_config, print_header

class DuckDBMinIOConnector:
    def __init__(self):
        """
        Inicializa a conexão DuckDB com MinIO e carrega as configurações de conexão e caminhos.
        """
        # Carregar as configurações de MinIO e os caminhos do pipeline
        config = load_config()
        
        self.minio_endpoint = config['minio_dev']['endpoint_url_duckdb']
        self.minio_access_key = config['minio_dev']['access_key']
        self.minio_secret_key = config["minio_dev"]["secret_key"]
        self.brew_paths = config["storages"]["brew_paths_duckdb"]

        # Configurar a conexão
        self.conn = self._create_duckdb_connection()
    
    def _create_duckdb_connection(self) -> duckdb.DuckDBPyConnection:
        """
        Estabelece uma conexão com o DuckDB e configura o acesso ao MinIO.

        Returns:
            duckdb.DuckDBPyConnection: Objeto de conexão DuckDB.
        """
        try:
            print("Conectando ao DuckDB")
            conn = duckdb.connect(database=':memory:')
            
            # Instala e carrega as extensões necessárias
            print("Instalando e carregando extensões HTTPFS e Delta")
            conn.execute("INSTALL httpfs;")
            conn.execute("LOAD httpfs;")
            conn.execute("INSTALL delta;")
            conn.execute("LOAD delta;")
            
            # Criação do segredo S3 para acessar o MinIO
            print("Criando e configurando o segredo S3")
            conn.execute(f"""
                CREATE SECRET delta_s3 (
                    TYPE S3,
                    KEY_ID '{self.minio_access_key}',
                    SECRET '{self.minio_secret_key}',
                    REGION '',
                    ENDPOINT '{self.minio_endpoint}',
                    URL_STYLE 'path',
                    USE_SSL 'false'
                );
            """)
            
            print("Conexão com DuckDB estabelecida e segredo configurado")
            return conn
        except Exception as e:
            print(f"Erro ao conectar ao DuckDB: {e}")
            return None

    def execute_query(self, layer: str, query: str):
        """
        Executa uma consulta SQL na camada especificada (Bronze, Silver ou Gold).

        Args:
            layer (str): A camada dos dados (bronze, silver ou gold).
            query (str): A consulta SQL a ser executada.

        Returns:
            DataFrame: Resultado da consulta como DataFrame do DuckDB.
        """
        # Verifica o caminho da camada
        layer_path = self.brew_paths.get(layer)
        if not layer_path:
            print(f"Erro: Camada '{layer}' não encontrada nas configurações.")
            return None

        # Constrói a consulta para acessar os dados da camada específica
        try:
            print(f"Tentando carregar dados do caminho {layer_path} e executar a query...")
            self.conn.execute(f"CREATE TABLE {layer}_data AS SELECT * FROM delta_scan('{layer_path}')")
            result = self.conn.execute(query).fetchdf()
            print(f"Consulta executada com sucesso na camada {layer}.")
            return result
        except Exception as e:
            print(f"Erro ao carregar dados e executar consulta: {e}")
            return None
    
    def close_connection(self):
        """Fecha a conexão com o DuckDB."""
        if self.conn:
            self.conn.close()
            print("Conexão com o DuckDB fechada.")


# Exemplo de uso
connector = DuckDBMinIOConnector()
result = connector.execute_query("silver", "SELECT * FROM silver_data LIMIT 10;")
print(result)
connector.close_connection()


Conectando ao DuckDB
Instalando e carregando extensões HTTPFS e Delta
Criando e configurando o segredo S3
Conexão com DuckDB estabelecida e segredo configurado
Tentando carregar dados do caminho s3://lakehouse/silver/ e executar a query...
Consulta executada com sucesso na camada silver.
                                     id                              name  \
0  0a366f8e-fd0d-4fe6-8ee5-054d8b725c8c  Wieselburger Brauerei (Heineken)   
1  3fe1de22-c30c-446d-b2f2-913ca9f6378e                        Wimitzbr�u   
2  def4c1dd-dfdb-42ff-a03a-d2a00c9f3d7f        Zipfer Brauerei (Heineken)   
3  f51b3f3d-be14-40da-914a-3b5c83f75bbf     Villacher Brauerei (Heineken)   
4  867eee9e-2138-4e62-97a9-b25e7fd1a1ff  Klosterbrauerei Stift Engelszell   
5  b04c9621-bb87-439d-a499-94cb432bde90                         Cafe Luis   
6  9355bc7c-bdc7-40c3-8932-e840c996ed86                         Caf� Okei   
7  16fb2c6e-c7e1-4d46-b9d0-1c86816eb129                 Die Bierbotschaft   
8  fcd9312c-7531-4

In [7]:
import duckdb
from resources.utils.utils import load_config

class DuckDBMinIOConnector:
    def __init__(self):
        """
        Inicializa a conexão DuckDB com MinIO e carrega as configurações de conexão e caminhos.
        """
        # Carregar as configurações de MinIO e os caminhos do pipeline
        config = load_config()
        
        self.minio_endpoint = config['minio_dev']['endpoint_url_duckdb']
        self.minio_access_key = config['minio_dev']['access_key']
        self.minio_secret_key = config["minio_dev"]["secret_key"]
        self.brew_paths = config["storages"]["brew_paths_duckdb"]

        # Configurar a conexão
        self.conn = self._create_duckdb_connection()
    
    def _create_duckdb_connection(self) -> duckdb.DuckDBPyConnection:
        """
        Estabelece uma conexão com o DuckDB e configura o acesso ao MinIO.

        Returns:
            duckdb.DuckDBPyConnection: Objeto de conexão DuckDB.
        """
        try:
            print("Conectando ao DuckDB")
            conn = duckdb.connect(database=':memory:')
            
            # Instala e carrega as extensões necessárias
            print("Instalando e carregando extensões HTTPFS e Delta")
            conn.execute("INSTALL httpfs;")
            conn.execute("LOAD httpfs;")
            conn.execute("INSTALL delta;")
            conn.execute("LOAD delta;")
            
            # Criação do segredo S3 para acessar o MinIO
            print("Criando e configurando o segredo S3")
            conn.execute(f"""
                CREATE SECRET delta_s3 (
                    TYPE S3,
                    KEY_ID '{self.minio_access_key}',
                    SECRET '{self.minio_secret_key}',
                    REGION '',
                    ENDPOINT '{self.minio_endpoint}',
                    URL_STYLE 'path',
                    USE_SSL 'false'
                );
            """)
            
            print("Conexão com DuckDB estabelecida e segredo configurado")
            return conn
        except Exception as e:
            print(f"Erro ao conectar ao DuckDB: {e}")
            return None

    def execute_query(self, layer: str, file_format: str, query: str):
        """
        Executa uma consulta SQL na camada especificada (Bronze, Silver ou Gold) com o formato de arquivo adequado.

        Args:
            layer (str): A camada dos dados (bronze, silver ou gold).
            file_format (str): O formato dos arquivos na camada ('json', 'delta', etc.).
            query (str): A consulta SQL a ser executada.

        Returns:
            DataFrame: Resultado da consulta como DataFrame do DuckDB.
        """
        # Verifica o caminho da camada
        layer_path = self.brew_paths.get(layer)
        if not layer_path:
            print(f"Erro: Camada '{layer}' não encontrada nas configurações.")
            return None

        # Constrói a consulta para acessar os dados com base no formato do arquivo
        try:
            print(f"Tentando carregar dados do caminho {layer_path} no formato {file_format} e executar a query...")
            if file_format.lower() == 'json':
                self.conn.execute(f"CREATE TABLE {layer}_data AS SELECT * FROM read_json_auto('{layer_path}*.json')")
            elif file_format.lower() == 'delta':
                self.conn.execute(f"CREATE TABLE {layer}_data AS SELECT * FROM delta_scan('{layer_path}')")
            else:
                print(f"Formato de arquivo '{file_format}' não suportado para a camada {layer}.")
                return None

            # Executa a query solicitada
            result = self.conn.execute(query).fetchdf()
            print(f"Consulta executada com sucesso na camada {layer}.")
            return result
        except Exception as e:
            print(f"Erro ao carregar dados e executar consulta: {e}")
            return None
    
    def close_connection(self):
        """Fecha a conexão com o DuckDB."""
        if self.conn:
            self.conn.close()
            print("Conexão com o DuckDB fechada.")


# Exemplo de uso
connector = DuckDBMinIOConnector()
result = connector.execute_query("silver", "delta", "SELECT * FROM bronze_data LIMIT 10;")
print(result)
connector.close_connection()


Conectando ao DuckDB
Instalando e carregando extensões HTTPFS e Delta
Criando e configurando o segredo S3
Conexão com DuckDB estabelecida e segredo configurado
Tentando carregar dados do caminho s3://lakehouse/silver/ no formato delta e executar a query...
Erro ao carregar dados e executar consulta: Catalog Error: Table with name bronze_data does not exist!
Did you mean "silver_data"?
LINE 1: SELECT * FROM bronze_data LIMIT 10;
                      ^
None
Conexão com o DuckDB fechada.


In [28]:
import duckdb
from resources.utils.utils import load_config

class DuckDBMinIOConnector:
    def __init__(self):
        """
        Inicializa a conexão DuckDB com MinIO e carrega as configurações de conexão e caminhos.
        """
        # Carregar as configurações de MinIO e os caminhos do pipeline
        config = load_config()
        
        self.minio_endpoint = config['minio_dev']['endpoint_url_duckdb']
        self.minio_access_key = config['minio_dev']['access_key']
        self.minio_secret_key = config["minio_dev"]["secret_key"]
        self.brew_paths = config["storages"]["brew_paths_duckdb"]

        # Configurar a conexão
        self.conn = self._create_duckdb_connection()
    
    def _create_duckdb_connection(self) -> duckdb.DuckDBPyConnection:
        """
        Estabelece uma conexão com o DuckDB e configura o acesso ao MinIO.

        Returns:
            duckdb.DuckDBPyConnection: Objeto de conexão DuckDB.
        """
        try:
            print("Conectando ao DuckDB")
            conn = duckdb.connect(database=':memory:')
            
            # Instala e carrega as extensões necessárias
            print("Instalando e carregando extensões HTTPFS e Delta")
            conn.execute("INSTALL httpfs;")
            conn.execute("LOAD httpfs;")
            conn.execute("INSTALL delta;")
            conn.execute("LOAD delta;")
            
            # Criação do segredo S3 para acessar o MinIO
            print("Criando e configurando o segredo S3")
            conn.execute(f"""
                CREATE SECRET delta_s3 (
                    TYPE S3,
                    KEY_ID '{self.minio_access_key}',
                    SECRET '{self.minio_secret_key}',
                    REGION '',
                    ENDPOINT '{self.minio_endpoint}',
                    URL_STYLE 'path',
                    USE_SSL 'false'
                );
            """)
            
            print("Conexão com DuckDB estabelecida e segredo configurado")
            return conn
        except Exception as e:
            print(f"Erro ao conectar ao DuckDB: {e}")
            return None

    def execute_query(self, layer: str, file_format: str, query: str):
        """
        Executa uma consulta SQL na camada especificada (Bronze, Silver ou Gold) com o formato de arquivo adequado.

        Args:
            layer (str): A camada dos dados (bronze, silver ou gold).
            file_format (str): O formato dos arquivos na camada ('json', 'delta', etc.).
            query (str): A consulta SQL a ser executada.

        Returns:
            DataFrame: Resultado da consulta como DataFrame do DuckDB.
        """
        # Verifica o caminho da camada
        layer_path = self.brew_paths.get(layer)
        if not layer_path:
            print(f"Erro: Camada '{layer}' não encontrada nas configurações.")
            return None

        # Define o nome da tabela a partir da camada
        table_name = f"{layer}_data"

        # Constrói a consulta para acessar os dados com base no formato do arquivo
        try:
            print(f"Tentando carregar dados do caminho {layer_path} no formato {file_format} e executar a query...")
            if file_format.lower() == 'json':
                self.conn.execute(f"CREATE TABLE {table_name} AS SELECT * FROM read_json_auto('{layer_path}*.json')")
            elif file_format.lower() == 'delta':
                self.conn.execute(f"CREATE TABLE {table_name} AS SELECT * FROM delta_scan('{layer_path}')")
            else:
                print(f"Formato de arquivo '{file_format}' não suportado para a camada {layer}.")
                return None

            # Substitui o nome da tabela correta na consulta
            query_with_table = query.replace("{{table}}", table_name)
            result = self.conn.execute(query_with_table).fetchdf()
            print(f"Consulta executada com sucesso na camada {layer}.")
            return result
        except Exception as e:
            print(f"Erro ao carregar dados e executar consulta: {e}")
            return None
    
    def close_connection(self):
        """Fecha a conexão com o DuckDB."""
        if self.conn:
            self.conn.close()
            print("Conexão com o DuckDB fechada.")


# Exemplo de uso

query= """
SELECT * FROM {{table}}
ORDER BY breweries_count desc
"""


connector = DuckDBMinIOConnector()
result = connector.execute_query("gold", "delta", query)
display_result(result)
connector.close_connection()


Conectando ao DuckDB
Instalando e carregando extensões HTTPFS e Delta
Criando e configurando o segredo S3
Conexão com DuckDB estabelecida e segredo configurado
Tentando carregar dados do caminho s3://lakehouse/gold/ no formato delta e executar a query...
Consulta executada com sucesso na camada gold.
+------+--------------+---------------+----------------------+----------------------------+-----------------+---------------------------+
|      | brewery_type |    country    |        state         |            city            | breweries_count |     last_updated_gold     |
+------+--------------+---------------+----------------------+----------------------------+-----------------+---------------------------+
|  0   |    micro     | united states |       colorado       |           denver           |       58        | 2024-11-08 08:05:55-03:00 |
|  1   |    micro     | united states |      california      |         san diego          |       45        | 2024-11-08 08:05:55-03:00 |
|  2   |

In [16]:
from IPython.display import display

In [27]:

from tabulate import tabulate

def display_result(result):
    print(tabulate(result, headers='keys', tablefmt='pretty'))

In [25]:
def display_result(result):
    print(result.to_markdown())

## Abordagem -- Pega files boto via boto e le com Duck

In [1]:
import boto3
import duckdb
import pandas as pd
import json
from resources.utils.utils import load_config, print_header

# Carregar configurações
config = load_config()
_minio_endpoint_url = config["minio_dev"]["endpoint_url"]
_minio_access_key = config["minio_dev"]["access_key"]
_minio_secret_key = config["minio_dev"]["secret_key"]
_storage_brew_bucket = config["storages"]["brew_bucket"]["name"]
_storage_brew_landing = config["storages"]["brew_landing_key"]["lading"]

# Mensagem de início
print_header("INICIANDO TESTE DE CONEXÃO COM MINIO E DUCKDB")

# Configuração do MinIO (ou S3)
s3_client = boto3.client(
    's3',
    endpoint_url=_minio_endpoint_url,
    aws_access_key_id=_minio_access_key,
    aws_secret_access_key=_minio_secret_key
)

# Lista de arquivos JSON no MinIO (S3)
response = s3_client.list_objects_v2(Bucket=_storage_brew_bucket, Prefix=_storage_brew_landing)
files = [content['Key'] for content in response.get('Contents', [])]
print("Arquivos encontrados:", files)

# Conexão com DuckDB
conn = duckdb.connect(database=':memory:')

# Função para carregar dados JSON do S3 para DuckDB
def load_json_to_duckdb(file_key):
    try:
        # Baixar o arquivo JSON do MinIO
        response = s3_client.get_object(Bucket=_storage_brew_bucket, Key=file_key)
        content = response['Body'].read().decode('utf-8')
        
        # Carregar o conteúdo JSON como lista de dicionários
        data = json.loads(content)  # Garante que o conteúdo seja lido como uma lista de registros JSON
        df = pd.DataFrame(data)  # Converter para DataFrame do pandas
        
        # Inserir o DataFrame no DuckDB
        conn.execute("CREATE TABLE IF NOT EXISTS bronze_data AS SELECT * FROM df LIMIT 0")  # Estrutura da tabela
        conn.execute("INSERT INTO bronze_data SELECT * FROM df")
        
    except Exception as e:
        print(f"Erro ao carregar {file_key} para o DuckDB: {e}")

# Carregar todos os arquivos JSON encontrados no MinIO para o DuckDB
for file in files:
    print(f"Carregando {file} para o DuckDB...")
    load_json_to_duckdb(file)

# Consultas de exemplo no DuckDB
print("\nExemplo de consultas no DuckDB:")
print(conn.execute("SELECT * FROM bronze_data LIMIT 10;").fetchdf())  # Seleciona os primeiros 10 registros
print(conn.execute("SELECT COUNT(*) as total_records FROM bronze_data;").fetchdf())  # Conta o total de registros
print(conn.execute("SELECT DISTINCT country FROM bronze_data;").fetchdf())  # Lista de países distintos


************************************************************
*                                                          *
*  >>> INICIANDO TESTE DE CONEXÃO COM MINIO E DUCKDB <<<   *
*                                                          *
************************************************************
Arquivos encontrados: ['bronze/PART_1_AT_2024_11_07.json', 'bronze/PART_2_AT_2024_11_07.json', 'bronze/PART_3_AT_2024_11_07.json', 'bronze/PART_4_AT_2024_11_07.json', 'bronze/PART_5_AT_2024_11_07.json', 'bronze/PART_6_AT_2024_11_07.json', 'bronze/PART_7_AT_2024_11_07.json', 'bronze/PART_8_AT_2024_11_07.json', 'bronze/PART_9_AT_2024_11_07.json']
Carregando bronze/PART_1_AT_2024_11_07.json para o DuckDB...
Carregando bronze/PART_2_AT_2024_11_07.json para o DuckDB...
Carregando bronze/PART_3_AT_2024_11_07.json para o DuckDB...
Carregando bronze/PART_4_AT_2024_11_07.json para o DuckDB...
Carregando bronze/PART_5_AT_2024_11_07.json para o DuckDB...
Carregando bronze/PART_6_AT_2024_11_07.json

In [17]:
import os
import boto3
import duckdb

# Configurações do MinIO
minio_endpoint = "http://10.96.188.1:9000"
minio_access_key = "miniouser"
minio_secret_key = "miniosecret"
brew_bucket = "lakehouse"
brew_prefix = "bronze/"

# Configuração do cliente MinIO com boto3
s3_client = boto3.client(
    's3',
    endpoint_url=minio_endpoint,
    aws_access_key_id=minio_access_key,
    aws_secret_access_key=minio_secret_key
)

# Conexão com DuckDB em memória
conn = duckdb.connect(database=':memory:')
table_name = "bronze_data"

# Função para baixar e carregar JSON no DuckDB
def load_json_to_duckdb(file_key):
    """Baixa o arquivo JSON do MinIO e carrega no DuckDB."""
    local_file_path = file_key.split('/')[-1]

    # Baixa o arquivo JSON do MinIO
    s3_client.download_file(brew_bucket, file_key, local_file_path)
    print(f"Arquivo {file_key} baixado com sucesso para {local_file_path}")

    try:
        # Insere os dados JSON na tabela DuckDB
        conn.execute(f"INSERT INTO {table_name} SELECT * FROM read_json_auto('{local_file_path}')")
        print(f"Dados do arquivo '{local_file_path}' carregados com sucesso no DuckDB.")

    except Exception as e:
        print(f"Erro ao carregar dados no DuckDB: \n{e}")

    finally:
        # Exclui o arquivo temporário
        os.remove(local_file_path)
        print(f"Arquivo temporário {local_file_path} removido.")

# Cria a tabela no DuckDB com base na estrutura do primeiro arquivo
response = s3_client.list_objects_v2(Bucket=brew_bucket, Prefix=brew_prefix)
files = [content['Key'] for content in response.get('Contents', []) if content['Key'].endswith('.json')]

# Carrega o primeiro arquivo para definir a estrutura da tabela
first_file = files[0]
local_file_path = first_file.split('/')[-1]
s3_client.download_file(brew_bucket, first_file, local_file_path)
conn.execute(f"CREATE TABLE {table_name} AS SELECT * FROM read_json_auto('{local_file_path}')")
os.remove(local_file_path)
print(f"Tabela '{table_name}' criada com sucesso no DuckDB.")

# Carrega o restante dos arquivos na tabela
for file_key in files[1:]:  # Ignora o primeiro arquivo que já foi carregado
    load_json_to_duckdb(file_key)

# Consultas de exemplo no DuckDB
try:
    # Contagem total de registros
    total_records = conn.execute(f"SELECT COUNT(*) as total_records FROM {table_name};").fetchdf()
    print("\nTotal de registros:", total_records)

    # Listar países distintos
    distinct_countries = conn.execute(f"SELECT DISTINCT country FROM {table_name};").fetchdf()
    print("\nPaíses distintos:")
    print(distinct_countries)

except Exception as e:
    print(f"Erro ao executar as consultas: \n{e}")

finally:
    conn.close()


Tabela 'bronze_data' criada com sucesso no DuckDB.
Arquivo bronze/PART_2_AT_2024_11_07.json baixado com sucesso para PART_2_AT_2024_11_07.json
Dados do arquivo 'PART_2_AT_2024_11_07.json' carregados com sucesso no DuckDB.
Arquivo temporário PART_2_AT_2024_11_07.json removido.
Arquivo bronze/PART_3_AT_2024_11_07.json baixado com sucesso para PART_3_AT_2024_11_07.json
Dados do arquivo 'PART_3_AT_2024_11_07.json' carregados com sucesso no DuckDB.
Arquivo temporário PART_3_AT_2024_11_07.json removido.
Arquivo bronze/PART_4_AT_2024_11_07.json baixado com sucesso para PART_4_AT_2024_11_07.json
Dados do arquivo 'PART_4_AT_2024_11_07.json' carregados com sucesso no DuckDB.
Arquivo temporário PART_4_AT_2024_11_07.json removido.
Arquivo bronze/PART_5_AT_2024_11_07.json baixado com sucesso para PART_5_AT_2024_11_07.json
Dados do arquivo 'PART_5_AT_2024_11_07.json' carregados com sucesso no DuckDB.
Arquivo temporário PART_5_AT_2024_11_07.json removido.
Arquivo bronze/PART_6_AT_2024_11_07.json baix

## parquet rodando torto silver

In [36]:
import os
import boto3
import duckdb

# Configurações do MinIO
minio_endpoint = "http://10.96.188.1:9000"
minio_access_key = "miniouser"
minio_secret_key = "miniosecret"
brew_bucket = "lakehouse"
brew_prefix = "silver/"  # Mude para "bronze/" se precisar

# Configuração do cliente MinIO com boto3
s3_client = boto3.client(
    's3',
    endpoint_url=minio_endpoint,
    aws_access_key_id=minio_access_key,
    aws_secret_access_key=minio_secret_key
)

# Conexão com DuckDB em memória e carregamento dos módulos necessários
conn = duckdb.connect(database=':memory:')
conn.execute("INSTALL httpfs;")
conn.execute("LOAD httpfs;")
conn.execute("INSTALL parquet;")  # Certifique-se de que o módulo Parquet está instalado
conn.execute("LOAD parquet;")
table_name = "silver_data"

# Função para baixar arquivos Parquet e carregar no DuckDB
def load_parquet_to_duckdb(file_key):
    """Baixa o arquivo Parquet do MinIO e carrega no DuckDB."""
    local_file_path = file_key.split('/')[-1]

    # Baixa o arquivo Parquet do MinIO
    s3_client.download_file(brew_bucket, file_key, local_file_path)
    print(f"Arquivo {file_key} baixado com sucesso para {local_file_path}")

    try:
        # Insere os dados do Parquet na tabela DuckDB
        conn.execute(f"INSERT INTO {table_name} SELECT * FROM read_parquet('{local_file_path}')")
        print(f"Dados do arquivo '{local_file_path}' carregados com sucesso no DuckDB.")

    except Exception as e:
        print(f"Erro ao carregar dados no DuckDB: \n{e}")

    finally:
        # Exclui o arquivo temporário
        os.remove(local_file_path)
        print(f"Arquivo temporário {local_file_path} removido.")

# Lista e baixa todos os arquivos Parquet no bucket especificado
response = s3_client.list_objects_v2(Bucket=brew_bucket, Prefix=brew_prefix)
files = [content['Key'] for content in response.get('Contents', []) if content['Key'].endswith('.parquet')]

# Inicializa a tabela no DuckDB com o primeiro arquivo para definir a estrutura
first_file = files[0]
local_file_path = first_file.split('/')[-1]
s3_client.download_file(brew_bucket, first_file, local_file_path)
conn.execute(f"CREATE TABLE {table_name} AS SELECT * FROM read_parquet('{local_file_path}')")
os.remove(local_file_path)
print(f"Tabela '{table_name}' criada com sucesso no DuckDB.")

# Carrega o restante dos arquivos na tabela
for file_key in files[1:]:  # Ignora o primeiro arquivo que já foi carregado
    load_parquet_to_duckdb(file_key)

# Consultas de exemplo no DuckDB
try:
    # Contagem total de registros
    total_records = conn.execute(f"SELECT COUNT(*) as total_records FROM {table_name};").fetchdf()
    print("\nTotal de registros:", total_records)

    # Listar países distintos
    distinct_countries = conn.execute(f"SELECT DISTINCT country FROM {table_name};").fetchdf()
    print("\nPaíses distintos:")
    print(distinct_countries)

except Exception as e:
    print(f"Erro ao executar as consultas: \n{e}")

finally:
    conn.close()


Tabela 'silver_data' criada com sucesso no DuckDB.
Arquivo silver/country=england/part-00000-f9be67f8-6a4b-4ab6-acbb-cded77a703da.c000.snappy.parquet baixado com sucesso para part-00000-f9be67f8-6a4b-4ab6-acbb-cded77a703da.c000.snappy.parquet
Dados do arquivo 'part-00000-f9be67f8-6a4b-4ab6-acbb-cded77a703da.c000.snappy.parquet' carregados com sucesso no DuckDB.
Arquivo temporário part-00000-f9be67f8-6a4b-4ab6-acbb-cded77a703da.c000.snappy.parquet removido.
Arquivo silver/country=france/part-00000-cc7c7abe-7004-4d5d-9402-006c49d578f3.c000.snappy.parquet baixado com sucesso para part-00000-cc7c7abe-7004-4d5d-9402-006c49d578f3.c000.snappy.parquet
Dados do arquivo 'part-00000-cc7c7abe-7004-4d5d-9402-006c49d578f3.c000.snappy.parquet' carregados com sucesso no DuckDB.
Arquivo temporário part-00000-cc7c7abe-7004-4d5d-9402-006c49d578f3.c000.snappy.parquet removido.
Arquivo silver/country=germany/part-00000-ef230031-25a4-46bf-8d6e-01240ee45dab.c000.snappy.parquet baixado com sucesso para part-0

## delta nao rodando


In [35]:
import os
import boto3
import duckdb

# Configurações do MinIO
minio_endpoint = "http://10.96.188.1:9000"
minio_access_key = "miniouser"
minio_secret_key = "miniosecret"
brew_bucket = "lakehouse"
brew_prefix = "silver/"  # Mude para "bronze/" se precisar

# Configuração do cliente MinIO com boto3
s3_client = boto3.client(
    's3',
    endpoint_url=minio_endpoint,
    aws_access_key_id=minio_access_key,
    aws_secret_access_key=minio_secret_key
)

# Conexão com DuckDB em memória e carregamento dos módulos necessários
conn = duckdb.connect(database=':memory:')
conn.execute("INSTALL httpfs;")
conn.execute("LOAD httpfs;")
conn.execute("INSTALL delta;")
conn.execute("LOAD delta;")
table_name = "silver_data"

# Função para baixar arquivos Delta e carregar no DuckDB
def load_delta_to_duckdb(file_key):
    """Baixa o arquivo Delta do MinIO e carrega no DuckDB."""
    local_file_path = file_key.split('/')[-1]

    # Baixa o arquivo Delta do MinIO
    s3_client.download_file(brew_bucket, file_key, local_file_path)
    print(f"Arquivo {file_key} baixado com sucesso para {local_file_path}")

    try:
        # Insere os dados do Delta na tabela DuckDB
        conn.execute(f"INSERT INTO {table_name} SELECT * FROM read_delta('{local_file_path}')")
        print(f"Dados do arquivo '{local_file_path}' carregados com sucesso no DuckDB.")

    except Exception as e:
        print(f"Erro ao carregar dados no DuckDB: \n{e}")

    finally:
        # Exclui o arquivo temporário
        os.remove(local_file_path)
        print(f"Arquivo temporário {local_file_path} removido.")

# Lista e baixa todos os arquivos Delta
response = s3_client.list_objects_v2(Bucket=brew_bucket, Prefix=brew_prefix)
files = [content['Key'] for content in response.get('Contents', []) if content['Key'].endswith('.parquet')]

# Inicializa a tabela no DuckDB com o primeiro arquivo para definir a estrutura
first_file = files[0]
local_file_path = first_file.split('/')[-1]
s3_client.download_file(brew_bucket, first_file, local_file_path)
conn.execute(f"CREATE TABLE {table_name} AS SELECT * FROM read_delta('{local_file_path}')")
os.remove(local_file_path)
print(f"Tabela '{table_name}' criada com sucesso no DuckDB.")

# Carrega o restante dos arquivos na tabela
for file_key in files[1:]:  # Ignora o primeiro arquivo que já foi carregado
    load_delta_to_duckdb(file_key)

# Consultas de exemplo no DuckDB
try:
    # Contagem total de registros
    total_records = conn.execute(f"SELECT COUNT(*) as total_records FROM {table_name};").fetchdf()
    print("\nTotal de registros:", total_records)

    # Listar países distintos
    distinct_countries = conn.execute(f"SELECT DISTINCT country FROM {table_name};").fetchdf()
    print("\nPaíses distintos:")
    print(distinct_countries)

except Exception as e:
    print(f"Erro ao executar as consultas: \n{e}")

finally:
    conn.close()


CatalogException: Catalog Error: Table Function with name read_delta does not exist!
Did you mean "read_parquet"?
LINE 1: ...ATE TABLE silver_data AS SELECT * FROM read_delta('part-00000-deeb568d-3d56-4c...
                                                  ^

## DuckDb fudendo pra ler delta.

In [38]:
import os
import boto3
import duckdb

# Configurações MinIO e Delta Lake
minio_endpoint = "http://10.96.188.1:9000"
minio_access_key = "miniouser"
minio_secret_key = "miniosecret"
bucket_name = "lakehouse"
prefix = "silver/"

# Inicializar cliente MinIO (via boto3)
s3_client = boto3.client(
    's3',
    endpoint_url=minio_endpoint,
    aws_access_key_id=minio_access_key,
    aws_secret_access_key=minio_secret_key
)

# Função para baixar os arquivos, mantendo a estrutura de pastas
def download_files_from_s3(prefix, local_base_dir="silver_data"):
    response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
    for obj in response.get('Contents', []):
        file_key = obj['Key']
        local_path = os.path.join(local_base_dir, file_key.replace(prefix, ""))  # Mantém a estrutura de pastas
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        s3_client.download_file(bucket_name, file_key, local_path)
        print(f"Baixado: {local_path}")

# Baixar a estrutura completa
download_files_from_s3(prefix)

# Conexão com DuckDB em memória
conn = duckdb.connect(database=':memory:')
conn.execute("INSTALL httpfs;")
conn.execute("LOAD httpfs;")

# Diretório local com os arquivos baixados
local_data_path = "silver_data/silver"

# Função para carregar todos os arquivos Parquet do diretório no DuckDB
def load_parquet_files_to_duckdb(base_dir, table_name="silver_data"):
    # Lista para armazenar os caminhos dos arquivos Parquet
    parquet_files = []
    for root, _, files in os.walk(base_dir):
        for file in files:
            if file.endswith(".parquet"):
                parquet_files.append(os.path.join(root, file))

    # Cria a tabela com base no primeiro arquivo Parquet e insere o restante
    if parquet_files:
        print(f"Carregando {len(parquet_files)} arquivos Parquet para DuckDB.")
        
        # Cria a tabela com o primeiro arquivo Parquet
        conn.execute(f"CREATE TABLE {table_name} AS SELECT * FROM read_parquet('{parquet_files[0]}')")
        
        # Insere os dados dos arquivos restantes
        for parquet_file in parquet_files[1:]:
            conn.execute(f"INSERT INTO {table_name} SELECT * FROM read_parquet('{parquet_file}')")
            print(f"Inserido: {parquet_file}")
    else:
        print("Nenhum arquivo Parquet encontrado.")

# Carregar todos os arquivos Parquet no DuckDB
load_parquet_files_to_duckdb(local_data_path)

# Consultas de exemplo no DuckDB
try:
    # Contagem total de registros
    total_records = conn.execute("SELECT COUNT(*) as total_records FROM silver_data;").fetchdf()
    print("\nTotal de registros:", total_records)

    # Listar países distintos
    distinct_countries = conn.execute("SELECT DISTINCT country FROM silver_data;").fetchdf()
    print("\nPaíses distintos:")
    print(distinct_countries)

except Exception as e:
    print(f"Erro ao executar as consultas: \n{e}")

finally:
    conn.close()


Baixado: silver_data/_delta_log/00000000000000000000.json
Baixado: silver_data/country=austria/part-00000-deeb568d-3d56-4c54-980d-8fcf2f29758f.c000.snappy.parquet
Baixado: silver_data/country=england/part-00000-f9be67f8-6a4b-4ab6-acbb-cded77a703da.c000.snappy.parquet
Baixado: silver_data/country=france/part-00000-cc7c7abe-7004-4d5d-9402-006c49d578f3.c000.snappy.parquet
Baixado: silver_data/country=germany/part-00000-ef230031-25a4-46bf-8d6e-01240ee45dab.c000.snappy.parquet
Baixado: silver_data/country=ireland/part-00000-3e6fb001-c4c1-4335-910c-e6eaf419a95e.c000.snappy.parquet
Baixado: silver_data/country=isle of man/part-00000-1187f701-7c5c-49cc-931f-2b0661f23b37.c000.snappy.parquet
Baixado: silver_data/country=poland/part-00000-8e28fe9b-4be7-4de2-996f-714d6bda34d3.c000.snappy.parquet
Baixado: silver_data/country=portugal/part-00000-6803d7f9-00cf-4a3c-96d2-4520fc95a6cc.c000.snappy.parquet
Baixado: silver_data/country=scotland/part-00000-5621b1b3-e412-4357-a3dc-0ea90859f67e.c000.snappy.p