# <b style="color: white; background-color: #00bbff; padding: 5px 10px; border-radius: 5px;">LIBRARY and SETTINGS</b>

In [3]:
import re
from collections import defaultdict
from dotenv import load_dotenv
import boto3
from datetime import datetime, timedelta
from botocore.config import Config
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
import logging
import os

# -------------------------
# Configuração Avançada de Logging
# -------------------------
def setup_logger(log_directory="/opt/notebook/logs"):
    """Create (or reset) the pipeline logger, emitting one JSON object per line.

    Every call writes to a new file ``bronze_<YYYYmmdd_HHMMSS>.log`` inside
    *log_directory* (created if missing) and also logs to the console. Calling it
    again, e.g. when the notebook cell is re-run, closes the handlers installed by
    the previous call before replacing them. Merely clearing the handler list would
    leave the previous log file open.

    Args:
        log_directory: Folder that receives the log file. Defaults to the
            notebook's ``/opt/notebook/logs`` directory.

    Returns:
        The configured ``logging.Logger`` named ``"minio_silver"``.
    """
    import json

    class _JsonFormatter(logging.Formatter):
        """Serialize records with ``json.dumps`` so quotes/backslashes stay valid JSON."""

        def format(self, record):
            payload = {"level": record.levelname, "message": record.getMessage()}
            if record.exc_info:
                # Keep tracebacks inside the JSON object instead of trailing raw lines.
                payload["exception"] = self.formatException(record.exc_info)
            # ensure_ascii=False keeps emojis/accents readable, as in the old output.
            return json.dumps(payload, ensure_ascii=False)

    os.makedirs(log_directory, exist_ok=True)
    log_filename = f"bronze_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
    log_file = os.path.join(log_directory, log_filename)

    # NOTE(review): the logger is called "minio_silver" although this is the bronze
    # pipeline; the name is kept because other cells may look it up by name.
    logger = logging.getLogger("minio_silver")
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
        handler.close()  # releases the previous call's log file

    logger.setLevel(logging.INFO)
    formatter = _JsonFormatter()

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)

    # Explicit UTF-8: messages contain emojis, which a non-UTF-8 locale cannot encode.
    file_handler = logging.FileHandler(log_file, encoding="utf-8")
    file_handler.setFormatter(formatter)

    logger.addHandler(console_handler)
    logger.addHandler(file_handler)

    return logger


logger = setup_logger()

# Load the S3/MinIO connection settings from the environment (and .env, if present).
load_dotenv()
s3_endpoint = os.getenv("S3_ENDPOINT")
s3_access_key = os.getenv("S3_ACCESS_KEY")
s3_secret_key = os.getenv("S3_SECRET_KEY")

# Warn early: boto3 treats None as "not provided" and falls back to its defaults,
# so a missing variable otherwise surfaces much later as an opaque S3/S3A error.
_missing_env = [
    name
    for name, value in (
        ("S3_ENDPOINT", s3_endpoint),
        ("S3_ACCESS_KEY", s3_access_key),
        ("S3_SECRET_KEY", s3_secret_key),
    )
    if not value
]
if _missing_env:
    logger.warning(f"⚠️ Variáveis de ambiente ausentes: {', '.join(_missing_env)}")

logger.info(f"🔐 Conectando ao endpoint: {s3_endpoint}")

# -------------------------
# S3 client configuration
# -------------------------
# SigV4 signing plus up to 5 attempts in botocore's "standard" retry mode.
s3 = boto3.client(
    "s3",
    endpoint_url=s3_endpoint,
    aws_access_key_id=s3_access_key,
    aws_secret_access_key=s3_secret_key,
    config=Config(
        signature_version="s3v4",
        retries={
            'max_attempts': 5,
            'mode': 'standard'
        }
    ),
    region_name="us-east-1"
)

{"level": "INFO", "message": "🔐 Conectando ao endpoint: http://minio:9000"}


# <b style="color: white; background-color: #00bbff; padding: 5px 10px; border-radius: 5px;">FUNCTIONS</b>

In [4]:
# -------------------------
# Funções Auxiliares
# -------------------------
def list_csv_files_recursive(bucket, prefix=""):
    """Return the keys of every ``.csv`` object under *prefix* in *bucket*.

    When no ``Delimiter`` is given, ``list_objects_v2`` already returns every key
    below the prefix, including keys in nested "folders". One paginated listing
    therefore replaces the old per-folder recursion. That means one request per page
    of keys instead of at least one per folder, no recursion depth, and a single
    error log instead of one per nesting level.

    Args:
        bucket: Name of the bucket to scan.
        prefix: Only keys starting with this string are considered ("" = whole bucket).

    Returns:
        List of matching object keys, in listing order (suffix match is case-sensitive).

    Raises:
        Any exception raised by the S3 client, after it has been logged.
    """
    paginator = s3.get_paginator('list_objects_v2')

    try:
        return [
            obj['Key']
            for page in paginator.paginate(Bucket=bucket, Prefix=prefix)
            for obj in page.get('Contents', [])
            if obj['Key'].endswith('.csv')
        ]
    except Exception as e:
        logger.error(f"Erro ao listar arquivos: {str(e)}")
        raise

def optimize_iceberg_table(spark, table_path, *, retention_days=7, retain_last=3):
    """Run Iceberg maintenance procedures on *table_path*.

    Steps, in order:
    1. Compact small data files.
    2. Expire snapshots older than *retention_days*, always keeping the newest
       *retain_last*.
    3. Delete orphan files older than the same cutoff.
    4. Rewrite the manifests.

    Args:
        spark: Active SparkSession with the ``local`` Iceberg catalog configured.
        table_path: Table identifier such as ``"local.bronze.clientes"``. It is
            interpolated into SQL string literals, so it must not contain quotes.
        retention_days: Age threshold (days) for snapshot expiry and orphan removal.
        retain_last: Minimum number of snapshots kept regardless of age.

    Raises:
        Re-raises any Spark error after logging it.
    """
    try:
        # 1. Data-file compaction. Target 64 MiB files; only rewrite a file group
        #    once at least 5 candidate files exist. Smaller than 32 MiB counts as
        #    "small". The SQL itself carries no comments.
        spark.sql(f"""
            CALL local.system.rewrite_data_files(
                table => '{table_path}',
                options => map(
                    'target-file-size-bytes', '67108864',
                    'min-input-files', '5',
                    'min-file-size-bytes', '33554432'
                )
            )
        """)
        logger.info(f"✅ Compactação concluída para {table_path}")

        # One cutoff for both retention steps.
        # NOTE(review): naive local time; Spark parses the literal in the session
        # time zone — confirm both agree in this environment.
        cutoff_date = (datetime.now() - timedelta(days=retention_days)).strftime('%Y-%m-%d %H:%M:%S')

        # 2. Expire old snapshots (the newest `retain_last` are always kept).
        spark.sql(f"""
            CALL local.system.expire_snapshots(
                table => '{table_path}',
                older_than => timestamp '{cutoff_date}',
                retain_last => {int(retain_last)}
            )
        """)
        logger.info(f"🗑️ Snapshots antigos removidos para {table_path}")

        # 3. Remove files no longer referenced by any table metadata.
        spark.sql(f"""
            CALL local.system.remove_orphan_files(
                table => '{table_path}',
                older_than => timestamp '{cutoff_date}'
            )
        """)
        logger.info(f"🧹 Arquivos órfãos removidos para {table_path}")

        # 4. Metadata optimization.
        spark.sql(f"CALL local.system.rewrite_manifests('{table_path}')")
        logger.info(f"📦 Manifestos otimizados para {table_path}")

    except Exception as e:
        logger.error(f"❌ Falha na otimização de {table_path}: {str(e)}")
        raise
# -------------------------
# Configuração do Spark com Otimizações
# -------------------------
def create_spark_session():
    """Build (or reuse) a SparkSession wired to the Iceberg ``local`` catalog on MinIO.

    The ``local`` catalog is a Hadoop-type Iceberg catalog whose warehouse is
    ``s3a://datalake/iceberg``. S3A is configured with the module-level
    ``s3_endpoint``/``s3_access_key``/``s3_secret_key``, using path-style access
    and SSL disabled. Required JARs that are absent on disk are skipped with a
    warning. Spark's own log level is set to ERROR.

    Returns:
        The configured ``SparkSession``.
    """
    from pyspark import SparkConf

    # Initial configuration to keep Spark's console output quiet.
    conf = SparkConf()
    conf.set("spark.logConf", "false")
    conf.set("spark.ui.showConsoleProgress", "false")
    # NOTE(review): the Iceberg runtime below targets Spark 3.5, which uses Log4j 2
    # (configured via -Dlog4j.configurationFile); -Dlog4j.configuration is the
    # Log4j 1 property. Confirm this file is actually picked up — the StatusLogger
    # "Reconfiguration failed" errors in the cell output suggest it is not.
    conf.set("spark.driver.extraJavaOptions", "-Dlog4j.configuration=file:/opt/spark/conf/log4j.properties")

    iceberg_jar = "/opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.6.0.jar"
    required_jars = [
        iceberg_jar,
        "/opt/spark/jars/hadoop-aws-3.3.4.jar"
    ]
    available_jars = [jar for jar in required_jars if os.path.exists(jar)]
    missing_jars = [jar for jar in required_jars if jar not in available_jars]
    if missing_jars:
        # Previously these were dropped silently, surfacing later as class-loading errors.
        logger.warning(f"⚠️ JARs não encontrados (ignorados): {', '.join(missing_jars)}")

    spark = (
        SparkSession.builder
        .config(conf=conf)
        .appName("IcebergOptimizedPipeline")
        .config("spark.jars", ",".join(available_jars))
        # Iceberg SQL extensions (CALL procedures, MERGE INTO) + Hadoop catalog on S3A.
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.local.type", "hadoop")
        .config("spark.sql.catalog.local.warehouse", "s3a://datalake/iceberg")
        # MinIO access through S3A.
        .config("spark.hadoop.fs.s3a.endpoint", s3_endpoint)
        .config("spark.hadoop.fs.s3a.access.key", s3_access_key)
        .config("spark.hadoop.fs.s3a.secret.key", s3_secret_key)
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.hadoop.fs.s3a.fast.upload", "true")
        .config("spark.hadoop.fs.s3a.fast.upload.buffer", "disk")
        .config("spark.hadoop.fs.s3a.connection.maximum", "100")
        .config("spark.hadoop.fs.s3a.threads.max", "20")
        .config("spark.sql.catalog.local.default-namespace", "default")
        # Small-cluster tuning.
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.shuffle.partitions", "4")
        .config("spark.default.parallelism", "4")
        .config("spark.sql.iceberg.handle-timestamp-without-timezone", "true")
        .getOrCreate()
    )

    spark.sparkContext.setLogLevel("ERROR")
    return spark



# <b style="color: white; background-color: #00bbff; padding: 5px 10px; border-radius: 5px;">EXECUTION</b>

In [5]:

# -------------------------
# Processamento Principal
# -------------------------
def _group_files_by_folder(csv_files):
    """Group object keys by their immediate parent folder ("root" for top-level keys)."""
    groups = defaultdict(list)
    for key in csv_files:
        folder_name = key.split('/')[-2] if '/' in key else "root"
        groups[folder_name].append(key)
    return groups


def _column_ddl(schema):
    """Build the CREATE TABLE column list: every CSV column as STRING, then created_at TIMESTAMP.

    Column names are backquoted (embedded backquotes doubled) so CSV headers with
    spaces, hyphens or reserved words still produce valid DDL.
    """
    columns = [
        f"`{field.name.replace('`', '``')}` STRING"
        for field in schema.fields
        if field.name != "created_at"
    ]
    columns.append("created_at TIMESTAMP")
    return ", ".join(columns)


def main():
    """Ingest every CSV in the ``ingestion`` bucket into bronze Iceberg tables.

    CSV keys are grouped by parent folder. For each group:
    1. Upsert it (MERGE on ``id``) into ``local.bronze.<folder>``, creating the
       table on first use.
    2. Run table maintenance.
    3. Delete the source objects.

    A failure while processing one table is logged and the next table is tried.
    The Spark session is always stopped.
    """
    # create_spark_session() already sets Spark's log level to ERROR.
    spark = create_spark_session()

    bucket_name = "ingestion"
    try:
        csv_files = list_csv_files_recursive(bucket_name)
        if not csv_files:
            logger.info("Nenhum arquivo .csv encontrado. Encerrando.")
            return

        logger.info(f"📂 Total de arquivos encontrados: {len(csv_files)}")

        # One table per parent folder name.
        for prefix, files in _group_files_by_folder(csv_files).items():
            # NOTE(review): the identifier is not quoted, so folder names that are not
            # valid SQL identifiers (e.g. containing '-') will fail here.
            table_path = f"local.bronze.{prefix}"
            file_paths = [f"s3a://{bucket_name}/{file}" for file in files]

            try:
                logger.info(f"🔧 Processando tabela: {prefix} ({len(files)} arquivos)")

                df = spark.read.option("header", "true").csv(file_paths)

                # take(1) stops at the first row; count() would scan every file.
                if not df.take(1):
                    logger.warning(f"⚠️ Dados vazios para {prefix}. Pulando...")
                    continue

                df = df.withColumn("created_at", current_timestamp())
                # MERGE fails when several source rows match one target row, so keep a
                # single (arbitrary) row per non-null id.
                df = df.filter("id IS NOT NULL").dropDuplicates(["id"])

                spark.sql(f"""
                    CREATE TABLE IF NOT EXISTS {table_path} (
                        {_column_ddl(df.schema)}
                    )
                    USING iceberg
                    PARTITIONED BY (days(created_at))
                    TBLPROPERTIES (
                        'write.format.default'='parquet',
                        'write.parquet.compression-codec'='zstd',
                        'write.target-file-size-bytes'='67108864',
                        'commit.retry.num-retries'='5',
                        'history.expire.max-snapshot-age-ms'='604800000',
                        'write.metadata.delete-after-commit.enabled'='true',
                        'write.metadata.previous-versions-max'='3'
                    )
                """)

                # Upsert. NOTE(review): UPDATE SET * also overwrites created_at, so it
                # records a row's latest ingestion time, not its first.
                df.createOrReplaceTempView("temp_df")
                try:
                    spark.sql(f"""
                        MERGE INTO {table_path} target
                        USING temp_df source
                        ON target.id = source.id
                        WHEN MATCHED THEN UPDATE SET *
                        WHEN NOT MATCHED THEN INSERT *
                    """)
                finally:
                    spark.catalog.dropTempView("temp_df")

                logger.info(f"💾 Dados atualizados na tabela {table_path}")

                # Maintenance runs after the data is committed. If it fails, still
                # delete the sources, otherwise they would be re-ingested next run.
                try:
                    optimize_iceberg_table(spark, table_path)
                except Exception as e:
                    logger.warning(f"⚠️ Otimização falhou para {table_path}; seguindo com a limpeza: {str(e)}")

                # Remove the processed source files (best-effort, per file).
                for file in files:
                    try:
                        s3.delete_object(Bucket=bucket_name, Key=file)
                        logger.debug(f"Arquivo removido: {file}")
                    except Exception as e:
                        logger.error(f"Erro ao remover {file}: {str(e)}")

            except Exception as e:
                logger.error(f"❌ Falha no processamento de {prefix}: {str(e)}")

    finally:
        spark.stop()
        logger.info("🏁 Processamento concluído")

if __name__ == "__main__":
    main()

ERROR StatusLogger Reconfiguration failed: No configuration found for '25af5db5' at 'null' in 'null'
ERROR StatusLogger Reconfiguration failed: No configuration found for 'Default' at 'null' in 'null'
25/09/13 19:01:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
{"level": "INFO", "message": "📂 Total de arquivos encontrados: 2"}
{"level": "INFO", "message": "🔧 Processando tabela: clientes (1 arquivos)"}
{"level": "INFO", "message": "💾 Dados atualizados na tabela local.bronze.clientes"}
{"level": "INFO", "message": "✅ Compactação concluída para local.bronze.clientes"}
{"level": "INFO", "message": "🗑️ Snapshots antigos removidos para local.bronze.clientes"}
{"level": "INFO", "message": "🧹 Arquivos órfãos removidos para local.bronze.clientes"}
{"level": "INFO", "message": "📦 Manifes