In [29]:
import os
import sys
import boto3
import logging
import duckdb
import polars as pl
from functools import cache, lru_cache
from pathlib import Path
from abc import ABC, abstractmethod, ABCMeta
from boto3.session import Session
from botocore.exceptions import ClientError

In [5]:
# Notebook-wide logger for the catalog helpers. Handler creation is guarded so
# re-running this cell does not attach a second stdout handler (which would
# print every message once per execution).
logger = logging.getLogger("TableCatalog")
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
if not logger.handlers:
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(formatter)
    logger.addHandler(handler)

In [6]:
# Shared boto3 clients, created in order (Glue first, then S3).
# NOTE(review): `client_glue` is not referenced by the code visible here —
# CatalogInterface builds its own Glue client; `client_s3` is used for S3 listings.
client_glue, client_s3 = (boto3.client(service_name=service) for service in ("glue", "s3"))

In [7]:
class CatalogInterface:
    """
    Singleton interface for operations on the AWS Glue Data Catalog.

    Attributes:
    _instance : CatalogInterface
        The single instance of the class.
    _client_glue : boto3 Glue client
        boto3 client for AWS Glue, created on first use.
    _cache : dict[str, list]
        Partition value lists already fetched, keyed by ``"database.table"``.
    """

    _instance = None
    _client_glue = None
    _cache: dict[str, list] = {}

    def __new__(cls):
        """Create the single instance (and its Glue client) on the first call; return it afterwards."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._client_glue = cls.create_client("glue")
        return cls._instance

    @classmethod
    def create_client(cls, service: str) -> Session.client:
        """
        Create a boto3 client for the given AWS service.

        :param service: AWS service name (e.g. ``"glue"``).
        :return: A boto3 client for that service.
        """
        return boto3.client(service_name=service)

    @classmethod
    def get_client(cls) -> Session.client:
        """
        Return the shared Glue client, creating it if it does not exist yet.

        :return: boto3 client for AWS Glue.
        """
        if cls._client_glue is None:
            cls._client_glue = cls.create_client("glue")
        return cls._client_glue

    @classmethod
    def get_partition(cls, database: str, table: str, partition: list) -> dict | None:
        """
        Fetch one partition from the Glue Data Catalog.

        :param database: Database name.
        :param table: Table name.
        :param partition: Partition values (converted to ``str``).
        :return: The ``get_partition`` response if the partition exists, otherwise None.
        """
        client_glue = cls.get_client()
        partition = list(map(str, partition))

        try:
            return client_glue.get_partition(
                DatabaseName=database,
                TableName=table,
                PartitionValues=partition
            )
        except client_glue.exceptions.EntityNotFoundException:
            logger.warning(f"Partição {partition} não existe na tabela: {database}.{table}")
            return None

    @classmethod
    def _fetch_all_partitions(cls, database: str, table: str) -> list | None:
        """Fetch every partition's value list from Glue, sorted by first value (descending); None if the table is missing."""
        client_glue = cls.get_client()
        # The paginator handles NextToken itself; passing NextToken=None on the
        # first request is rejected by boto3's parameter validation.
        paginator = client_glue.get_paginator("get_partitions")
        partitions = []
        try:
            for page in paginator.paginate(
                DatabaseName=database,
                TableName=table,
                ExcludeColumnSchema=True,
                PaginationConfig={"PageSize": 500},
            ):
                partitions.extend(item["Values"] for item in page["Partitions"])
        except client_glue.exceptions.EntityNotFoundException:
            logger.warning(f"Erro ao buscar partições da tabela: {database}.{table}")
            return None
        # Sort across all pages so the order is global, not per page of results.
        partitions.sort(key=lambda values: values[0] if values else "", reverse=True)
        return partitions

    @classmethod
    def get_all_partitions(cls, database: str, table: str, only_first_value: bool = True) -> list:
        """
        Return all partitions of a table in the Glue Data Catalog.

        Results are cached per ``database.table``. They are ordered by the first
        partition value, descending.

        :param database: Database name.
        :param table: Table name.
        :param only_first_value: If True, return only the first value of each
            partition; otherwise return each partition's full value list.
        :return: The partitions (empty if the table does not exist).
        """
        cache_key = f"{database}.{table}"
        partitions = cls._cache.get(cache_key)
        if partitions is None:
            partitions = cls._fetch_all_partitions(database, table)
            if partitions is None:
                # Missing table: do not cache, so a later call can see it once it exists.
                return []
            cls._cache[cache_key] = partitions
        # Apply only_first_value on both cache hits and misses so the return
        # shape never depends on the cache state.
        if only_first_value:
            return [values[0] for values in partitions if values]
        return partitions

    @classmethod
    def create_partition(cls, database: str, table: str, partition: list, location: str) -> None:
        """
        Create a new Parquet partition in the Glue Data Catalog.

        If the partition already exists, only a warning is logged. API errors are
        logged with their traceback, not raised.

        :param database: Database name.
        :param table: Table name.
        :param partition: Partition values (converted to ``str``).
        :param location: S3 location of the partition's data.
        """
        client_glue = cls.get_client()

        if cls.get_partition(database, table, partition):
            logger.warning("Partição já existe!")
            return

        partition = list(map(str, partition))
        try:
            logger.debug(f"Criando partição {partition} em {location}...")
            response = client_glue.create_partition(
                DatabaseName=database,
                TableName=table,
                PartitionInput={
                    "Values": partition,
                    "StorageDescriptor": {
                        "Location": location,
                        # Parquet input/output formats, matching the Parquet SerDe
                        # below (TextInputFormat does not match Parquet files).
                        "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
                        "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
                        "SerdeInfo": {
                            "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
                            "Parameters": {"serialization.format": "1"}
                        }
                    }
                }
            )
        except Exception:
            # Best-effort by design: report the failure instead of raising.
            logger.exception(f"Erro ao criar partição {partition} para tabela {database}.{table}")
        else:
            # The cached partition list for this table no longer includes the new partition.
            cls._cache.pop(f"{database}.{table}", None)
            if response["ResponseMetadata"]["HTTPStatusCode"] == 200:
                logger.debug("Criado partição com sucesso!")

    @classmethod
    def get_table_detail(cls, database: str, table: str) -> dict:
        """
        Return the Glue table definition (the ``Table`` object from ``get_table``).

        :param database: Database name.
        :param table: Table name.
        :return: The table definition dict (includes ``StorageDescriptor``).
        :raises EntityNotFoundException: If the table does not exist (logged, then re-raised).
        """
        client_glue = cls.get_client()
        try:
            response = client_glue.get_table(
                DatabaseName=database,
                Name=table,
                IncludeStatusDetails=True,
            )
        except client_glue.exceptions.EntityNotFoundException:
            logger.error(f"Tabela {database}.{table} não encontrada no catálogo do Glue")
            raise
        return response["Table"]

In [38]:
class MetaTable(type):
    """
    Metaclass that turns a class call into a Glue table-location lookup.

    For a class declared with ``metaclass=MetaTable``, ``Cls("database.table")``
    does not build an instance. It returns the table's S3 location read from the
    Glue Data Catalog, or a file glob under it when ``full=True``.
    """

    # Not read by the current implementation (``__call__`` memoises with
    # ``functools.cache``); kept so existing references still resolve.
    location_cache: dict = {}

    @classmethod
    @cache
    def __call__(cls, name: str, full: bool = False) -> str:
        """
        Return the S3 location of the Glue table ``name``.

        Declared as a classmethod, so ``cls`` is the metaclass itself. Results are
        memoised per call arguments for the lifetime of the process.

        :param name: Table name in the form ``'database.table'``.
        :param full: If True, return a glob over the table's files (see
            ``format_full_url``) instead of the bare location.
        :return: The location (always ending in ``'/'``), or the glob.
        :raises ValueError: If ``name`` is not ``'database.table'``, or if
            ``full`` is True and no data file is found.
        """
        try:
            database, table = name.split('.')
        except ValueError:
            raise ValueError(
                f"Nome da tabela inválido: {name}. Deve ser no formato 'database.table'."
            ) from None
        definition = CatalogInterface.get_table_detail(database, table)
        location = definition['StorageDescriptor']['Location']
        if not location.endswith('/'):
            location += '/'
        return cls.format_full_url(location) if full else location

    @staticmethod
    def parse_s3_url(s3_url: str) -> tuple[str, str]:
        """
        Split ``s3://bucket/prefix/`` into ``(bucket, prefix)``, with trailing '/' removed.

        ``s3a://`` and ``s3n://`` are accepted as well.

        :raises ValueError: If the URL has no S3 scheme.
        """
        scheme, sep, rest = s3_url.partition('://')
        if not sep or scheme not in ('s3', 's3a', 's3n'):
            raise ValueError(f"Invalid S3 URL: {s3_url!r}")
        bucket_name, _, prefix = rest.partition('/')
        return bucket_name, prefix.rstrip('/')

    @staticmethod
    def _is_data_key(key: str) -> bool:
        """Return True when ``key`` looks like a data file rather than a marker/metadata object."""
        name = key.rsplit('/', 1)[-1]
        # Empty basename = "folder/" marker. "_SUCCESS", ".crc" files and
        # "*_$folder$" markers are writer metadata, not data.
        return bool(name) and not name.startswith(('_', '.')) and not name.endswith('$folder$')

    @staticmethod
    def _build_glob(bucket_name: str, prefix: str, key: str, file_format: str = "") -> str:
        """Build an ``s3://`` glob with one ``*`` per path level of ``key`` below ``prefix``, plus its extension."""
        relative = key[len(prefix):].lstrip('/') if prefix else key
        stars = '/'.join(['*'] * (relative.count('/') + 1))
        # Fall back to the requested format when the object has no extension.
        suffix = Path(key).suffix or file_format
        base = f"s3://{bucket_name}/{prefix}/" if prefix else f"s3://{bucket_name}/"
        return f"{base}{stars}{suffix}"

    @classmethod
    def format_full_url(cls, s3_url: str, file_format: str = "") -> str:
        """
        Turn an S3 location into a glob that matches its data files.

        The first data file listed under the location determines the glob. The
        glob has one ``*`` per path level below the location, followed by that
        file's extension (or ``file_format`` if the file has none). For example,
        ``table/p=1/x.parquet`` gives ``s3://bucket/table/*/*.parquet``.

        :param s3_url: Location such as ``s3://bucket/prefix/``.
        :param file_format: Extension to use when the data file has none
            (with or without the leading dot).
        :return: The ``s3://`` glob.
        :raises ValueError: If no data file exists under the location.
        """
        if file_format and not file_format.startswith('.'):
            file_format = "." + file_format
        bucket_name, prefix = cls.parse_s3_url(s3_url)
        # The trailing '/' keeps sibling prefixes (e.g. "table_old/") out of the listing.
        list_prefix = f"{prefix}/" if prefix else ""
        paginator = client_s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name, Prefix=list_prefix):
            for obj in page.get('Contents', []):
                if cls._is_data_key(obj['Key']):
                    return cls._build_glob(bucket_name, prefix, obj['Key'], file_format)
        raise ValueError(f"Not found: no data files under {s3_url}")

In [39]:
class Table(metaclass=MetaTable):
    """
    Lookup entry point for Glue tables.

    ``Table("database.table")`` does not create an instance. MetaTable's
    ``__call__`` returns the table's S3 location string instead.
    """

In [44]:
# Resolve tb_current's catalog location as a file glob (full=True lists S3 objects).
Table(name="base.tb_current", full=True)

's3://data-us-east-1-891377318910/feature_quality/tb_current/*.parquet'

## Teste

### polars

In [12]:
# AWS credential provider for polars, bound to the "default" profile.
# NOTE(review): `creds` is not passed (credential_provider=...) to any of the
# read_parquet/scan_parquet calls visible here — confirm whether it is needed.
creds = pl.CredentialProviderAWS(profile_name="default")

In [13]:
# df = pl.read_parquet(
#     source='s3://data-us-east-1-891377318910/feature_quality/tb_current/',
#     parallel="auto",
#     # use_pyarrow=True,
# )

In [45]:
# Eagerly load tb_current; Table() yields its catalog location (a URL ending in '/').
df = pl.read_parquet(source=Table("base.tb_current"))

In [46]:
# Preview the first rows (5 is polars' default for head()).
df.head(n=5)

fixed_acidity,volatile_acidity,citric_acid,residual_sugar,chlorides,free_sulfur_dioxide,total_sulfur_dioxide,density,ph,sulphates,alcohol,target,prediction
f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,i64,i64
6.1,0.48,0.09,1.7,0.078,18.0,30.0,0.99402,3.45,0.54,11.2,6,6
6.1,0.715,0.1,2.6,0.053,13.0,27.0,0.99362,3.57,0.5,11.9,5,6
5.4,0.42,0.27,2.0,0.092,23.0,55.0,0.99471,3.78,0.64,12.3,7,6
7.2,0.66,0.33,2.5,0.068,34.0,102.0,0.99414,3.27,0.78,12.8,6,8
10.6,0.36,0.57,2.3,0.087,6.0,20.0,0.99676,3.14,0.72,11.1,7,6


In [47]:
# Lazily scan amazon_reviews; hive_partitioning turns key=value directories into
# columns (presumably product_category, which is filtered on below — verify).
df = pl.scan_parquet(
    source=Table("base.amazon_reviews", full=False),
    hive_partitioning=True,
)

In [48]:
# Apply the category filter to the lazy scan, then materialise the result.
df_filtered = df.filter(pl.col("product_category").eq("Baby")).collect()

In [101]:
# Non-null count per column (a one-row frame), not the row count;
# `df_filtered.height` gives the number of rows.
df_filtered.count()

marketplace,customer_id,review_id,product_id,product_parent,product_title,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date,product_category
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
1752932,1752932,1752932,1752932,1752932,1752909,1752932,1752932,1752932,1752932,1752932,1752921,1752774,1752932,1752932


### duckdb

In [50]:
# In-memory DuckDB session that can read s3:// paths.
conn = duckdb.connect(database=":memory:", read_only=False)
conn.execute("INSTALL httpfs;")
conn.execute("LOAD httpfs;")
# The CREDENTIAL_CHAIN secret provider is implemented by the `aws` extension;
# load it explicitly instead of relying on extension autoloading.
conn.execute("INSTALL aws;")
conn.execute("LOAD aws;")
conn.execute("SET s3_region='us-east-1';")
# Pin the region in the secret too, so a region from the AWS config chain
# cannot disagree with s3_region above.
conn.execute("""
CREATE SECRET (
    TYPE S3,
    PROVIDER CREDENTIAL_CHAIN,
    REGION 'us-east-1'
);
""")

<duckdb.duckdb.DuckDBPyConnection at 0x7f212d35ae70>

In [51]:
# df_rel = conn.read_parquet(Table("base.amazon_reviews"))

In [52]:
# Read only the 'Baby' category through DuckDB, using the glob built by
# Table(..., full=True); .fetchdf() is the same pandas conversion as .df().
df = conn.sql(f"""
SELECT * 
FROM read_parquet('{Table("base.amazon_reviews", full=True)}')
WHERE product_category = 'Baby';
""").fetchdf()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [53]:
# pandas: non-null count per column (a Series); len(df) gives the row count.
df.count()

marketplace          1752932
customer_id          1752932
review_id            1752932
product_id           1752932
product_parent       1752932
product_title        1752909
star_rating          1752932
helpful_votes        1752932
total_votes          1752932
vine                 1752932
verified_purchase    1752932
review_headline      1752921
review_body          1752774
review_date          1752932
product_category     1752932
dtype: int64

In [67]:
def parse_s3_url(s3_url: str) -> tuple[str, str]:
    """
    Split ``s3://bucket/prefix/`` into ``(bucket, prefix)``, with trailing '/' removed.

    ``s3a://`` and ``s3n://`` URLs are accepted too. Slicing off a fixed 5
    characters would silently mangle those.

    :param s3_url: The S3 URL.
    :return: ``(bucket_name, prefix)``; ``prefix`` is ``""`` for a bucket root.
    :raises ValueError: If the URL has no S3 scheme.
    """
    scheme, sep, rest = s3_url.partition("://")
    if not sep or scheme not in ("s3", "s3a", "s3n"):
        raise ValueError(f"Invalid S3 URL: {s3_url!r}")
    bucket_name, _, prefix = rest.partition("/")
    return bucket_name, prefix.rstrip("/")


def _is_data_key(key: str) -> bool:
    """Return True when ``key`` looks like a data file rather than a marker/metadata object."""
    name = key.rsplit("/", 1)[-1]
    # Empty basename = "folder/" marker. "_SUCCESS", ".crc" files and
    # "*_$folder$" markers are writer metadata, not data.
    return bool(name) and not name.startswith(("_", ".")) and not name.endswith("$folder$")


def _build_s3_glob(bucket_name: str, prefix: str, key: str, file_format: str = "") -> str:
    """Build an ``s3://`` glob with one ``*`` per path level of ``key`` below ``prefix``, plus its extension."""
    relative = key[len(prefix):].lstrip("/") if prefix else key
    stars = "/".join(["*"] * (relative.count("/") + 1))
    # Fall back to the requested format when the object has no extension.
    suffix = Path(key).suffix or file_format
    base = f"s3://{bucket_name}/{prefix}/" if prefix else f"s3://{bucket_name}/"
    return base + stars + suffix


def find_s3_objects(s3_url: str, file_format: str = "") -> str:
    """
    Turn an S3 location into a glob that matches its data files.

    The first data file listed under the location determines the glob. The glob
    has one ``*`` per path level below the location, followed by that file's
    extension (or ``file_format`` if the file has none).

    :param s3_url: Location such as ``s3://bucket/prefix/``.
    :param file_format: Extension to use when the data file has none
        (with or without the leading dot).
    :return: The ``s3://`` glob.
    :raises ValueError: If no data file exists under the location.
    """
    if file_format and not file_format.startswith("."):
        file_format = "." + file_format
    bucket_name, prefix = parse_s3_url(s3_url)
    # The trailing '/' keeps sibling prefixes (e.g. "table_old/") out of the listing.
    list_prefix = f"{prefix}/" if prefix else ""
    paginator = client_s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name, Prefix=list_prefix):
        for obj in page.get("Contents", []):
            if _is_data_key(obj["Key"]):
                return _build_s3_glob(bucket_name, prefix, obj["Key"], file_format)
    raise ValueError(f"Not found: no data files under {s3_url}")

In [68]:
# Derive a file glob from the amazon_reviews catalog location and display it.
table_location = find_s3_objects(s3_url=Table("base.amazon_reviews"))
table_location

's3://data-us-east-1-891377318910/datasets/amazon_reviews/*/*'

In [65]:
# Same 'Baby' filter, reading through the glob computed by find_s3_objects;
# .fetchdf() is the same pandas conversion as .df().
df = conn.sql(f"""
SELECT * 
FROM read_parquet('{table_location}')
WHERE product_category = 'Baby';
""").fetchdf()

In [66]:
# pandas: non-null count per column (a Series); len(df) gives the row count.
df.count()

marketplace          1752932
customer_id          1752932
review_id            1752932
product_id           1752932
product_parent       1752932
product_title        1752909
star_rating          1752932
helpful_votes        1752932
total_votes          1752932
vine                 1752932
verified_purchase    1752932
review_headline      1752921
review_body          1752774
review_date          1752932
product_category     1752932
dtype: int64