In [7]:
import os
from pyspark.sql import SparkSession
import re

# Caminho base do bucket MinIO
base_path = "s3a://ingestion/"
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('dataincode') \
    .config("spark.jars", "/opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.6.0.jar") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.catalog.local.warehouse", "s3a://datalake/iceberg") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://192.168.0.202:9000")\
    .config("spark.hadoop.fs.s3a.access.key", "minio") \
    .config("spark.hadoop.fs.s3a.secret.key", "minio123") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .getOrCreate()


# Ajuste para reduzir o n√≠vel de log
spark.sparkContext.setLogLevel("ERROR")


# Lista todos os arquivos no bucket (via Hadoop API)
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
path = spark._jvm.org.apache.hadoop.fs.Path(base_path)
files = fs.listStatus(path)

# Extrair nomes dos arquivos CSV
csv_files = [f.getPath().getName() for f in files if f.getPath().getName().endswith(".csv")]

# Agrupar por prefixo antes do √∫ltimo underscore "_"
from collections import defaultdict
prefix_groups = defaultdict(list)

for file in csv_files:
    match = re.match(r"(.+?)_\d+\.csv", file)
    if match:
        prefix = match.group(1)
    else:
        prefix = file.replace(".csv", "")  # arquivos √∫nicos

    prefix_groups[prefix].append(file)

# Para cada prefixo, carregar os arquivos e criar tabela Iceberg
for prefix, files in prefix_groups.items():
    print(f"Processando prefixo: {prefix}")

    file_paths = [f"s3a://ingestion/{file}" for file in files]

    # Carregar os CSVs
    df = spark.read.option("header", "true").csv(file_paths)

    # Inferir schema uma vez (ou printar se quiser criar CREATE TABLE manual)
    df.printSchema()

    # Criar tabela se n√£o existir (ajustar tipos conforme seu schema real)
    # Aqui usamos STRING como exemplo para todos os campos
    cols = ", ".join([f"{field.name} STRING" for field in df.schema.fields])

    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS local.bronze.{prefix} (
            {cols}
        )
        USING iceberg
    """)

    # Inserir dados no modo append
    df.writeTo(f"local.bronze.{prefix}").append()


IllegalArgumentException: Wrong FS: s3a://ingestion/, expected: file:///

In [11]:
import os
import re
from dotenv import load_dotenv
from collections import defaultdict
from pyspark.sql import SparkSession

# Carrega vari√°veis de ambiente
load_dotenv()
s3_endpoint = os.getenv("S3_ENDPOINT")
s3_access_key = os.getenv("S3_ACCESS_KEY")
s3_secret_key = os.getenv("S3_SECRET_KEY")

print(f"EndPoint {s3_endpoint} | access {s3_access_key} | key {s3_secret_key} ")

# Caminho base do bucket MinIO
base_path = "s3a://ingestion/"

# SparkSession com suporte a Iceberg + MinIO
spark = SparkSession.builder \
    .appName("dataincode") \
    .config("spark.jars", "/opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.6.0.jar") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "s3a://datalake/iceberg") \
    .config("spark.hadoop.fs.s3a.endpoint", s3_endpoint) \
    .config("spark.hadoop.fs.s3a.access.key", s3_access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", s3_secret_key) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A") \
    .getOrCreate()


spark.sparkContext.setLogLevel("ERROR")

# Listar arquivos no bucket
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
path = spark._jvm.org.apache.hadoop.fs.Path(base_path)
files = fs.listStatus(path)

# Filtrar arquivos CSV
csv_files = [f.getPath().getName() for f in files if f.getPath().getName().endswith(".csv")]

# Agrupar por prefixo
prefix_groups = defaultdict(list)
for file in csv_files:
    match = re.match(r"(.+?)_\d+\.csv", file)
    prefix = match.group(1) if match else file.replace(".csv", "")
    prefix_groups[prefix].append(file)

# Processar cada grupo
for prefix, files in prefix_groups.items():
    print(f"üîß Processando prefixo: {prefix}")
    
    file_paths = [f"s3a://ingestion/{file}" for file in files]
    df = spark.read.option("header", "true").csv(file_paths)
    
    # Printar o schema inferido
    df.printSchema()
    
    # Montar os campos para CREATE TABLE
    cols = ", ".join([f"{field.name} STRING" for field in df.schema.fields])
    
    # Criar tabela Iceberg se n√£o existir
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS local.bronze.{prefix} (
            {cols}
        )
        USING iceberg
    """)
    
    # Inserir os dados (append)
    df.writeTo(f"local.bronze.{prefix}").append()
    print(f"‚úÖ Tabela 'local.bronze.{prefix}' criada e populada!")

print("üöÄ Todas as tabelas foram processadas com sucesso.")


EndPoint http://minio:9000 | access minio | key minio123 


IllegalArgumentException: Wrong FS: s3a://ingestion/, expected: file:///

In [12]:
import os
import re
from collections import defaultdict
from dotenv import load_dotenv
import boto3
from botocore.config import Config
from pyspark.sql import SparkSession

# Carrega vari√°veis de ambiente do .env
load_dotenv()
s3_endpoint = os.getenv("S3_ENDPOINT")
s3_access_key = os.getenv("S3_ACCESS_KEY")
s3_secret_key = os.getenv("S3_SECRET_KEY")

print(f"üîê Endpoint: {s3_endpoint} | Access Key: {s3_access_key}")

# Conex√£o com MinIO via boto3
s3 = boto3.client(
    "s3",
    endpoint_url=s3_endpoint,
    aws_access_key_id=s3_access_key,
    aws_secret_access_key=s3_secret_key,
    config=Config(signature_version="s3v4"),
    region_name="us-east-1"
)

# Lista arquivos no bucket "ingestion"
bucket_name = "ingestion"
response = s3.list_objects_v2(Bucket=bucket_name)
csv_files = [obj["Key"] for obj in response.get("Contents", []) if obj["Key"].endswith(".csv")]

# Agrupa arquivos por prefixo
prefix_groups = defaultdict(list)
for file in csv_files:
    match = re.match(r"(.+?)_\d+\.csv", file)
    prefix = match.group(1) if match else file.replace(".csv", "")
    prefix_groups[prefix].append(file)

# Cria sess√£o Spark com suporte a Iceberg + MinIO (S3A)
spark = SparkSession.builder \
    .appName("IcebergMinIOIngestion") \
    .config("spark.jars", "/opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.6.0.jar") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "s3a://datalake/iceberg") \
    .config("spark.hadoop.fs.s3a.endpoint", s3_endpoint) \
    .config("spark.hadoop.fs.s3a.access.key", s3_access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", s3_secret_key) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Processa os arquivos agrupados por prefixo
for prefix, files in prefix_groups.items():
    print(f"\nüîß Processando prefixo: {prefix}")

    file_paths = [f"s3a://{bucket_name}/{file}" for file in files]
    df = spark.read.option("header", "true").csv(file_paths)

    # Exibe o schema inferido
    df.printSchema()

    # Prepara os campos para cria√ß√£o da tabela Iceberg
    cols = ", ".join([f"{field.name} STRING" for field in df.schema.fields])

    # Cria tabela Iceberg se n√£o existir
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS local.bronze.{prefix} (
            {cols}
        )
        USING iceberg
    """)

    # Inser√ß√£o com append
    df.writeTo(f"local.bronze.{prefix}").append()
    print(f"‚úÖ Tabela 'local.bronze.{prefix}' criada/populada com sucesso!")

print("\nüöÄ Todas as tabelas foram processadas com sucesso.")


üîê Endpoint: http://minio:9000 | Access Key: minio

üîß Processando prefixo: clientes


                                                                                

root
 |-- id: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- email: string (nullable = true)
 |-- data_cadastro: string (nullable = true)
 |-- status: string (nullable = true)



                                                                                

‚úÖ Tabela 'local.bronze.clientes' criada/populada com sucesso!

üîß Processando prefixo: pedido
root
 |-- id: string (nullable = true)
 |-- cliente_id: string (nullable = true)
 |-- data_pedido: string (nullable = true)
 |-- valor_total: string (nullable = true)
 |-- status: string (nullable = true)



                                                                                

‚úÖ Tabela 'local.bronze.pedido' criada/populada com sucesso!

üîß Processando prefixo: stgPedidos
root
 |-- _id: string (nullable = true)
 |-- ikey: string (nullable = true)
 |-- idate: string (nullable = true)
 |-- ihour: string (nullable = true)
 |-- itype: string (nullable = true)
 |-- chave: string (nullable = true)
 |-- classe: string (nullable = true)
 |-- chcriacao: string (nullable = true)
 |-- chpedbaixa: string (nullable = true)
 |-- chdevoluc: string (nullable = true)
 |-- chfatura: string (nullable = true)
 |-- baixado: string (nullable = true)
 |-- recurso: string (nullable = true)
 |-- locentrega: string (nullable = true)
 |-- locescritu: string (nullable = true)
 |-- pessoa: string (nullable = true)
 |-- emissao: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- seriesubs: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- nucleo: string (nullable = true)
 |-- movimentac: string (nullable = true)
 |-- emissaomov: string (nullable = true

                                                                                

‚úÖ Tabela 'local.bronze.stgPedidos' criada/populada com sucesso!

üöÄ Todas as tabelas foram processadas com sucesso.


In [1]:
import os
import re
from collections import defaultdict
from dotenv import load_dotenv
import boto3
from botocore.config import Config
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
from datetime import datetime
import logging

# -------------------------
# Configura√ß√£o do Logging
# -------------------------
def setup_logger():
    logger = logging.getLogger("minio_upload")
    logger.setLevel(logging.INFO)

    handler = logging.StreamHandler()
    formatter = logging.Formatter(
        fmt='{"level": "%(levelname)s", "message": "%(message)s"}'
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    return logger

logger = setup_logger()

# Carrega vari√°veis de ambiente do .env
load_dotenv()
s3_endpoint = os.getenv("S3_ENDPOINT")
s3_access_key = os.getenv("S3_ACCESS_KEY")
s3_secret_key = os.getenv("S3_SECRET_KEY")

logger.info(f"üîê Endpoint: {s3_endpoint} | Access Key: {s3_access_key[:4]}***")

# Conex√£o com MinIO via boto3
s3 = boto3.client(
    "s3",
    endpoint_url=s3_endpoint,
    aws_access_key_id=s3_access_key,
    aws_secret_access_key=s3_secret_key,
    config=Config(signature_version="s3v4"),
    region_name="us-east-1"
)

# Lista arquivos no bucket "ingestion"
bucket_name = "ingestion"
response = s3.list_objects_v2(Bucket=bucket_name)
csv_files = [obj["Key"] for obj in response.get("Contents", []) if obj["Key"].endswith(".csv")]

# Valida√ß√£o: existem arquivos CSV?
if not csv_files:
    logger.info("Nenhum arquivo .csv encontrado no bucket 'ingestion'. Abortando script.", level="WARNING")
    exit(0)


logger.info(f"{len(csv_files)} arquivos .csv encontrados no bucket.")

# Agrupa arquivos por prefixo
prefix_groups = defaultdict(list)
for file in csv_files:
    match = re.match(r"(.+?)_\d+\.csv", file)
    prefix = match.group(1) if match else file.replace(".csv", "")
    prefix_groups[prefix].append(file)

# Cria sess√£o Spark com suporte a Iceberg + MinIO (S3A)
spark = SparkSession.builder \
    .appName("IcebergMinIOIngestion") \
    .config("spark.jars", "/opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.6.0.jar") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "s3a://datalake/iceberg") \
    .config("spark.hadoop.fs.s3a.endpoint", s3_endpoint) \
    .config("spark.hadoop.fs.s3a.access.key", s3_access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", s3_secret_key) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Processa os arquivos agrupados por prefixo
for prefix, files in prefix_groups.items():
    logger.info(f"üîß Processando prefixo: {prefix}")

    file_paths = [f"s3a://{bucket_name}/{file}" for file in files]
    df = spark.read.option("header", "true").csv(file_paths)

    # Adiciona a coluna created_at com a data/hora atual
    from pyspark.sql.functions import current_timestamp
    df = df.withColumn("created_at", current_timestamp())

    # Exibe o schema inferido
    df.printSchema()

    # Prepara os campos para cria√ß√£o da tabela Iceberg
    cols = ", ".join([f"{field.name} STRING" for field in df.schema.fields])

    # Cria tabela Iceberg se n√£o existir
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS local.bronze.{prefix} (
            {cols}
        )
        USING iceberg
    """)

    # Inser√ß√£o com append
    #df.writeTo(f"local.bronze.{prefix}").append()
    #Mais performatico para grandes volumes
    df.writeTo(f"local.bronze.{prefix}").overwritePartitions()
    logger.info(f"‚úÖ Tabela 'local.bronze.{prefix}' criada/populada com sucesso!")

    # Remove arquivos processados do bucket
    for file in files:
        s3.delete_object(Bucket=bucket_name, Key=file)
        logger.info(f"üóëÔ∏è Arquivo deletado do bucket: {file}")

logger.info("üöÄ Todas as tabelas foram processadas e os arquivos .csv foram exclu√≠dos com sucesso.")


{"level": "INFO", "message": "üîê Endpoint: http://minio:9000 | Access Key: mini***"}
{"level": "INFO", "message": "1 arquivos .csv encontrados no bucket."}
25/04/20 01:10:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/20 01:10:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
{"level": "INFO", "message": "üîß Processando prefixo: clientes"}
                                                                                

root
 |-- id: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- email: string (nullable = true)
 |-- data_cadastro: string (nullable = true)
 |-- status: string (nullable = true)
 |-- created_at: timestamp (nullable = false)



{"level": "INFO", "message": "‚úÖ Tabela 'local.bronze.clientes' criada/populada com sucesso!"}
{"level": "INFO", "message": "üóëÔ∏è Arquivo deletado do bucket: clientes.csv"}
{"level": "INFO", "message": "üöÄ Todas as tabelas foram processadas e os arquivos .csv foram exclu√≠dos com sucesso."}


# TESTE COM API

In [1]:
import os
import re
from collections import defaultdict
from dotenv import load_dotenv
import boto3
from botocore.config import Config
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
from datetime import datetime
import logging

# -------------------------
# Configura√ß√£o do Logging
# -------------------------
def setup_logger():
    # Evita m√∫ltiplos handlers
    logger = logging.getLogger("minio_upload")
    if logger.handlers:  # Remove handlers existentes
        logger.handlers = []
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    formatter = logging.Formatter(fmt='{"level": "%(levelname)s", "message": "%(message)s"}')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger

logger = setup_logger()

# Carrega vari√°veis de ambiente do .env
load_dotenv()
s3_endpoint = os.getenv("S3_ENDPOINT")
s3_access_key = os.getenv("S3_ACCESS_KEY")
s3_secret_key = os.getenv("S3_SECRET_KEY")

logger.info(f"üîê Endpoint: {s3_endpoint} | Access Key: {s3_access_key[:4]}***")

# Conex√£o com MinIO via boto3
s3 = boto3.client(
    "s3",
    endpoint_url=s3_endpoint,
    aws_access_key_id=s3_access_key,
    aws_secret_access_key=s3_secret_key,
    config=Config(signature_version="s3v4"),
    region_name="us-east-1"
)

# Lista arquivos no bucket "ingestion"
bucket_name = "ingestion"
response = s3.list_objects_v2(Bucket=bucket_name)
csv_files = [obj["Key"] for obj in response.get("Contents", []) if obj["Key"].endswith(".csv")]

# Valida√ß√£o: existem arquivos CSV?
if not csv_files:
    logger.info("Nenhum arquivo .csv encontrado no bucket 'ingestion'. Abortando script.")
    exit(0)

logger.info(f"{len(csv_files)} arquivos .csv encontrados no bucket.")

# Agrupa arquivos por prefixo
prefix_groups = defaultdict(list)
for file in csv_files:
    match = re.match(r"(.+?)_\d+\.csv", file)
    prefix = match.group(1) if match else file.replace(".csv", "")
    prefix_groups[prefix].append(file)

# Cria sess√£o Spark com suporte a Iceberg + MinIO (S3A)
spark = SparkSession.builder \
    .appName("IcebergMinIOIngestion") \
    .config("spark.jars", "/opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.6.0.jar") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "s3a://datalake/iceberg") \
    .config("spark.hadoop.fs.s3a.endpoint", s3_endpoint) \
    .config("spark.hadoop.fs.s3a.access.key", s3_access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", s3_secret_key) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.fast.upload", "true") \
    .config("spark.sql.catalog.local.default-namespace", "default") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Processa os arquivos agrupados por prefixo
for prefix, files in prefix_groups.items():
    logger.info(f"üîß Processando prefixo: {prefix}")

    # --------------------------------
    # Camada Bronze
    # --------------------------------
    file_paths = [f"s3a://{bucket_name}/{file}" for file in files]
    df = spark.read.option("header", "true").csv(file_paths)

    # Adiciona a coluna created_at com a data/hora atual
    df = df.withColumn("created_at", current_timestamp())

    # Valida√ß√£o: Remove duplicatas e nulos na chave prim√°ria (id)
    df = df.dropDuplicates(["id"]).filter("id IS NOT NULL")

    # Exibe o schema inferido
    #df.printSchema()

    # Prepara os campos para cria√ß√£o da tabela Iceberg
    cols = ", ".join([f"{field.name} STRING" for field in df.schema.fields if field.name != "created_at"] + ["created_at TIMESTAMP"])

    # Cria tabela Iceberg na camada bronze se n√£o existir
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS local.bronze.{prefix} (
            {cols}
        )
        USING iceberg
        PARTITIONED BY (days(created_at))
        TBLPROPERTIES (
            'write.format.default'='parquet',
            'write.parquet.compression-codec'='snappy',
            'write.target-file-size-bytes'='134217728',
            'commit.retry.num-retries'='10'
        )
    """)

    # Carrega a tabela bronze existente
    bronze_df = spark.table(f"local.bronze.{prefix}")

    # Separa registros para atualiza√ß√£o (existem na tabela bronze) e inser√ß√£o (novos)
    existing_ids = bronze_df.select("id").distinct()
    update_df = df.join(existing_ids, "id", "inner")  # Registros que j√° existem
    insert_df = df.join(existing_ids, "id", "left_anti")  # Registros novos

    # Escreve novos registros (inser√ß√£o)
    if not insert_df.isEmpty():
        insert_df.writeTo(f"local.bronze.{prefix}").append()
        logger.info(f"‚úÖ Inseridos novos registros na tabela 'local.bronze.{prefix}'.")

    # Escreve atualiza√ß√µes (sobrescreve parti√ß√µes afetadas)
    if not update_df.isEmpty():
        update_df.writeTo(f"local.bronze.{prefix}").overwritePartitions()
        logger.info(f"‚úÖ Atualizados registros existentes na tabela 'local.bronze.{prefix}'.")

    # Manuten√ß√£o na camada bronze
    spark.sql(f"CALL local.system.rewrite_data_files(table => 'local.bronze.{prefix}')")
    logger.info(f"üßπ Compacta√ß√£o de arquivos executada na tabela 'local.bronze.{prefix}'.")

    # Remove arquivos processados do bucket
    for file in files:
        s3.delete_object(Bucket=bucket_name, Key=file)
        logger.info(f"üóëÔ∏è Arquivo deletado do bucket: {file}")

logger.info("üöÄ Todas as tabelas foram processadas e os arquivos .csv foram exclu√≠dos com sucesso.")

# Fecha a sess√£o Spark
spark.stop()

{"level": "INFO", "message": "üîê Endpoint: http://minio:9000 | Access Key: mini***"}
{"level": "INFO", "message": "20 arquivos .csv encontrados no bucket."}
25/04/20 15:06:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
{"level": "INFO", "message": "üîß Processando prefixo: clientes"}
{"level": "INFO", "message": "‚úÖ Inseridos novos registros na tabela 'local.bronze.clientes'."}
{"level": "INFO", "message": "üßπ Compacta√ß√£o de arquivos executada na tabela 'local.bronze.clientes'."}
{"level": "INFO", "message": "üóëÔ∏è Arquivo deletado do bucket: clientes_01.csv"}
{"level": "INFO", "message": "üóëÔ∏è Arquivo deletado do bucket: clientes_02.csv"}
{"level": "INFO", "message": "üóëÔ∏è Arquivo deletado do bucket: clientes_03.csv"}
{"level": "INFO", "message": "üóëÔ∏è Arquiv

In [3]:
print(spark.version)

3.5.1


In [1]:
import re
from collections import defaultdict
from dotenv import load_dotenv
import boto3
from botocore.config import Config
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
import logging
import os  # Adicionado para importar os.getenv

# -------------------------
# Configura√ß√£o do Logging
# -------------------------
def setup_logger():
    logger = logging.getLogger("minio_upload")
    if logger.handlers:
        logger.handlers = []
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    formatter = logging.Formatter(fmt='{"level": "%(levelname)s", "message": "%(message)s"}')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger

logger = setup_logger()

# Carrega vari√°veis de ambiente do .env
load_dotenv()
s3_endpoint = os.getenv("S3_ENDPOINT")
s3_access_key = os.getenv("S3_ACCESS_KEY")
s3_secret_key = os.getenv("S3_SECRET_KEY")

logger.info(f"üîê Endpoint: {s3_endpoint} | Access Key: {s3_access_key[:4]}***")

# Conex√£o com MinIO via boto3
s3 = boto3.client(
    "s3",
    endpoint_url=s3_endpoint,
    aws_access_key_id=s3_access_key,
    aws_secret_access_key=s3_secret_key,
    config=Config(signature_version="s3v4"),
    region_name="us-east-1"
)

# Lista arquivos no bucket "ingestion"
bucket_name = "ingestion"
response = s3.list_objects_v2(Bucket=bucket_name)
csv_files = [obj["Key"] for obj in response.get("Contents", []) if obj["Key"].endswith(".csv")]

# Valida√ß√£o: existem arquivos CSV?
if not csv_files:
    logger.info("Nenhum arquivo .csv encontrado no bucket 'ingestion'. Abortando script.")
    exit(0)

logger.info(f"{len(csv_files)} arquivos .csv encontrados no bucket.")

# Agrupa arquivos por prefixo
prefix_groups = defaultdict(list)
for file in csv_files:
    match = re.match(r"(.+?)_\d+\.csv", file)
    prefix = match.group(1) if match else file.replace(".csv", "")
    prefix_groups[prefix].append(file)

# Cria sess√£o Spark com suporte a Iceberg + MinIO (S3A)
spark = SparkSession.builder \
    .appName("IcebergMinIOIngestion") \
    .config("spark.jars", "/opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.6.0.jar") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "s3a://datalake/iceberg") \
    .config("spark.hadoop.fs.s3a.endpoint", s3_endpoint) \
    .config("spark.hadoop.fs.s3a.access.key", s3_access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", s3_secret_key) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.fast.upload", "true") \
    .config("spark.sql.catalog.local.default-namespace", "default") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Lista para armazenar arquivos a serem deletados
files_to_delete = []

# Processa os arquivos agrupados por prefixo
for prefix, files in prefix_groups.items():
    logger.info(f"üîß Processando prefixo: {prefix}")

    # --------------------------------
    # Camada Bronze
    # --------------------------------
    file_paths = [f"s3a://{bucket_name}/{file}" for file in files]
    df = spark.read.option("header", "true").csv(file_paths)

    # Adiciona a coluna created_at com a data/hora atual
    df = df.withColumn("created_at", current_timestamp())

    # Valida√ß√£o: Remove duplicatas e nulos na chave prim√°ria (id)
    df = df.dropDuplicates(["id"]).filter("id IS NOT NULL")

    # Prepara os campos para cria√ß√£o da tabela Iceberg
    cols = ", ".join([f"{field.name} STRING" for field in df.schema.fields if field.name != "created_at"] + ["created_at TIMESTAMP"])

    # Cria tabela Iceberg na camada bronze se n√£o existir
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS local.bronze.{prefix} (
            {cols}
        )
        USING iceberg
        PARTITIONED BY (days(created_at))
        TBLPROPERTIES (
            'write.format.default'='parquet',
            'write.parquet.compression-codec'='snappy',
            'write.target-file-size-bytes'='134217728',
            'commit.retry.num-retries'='10'
        )
    """)

    # Carrega a tabela bronze existente
    bronze_df = spark.table(f"local.bronze.{prefix}")

    # Separa registros para atualiza√ß√£o (existem na tabela bronze) e inser√ß√£o (novos)
    existing_ids = bronze_df.select("id").distinct()
    update_df = df.join(existing_ids, "id", "inner")  # Registros que j√° existem
    insert_df = df.join(existing_ids, "id", "left_anti")  # Registros novos

    # Escreve novos registros (inser√ß√£o)
    if not insert_df.isEmpty():
        insert_df.writeTo(f"local.bronze.{prefix}").append()
        logger.info(f"‚úÖ Inseridos novos registros na tabela 'local.bronze.{prefix}'.")

    # Escreve atualiza√ß√µes (sobrescreve parti√ß√µes afetadas)
    if not update_df.isEmpty():
        update_df.writeTo(f"local.bronze.{prefix}").overwritePartitions()
        logger.info(f"‚úÖ Atualizados registros existentes na tabela 'local.bronze.{prefix}'.")

    # Manuten√ß√£o na camada bronze
    spark.sql(f"CALL local.system.rewrite_data_files(table => 'local.bronze.{prefix}')")
    logger.info(f"üßπ Compacta√ß√£o de arquivos executada na tabela 'local.bronze.{prefix}'.")

    # Adiciona arquivos processados √† lista de exclus√£o
    files_to_delete.extend(files)

# Deleta arquivos processados do bucket ap√≥s todas as opera√ß√µes Spark
for file in files_to_delete:
    s3.delete_object(Bucket=bucket_name, Key=file)
    logger.info(f"üóëÔ∏è Arquivo deletado do bucket: {file}")

logger.info("üöÄ Todas as tabelas foram processadas e os arquivos .csv foram exclu√≠dos com sucesso.")

# Fecha a sess√£o Spark
spark.stop()

{"level": "INFO", "message": "üîê Endpoint: http://minio:9000 | Access Key: mini***"}
{"level": "INFO", "message": "20 arquivos .csv encontrados no bucket."}
25/04/21 09:57:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/21 09:57:34 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
{"level": "INFO", "message": "üîß Processando prefixo: clientes"}
{"level": "INFO", "message": "‚úÖ Atualizados registros existentes na tabela 'local.bronze.clientes'."}
{"level": "INFO", "message": "üßπ Compacta√ß√£o de arquivos executada na tabela 'local.bronze.clientes'."}
{"level": "INFO", "message": "üóëÔ∏è Arquivo deletado do bucket: clientes_01.csv"}
{"level": "INFO", "message": "üóëÔ∏è Arquivo deletado do bucket: clientes_02.csv"}
{"level": "INFO", "mes