In [0]:
import re

import delta
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, desc

In [0]:
# Name of the source table to replicate, supplied through the "tabela" widget.
tabela = dbutils.widgets.get("tabela")
# The value is interpolated unquoted into table names and the checkpoint path further down,
# so fail fast on an empty value or anything that is not a plain word-character identifier.
if not re.fullmatch(r"\w+", tabela):
    raise ValueError(f"Invalid value for widget 'tabela': {tabela!r}")
schema = "cotacoes_bronze"
catalog = "prod"


In [0]:
if spark.catalog.tableExists(f"{catalog}.{schema}.{tabela}"):
    print("Tabela já existente, ignorando FULL LOAD")
else:
    # First run: copy the whole source table into the bronze table in one overwrite,
    # passing the delta.enableChangeDataFeed option on the write.
    print("Tabela não existente, criando.")

    df_full = spark.read.format("delta").table(f"transacional.cotacoes_db.{tabela}")

    writer = (
        df_full
        # NOTE(review): coalesce(1) funnels the full snapshot through a single partition/task;
        # fine for small tables, a bottleneck for large ones.
        .coalesce(1)
        .write
        .format("delta")
        .mode("overwrite")
        .option("delta.enableChangeDataFeed", "true")
    )
    writer.saveAsTable(f"{catalog}.{schema}.{tabela}")

In [0]:
# Handle to the bronze Delta table; it is the merge target handed to upsert() by the stream below.
bronze = delta.DeltaTable.forName(
    spark,
    f"{catalog}.{schema}.{tabela}",
)

def upsert(df, delta):
    """Merge one micro-batch of Change Data Feed rows into the bronze Delta table.

    Used as the ``foreachBatch`` handler. The batch is reduced to the latest change
    per ``Simbolo`` and then merged into ``delta``:

    * ``delete``                        -> delete the matched bronze row
    * ``insert`` / ``update_postimage`` -> update the matched row, or insert it if absent

    Args:
        df: micro-batch DataFrame read with ``readChangeFeed``; expected to carry the
            CDF metadata columns ``_change_type``, ``_commit_version`` and
            ``_commit_timestamp`` plus the ``Simbolo`` key column.
        delta: target ``DeltaTable`` to merge into. (This parameter shadows the
            ``delta`` module inside the function.)
    """
    # Pre-images only hold the old value of an update; the post-image carries the new state.
    changes = df.filter(col("_change_type") != "update_preimage")

    # Keep only the most recent change per Simbolo. A key can appear both as 'delete' and
    # 'insert' within the same commit (a row removed and re-added by one operation); the
    # row that survives such a commit is the inserted one, so non-delete rows are ranked
    # first as a deterministic tiebreaker instead of leaving the choice to row_number.
    latest_first = Window.partitionBy("Simbolo").orderBy(
        desc("_commit_timestamp"),
        desc("_commit_version"),
        (col("_change_type") != "delete").desc(),
    )

    latest_changes = (changes
                      .withColumn("row_num", row_number().over(latest_first))
                      .filter(col("row_num") == 1)
                      .drop("row_num"))

    (delta.alias("b")
        .merge(latest_changes.alias("c"), "b.Simbolo = c.Simbolo")
        .whenMatchedDelete(condition="c._change_type = 'delete'")
        # A matched 'insert' must update too: after dedup, a key deleted and re-inserted
        # inside this batch arrives only as 'insert' while bronze still holds the old row.
        .whenMatchedUpdateAll(condition="c._change_type IN ('insert', 'update_postimage')")
        .whenNotMatchedInsertAll(condition="c._change_type IN ('insert', 'update_postimage')")
        .execute())
# Read the source table's Change Data Feed as a stream. startingVersion is only honoured on
# the first run; afterwards the checkpoint decides where reading resumes.
# NOTE(review): when the bronze table was just created by the full load above, starting from
# version 0 re-applies the source's whole change history on top of that snapshot.
tabela_cdc = (spark.readStream
              .format("delta")
              .option("readChangeFeed", "true")
              .option("startingVersion", 0)  # start from the beginning
              .table(f"transacional.cotacoes_db.{tabela}"))

checkpointpath = f"/Volumes/transacional/cotacoes_db/checkpoint/checkpoint_{tabela}"

# availableNow: process everything available at start (possibly in several micro-batches),
# merging each batch into bronze through upsert(), then stop.
stream = (tabela_cdc.writeStream
          .trigger(availableNow=True)
          .option("checkpointLocation", checkpointpath)
          .foreachBatch(lambda df, batchId: upsert(df, bronze)))

start = stream.start()
# start() returns immediately; wait for the backlog to be processed so the cell (and job)
# only finish once the merge is done, and so errors raised in foreachBatch surface here.
start.awaitTermination()
    