In [0]:
import delta
from pyspark.sql import functions as F
import json
from pyspark.sql import types

def tabel_exists(catalog,database, tableName):
    """Tell whether ``tableName`` is listed in ``catalog.database``.

    Runs ``SHOW TABLES FROM {catalog}.{database}`` through the notebook's
    ``spark`` session and looks for a row whose ``database`` and ``tableName``
    columns match the requested names.

    The comparison is case-insensitive: Spark SQL resolves identifiers without
    regard to case, so a caller passing ``Clientes`` must still find a table
    listed as ``clientes``. A false negative here is costly because the caller
    reacts to ``False`` by rewriting the table from the full load.

    Args:
        catalog: Catalog name, interpolated into the SHOW TABLES statement.
        database: Schema name, interpolated into the statement and matched
            against the ``database`` column of the result.
        tableName: Table name matched against the ``tableName`` column.

    Returns:
        bool: True when at least one listed row matches, False otherwise.
    """
    wanted_db = database.lower()
    wanted_table = tableName.lower()

    # SHOW TABLES returns one row per table of a single schema, so collecting
    # it to the driver is cheap and avoids building a filter expression out of
    # caller-supplied strings.
    listed = (spark.sql(f"SHOW TABLES FROM {catalog}.{database}")
              .select("database", "tableName")
              .collect())

    return any(
        (row["database"] or "").lower() == wanted_db
        and (row["tableName"] or "").lower() == wanted_table
        for row in listed
    )

def import_schema(table):
    """Build a Spark ``StructType`` from the JSON schema file of a table.

    The file is looked up as ``<table>.json``; because no directory is added,
    the path is resolved relative to the current working directory.

    Args:
        table: Base name (without the ``.json`` suffix) of the schema file.

    Returns:
        The ``StructType`` produced by ``StructType.fromJson`` for the parsed
        file content.
    """
    with open(f'{table}.json', 'r') as schema_file:
        raw_schema = json.loads(schema_file.read())
    return types.StructType.fromJson(raw_schema)







   

In [0]:

# Fixed destination: tables are written to the `bronze` catalog under the
# `sistema_pontos` schema.
catalog = 'bronze'
db_schema = 'sistema_pontos'

# Run parameters are read from the notebook widgets, in this order:
#   tablename     -> name of the target table (also its full_load folder)
#   tablename_cdc -> name of the table that holds the CDC files
#   id_field      -> key column used to deduplicate and merge CDC rows
# Example values for an interactive run: 'clientes', 'clientes', 'IdClientes'.
table, table_cdc, id_field = [
    dbutils.widgets.get(widget_name)
    for widget_name in ('tablename', 'tablename_cdc', 'id_field')
]

# Explicit schema for the CDC stream, loaded from `<table>.json`.
df_schema = import_schema(table)

In [0]:
# Bootstrap step: the bronze table is created from the full-load parquet
# snapshot only when it is not registered yet; an existing table is left as is.
if tabel_exists(catalog,db_schema,table):
    print("Tabela ja existente, ignorando full_load")
else:
    print("Tabela nao existente, criando com full_load")

    df_full = spark.read.format("parquet").load(f"/Volumes/raw/{db_schema}/full_load/{table}/")

    # coalesce(1) collapses the snapshot into a single partition before the
    # Delta write.
    (
        df_full.coalesce(1)
        .write.format("delta").mode("overwrite")
        .saveAsTable(f"{catalog}.{db_schema}.{table}")
    )

In [0]:
# Folder where the CDC files of `table_cdc` are expected, under the raw volume
# of the configured schema.
cdc_path = f"/Volumes/raw/{db_schema}/cdc/{table_cdc}/"

def path_exists(path: str) -> bool:
    """Report whether ``path`` can be listed with ``dbutils.fs.ls``.

    Args:
        path: Location to probe.

    Returns:
        True when the listing call succeeds, False when it raises.

    NOTE(review): every ``Exception`` is reported as "does not exist", so a
    permission or connectivity failure is indistinguishable from a missing
    folder — confirm this best-effort behaviour is intended.
    """
    try:
        dbutils.fs.ls(path)
    except Exception:
        return False
    else:
        return True


# Without a CDC folder there is nothing to merge: log it and end the notebook
# run here via dbutils.notebook.exit, handing the message back as the result.
if not path_exists(cdc_path):
    # The original message contained mojibake ("n√£o") from a wrong decode of
    # the UTF-8 "ã"; the intended word is "não".
    print(f"CDC ainda não existe em {cdc_path}. Pipeline finalizado apenas com FULL LOAD.")
    dbutils.notebook.exit("FULL LOAD executado. CDC inexistente.")


In [0]:
# Handle to the bronze Delta table, looked up by its three-part name; it is
# the merge target passed to `upsert` for every CDC micro-batch.
bronze = delta.DeltaTable.forName(spark, f"{catalog}.{db_schema}.{table}")

def upsert(df, deltaTable):
    """Merge one batch of CDC rows into a Delta table.

    The batch is first reduced to one row per key (the notebook-level
    ``id_field``), keeping the row with the greatest ``_cdc_ts``. That row is
    then merged into ``deltaTable`` on the key, driven by its ``op`` column:

    * key already in the table and ``op = 'D'``  -> the table row is deleted
    * key already in the table and ``op = 'U'``  -> all columns are updated
    * key not in the table and ``op`` is ``'I'`` or ``'U'`` -> row is inserted

    Any other combination (for example ``'I'`` for an existing key, or ``'D'``
    for a missing one) matches no clause and leaves the table untouched.

    Args:
        df: DataFrame of CDC rows; must contain ``id_field``, ``_cdc_ts`` and
            ``op``.
        deltaTable: ``DeltaTable`` to merge into.

    Returns:
        None.
    """
    # Imported here because the notebook's import cell does not bring Window
    # into scope; without it the first micro-batch fails with a NameError.
    from pyspark.sql import Window

    # Newest change first within each key.
    w = Window.partitionBy(id_field).orderBy(F.col("_cdc_ts").desc())

    # A MERGE source must not carry several rows for the same target row, so
    # only the latest change per key is kept.
    df_cdc_unique = (df
        .withColumn("_rn", F.row_number().over(w))
        .filter(F.col("_rn") == 1)
        .drop("_rn")
    )

    (
        deltaTable.alias("b")
        .merge(df_cdc_unique.alias("d"), f"b.{id_field} = d.{id_field}")
        .whenMatchedDelete(condition = "d.op = 'D'")
        .whenMatchedUpdateAll(condition = "d.op = 'U'")
        .whenNotMatchedInsertAll(condition = "d.op = 'I' or d.op = 'U'")
        .execute()
    )

In [0]:
# Streaming source over the CDC parquet files, read with the explicit schema
# loaded from the table's JSON file.
# The location is `cdc_path` — the folder the existence check validated, built
# from `db_schema` — instead of a second, hard-coded 'sistema_pontos' path that
# would silently diverge if the schema setting ever changed.
df_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .schema(df_schema)
    .load(cdc_path)
)

def executar_upsert_no_bronze(df, batchID):
    """foreachBatch callback: merge one micro-batch into the bronze table.

    Args:
        df: DataFrame holding the rows of the micro-batch.
        batchID: Batch identifier supplied by the streaming engine; not used.

    Returns:
        None.
    """
    upsert(df=df, deltaTable=bronze)

# Sink definition: every micro-batch is handed to the upsert callback, and
# progress is tracked in a checkpoint folder next to the CDC folder.
# The checkpoint path is built from `db_schema` rather than a hard-coded
# 'sistema_pontos'; for the current setting it resolves to the same location,
# so an existing checkpoint keeps being used.
stream = (
    df_stream.writeStream
    .option("checkpointLocation", f"/Volumes/raw/{db_schema}/cdc/{table_cdc}_checkpoint/")
    .foreachBatch(executar_upsert_no_bronze)
)

In [0]:
# Process everything currently available in the source, then stop.
start = stream.trigger(availableNow=True).start()

# start() returns immediately. Waiting here keeps the notebook from finishing
# before the data is merged, and re-raises any failure from a micro-batch so
# the run is reported as failed.
start.awaitTermination()
