In [1]:
def chama_spark():
    from minio import Minio
    from minio.commonconfig import CopySource
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, explode, current_timestamp, from_utc_timestamp, lit
    import datetime
    import pytz
    import re
    import psycopg2
    import time

    # === Spark Session ===
    spark = (
        SparkSession
        .builder
        .appName('pipeline_sptrans')
        .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
        .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')
        .config("spark.hadoop.fs.s3a.access.key", "datalake")
        .config("spark.hadoop.fs.s3a.secret.key", "datalake")
        .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .getOrCreate()
    )

    # === MinIO Client ===
    client = Minio(
        "minio:9000",
        access_key="datalake",
        secret_key="datalake",
        secure=False
    )

    # === Datas ===
    hoje = datetime.datetime.today()
    d = hoje.astimezone(pytz.timezone('America/Sao_Paulo'))
    data_str = d.strftime('%Y-%m-%d')
    data_padrao = d.strftime('%Y%m%d')

    bucket_raw = "raw"
    origem_raw = f"api/{data_str}/"

    # === PostgreSQL ===
    postgres_url = "jdbc:postgresql://db:5432/db"
    properties = {
        "user": "admin",
        "password": "admin",
        "driver": "org.postgresql.Driver"
    }
    psql_conn_info = {
        "host": "db",
        "port": 5432,
        "database": "db",
        "user": "admin",
        "password": "admin"
    }

    # === Espera arquivos no raw ===
    objetos = []
    tentativas = 0
    max_tentativas = 10
    espera_segundos = 10

    while not objetos and tentativas < max_tentativas:
        todos_objetos = list(client.list_objects(bucket_raw, prefix=origem_raw, recursive=False))
        objetos = [obj for obj in todos_objetos if data_padrao in obj.object_name.split('/')[-1]]
        
        if objetos:
            break
        
        print(f"Nenhum arquivo {data_padrao} encontrado em {bucket_raw}/{origem_raw}. Aguardando {espera_segundos}s...")
        time.sleep(espera_segundos)
        tentativas += 1

    if not objetos:
        print(f"Nenhum arquivo chegou após {max_tentativas*espera_segundos}s. Encerrando execução.")
        spark.stop()
        return

    print(f"Arquivos encontrados em {bucket_raw}/{origem_raw}:")
    for obj in objetos:
        print(f"- Nome: {obj.object_name}, Tamanho: {obj.size} bytes")

    # === Processamento de cada arquivo ===
    for obj in objetos:
        if obj.size and obj.size > 0 and re.match(r'^[a-zA-Z0-9]', obj.object_name.split('/')[-1]):
            path_file = f"s3a://{bucket_raw}/{obj.object_name}"
            print(f"\nProcessando arquivo: {path_file}")

            conn = psycopg2.connect(**psql_conn_info)
            cur = conn.cursor()

            try:
                # === Log no processamento_arquivo ===
                cur.execute("""
                    INSERT INTO db.monitoramento_log.processamento_arquivo(nm_arquivo, dt_processamento, nr_tamanho_byte)
                    VALUES (%s, NOW() AT TIME ZONE 'America/Sao_Paulo', %s)
                    RETURNING id
                """, (obj.object_name, obj.size))
                id_arquivo = cur.fetchone()[0]
                conn.commit()

                # === Leitura JSON ===
                df_raw = spark.read.option("multiline", "true").json(path_file)

                # === Verifica vazio ===
                arquivo_vazio = False
                if df_raw.rdd.isEmpty():
                    arquivo_vazio = True
                elif df_raw.count() == 1:
                    first_row = df_raw.head()
                    if all(v is None for v in first_row):
                        arquivo_vazio = True

                if arquivo_vazio:
                    msg = f"Arquivo vazio/nulo: {obj.object_name}"
                    print(msg)

                    cur.execute("""
                        INSERT INTO db.monitoramento_log.processos_log
                            (nm_processo, dt_processamento, dt_delta, ds_status, ds_log, qtde_registro)
                        VALUES (%s, NOW() AT TIME ZONE 'America/Sao_Paulo', %s, %s, %s, %s)
                    """, ("Arquivo para bronze", data_str, 0, msg, 0))
                    conn.commit()

                    destino_arquivo = obj.object_name.replace(origem_raw, origem_raw + "processed/", 1)
                    source = CopySource(bucket_raw, obj.object_name)
                    client.copy_object(bucket_raw, destino_arquivo, source)
                    client.remove_object(bucket_raw, obj.object_name)

                    cur.close()
                    conn.close()
                    continue

                # === Transformações ===
                df_exploded = df_raw.select(col("hr"), explode(col("l")).alias("item"))
                df_final = df_exploded.select(
                    col("hr"), col("item.c").alias("c"), col("item.cl").alias("cl"),
                    col("item.sl").alias("sl"), col("item.lt0").alias("lt0"),
                    col("item.lt1").alias("lt1"), col("item.qv").alias("qv"),
                    explode(col("item.vs")).alias("vs_item")
                ).select(
                    col("hr"), col("c"), col("cl"), col("sl"), col("lt0"), col("lt1"),
                    col("qv"), col("vs_item.p").alias("p"), col("vs_item.a").alias("a"),
                    col("vs_item.ta").alias("ta"), col("vs_item.py").alias("py"),
                    col("vs_item.px").alias("px"), col("vs_item.sv").alias("sv"),
                    col("vs_item.is").alias("vs_is")
                ).withColumn("dt_processamento", from_utc_timestamp(current_timestamp(), "America/Sao_Paulo")) \
                 .withColumn("id_arquivo", lit(id_arquivo))

                # === Grava trusted ===
                nome_arquivo_raw = obj.object_name.split('/')[-1].split('.')[0]
                path_trusted = f"s3a://trusted/api/{data_str}/{nome_arquivo_raw}_p.parquet"
                df_final.coalesce(1).write.mode("overwrite").parquet(path_trusted)

                # === Inserir no bronze ===
                df_final.write.jdbc(url=postgres_url, table="bronze_sptrans.posicao", mode="append", properties=properties)

                qtde_registro = df_final.count()
                msg = f"Arquivo {obj.object_name.strip()} processado com sucesso."

                cur.execute("""
                    INSERT INTO db.monitoramento_log.processos_log
                        (nm_processo, dt_processamento, dt_delta, ds_status, ds_log, qtde_registro)
                    VALUES (%s, NOW() AT TIME ZONE 'America/Sao_Paulo', %s, %s, %s, %s)
                """, ("Arquivo para bronze", data_str, 1, msg, qtde_registro))
                conn.commit()

                destino_arquivo = obj.object_name.replace(origem_raw, origem_raw + "processed/", 1)
                source = CopySource(bucket_raw, obj.object_name)
                client.copy_object(bucket_raw, destino_arquivo, source)
                client.remove_object(bucket_raw, obj.object_name)

                print(msg)

            except Exception as e:
                erro_msg = f"Erro ao processar {obj.object_name}: {e}"
                print(erro_msg)

                cur.execute("""
                    INSERT INTO db.monitoramento_log.processos_log
                        (nm_processo, dt_processamento, dt_delta, ds_status, ds_log, qtde_registro)
                    VALUES (%s, NOW() AT TIME ZONE 'America/Sao_Paulo', %s, %s, %s, %s)
                """, ("Arquivo para bronze", data_str, 0, erro_msg, 0))
                conn.commit()

            finally:
                cur.close()
                conn.close()

    # === Finaliza Spark ===
    spark.stop()
