In [0]:
from pyspark.sql.functions import current_timestamp
def add_ingestion_date(input_df):
    """Return ``input_df`` with an extra ``ingestion_date`` column.

    The column is filled with ``current_timestamp()``.

    Spark DataFrames are immutable: ``withColumn`` does not modify
    ``input_df`` in place but returns a new DataFrame. That new DataFrame
    must be returned; otherwise the column is silently lost.

    Args:
        input_df: Spark DataFrame to enrich.

    Returns:
        A new DataFrame with all columns of ``input_df`` plus ``ingestion_date``.
    """
    return input_df.withColumn('ingestion_date', current_timestamp())

In [0]:
def overwrite_partition(input_df, db_name, table_name, column_partition):
    """Drop the partitions of an existing table that appear in ``input_df``.

    For every distinct value of ``column_partition`` in ``input_df`` this runs
    ``ALTER TABLE <db_name>.<table_name> DROP IF EXISTS PARTITION (...)``.
    This function does not write ``input_df``; writing the new data is left
    to the caller. If ``<db_name>.<table_name>`` is not registered in the
    catalog, nothing is done.

    NOTE(review): Delta tables do not support ``ALTER TABLE ... DROP
    PARTITION``. Confirm the target is a non-Delta (e.g. Parquet) table.

    Args:
        input_df: Spark DataFrame with the incoming rows.
        db_name: Database (schema) of the target table.
        table_name: Name of the target table.
        column_partition: Partition column name of the target table.
    """
    full_table = f"{db_name}.{table_name}"

    # Check existence once up front rather than once per partition value.
    # This also avoids the distinct()/collect() job when there is no table.
    if not spark._jsparkSession.catalog().tableExists(full_table):
        return

    for row in input_df.select(column_partition).distinct().collect():
        # str() matches the original f-string rendering (a None value still
        # becomes the literal 'None' -- TODO confirm that is intended).
        value = str(row[column_partition])
        # Escape backslashes and single quotes so the partition value cannot
        # terminate the SQL string literal early.
        escaped = value.replace("\\", "\\\\").replace("'", "\\'")
        spark.sql(
            f"alter table {full_table} DROP IF EXISTS "
            f"partition({column_partition}= '{escaped}')"
        )

In [0]:
def merge_delta_data(df, base_path, db_name, table_name, partition_col, merge_key):
    """Upsert ``df`` into the Delta table stored at ``{base_path}/{table_name}``.

    If a Delta table already exists at that path, ``df`` is MERGEd into it.
    Matching rows are updated with all source columns, and non-matching rows
    are inserted.

    Otherwise the table is created from ``df`` at that same path. It is
    partitioned by ``partition_col`` and registered in the catalog as
    ``{db_name}.{table_name}``.

    Args:
        df: Spark DataFrame with the incoming rows.
        base_path: Parent folder of the table's storage location.
        db_name: Database (schema) in which the table is registered.
        table_name: Table name; also the last segment of the storage path.
        partition_col: Partition column. An equality on it
            (``tgt.<col> = src.<col>``) is added to the merge condition.
        merge_key: SQL boolean expression that matches target and source
            rows. It is written against the aliases ``tgt`` (existing table)
            and ``src`` (``df``), e.g. ``"tgt.id = src.id"``.
    """
    from delta.tables import DeltaTable

    full_path = f"{base_path}/{table_name}"
    full_table = f"{db_name}.{table_name}"

    print(f"Procesando tabla: {full_table}")
    print(f"Ruta física: {full_path}")

    # isDeltaTable returns False when the path is missing or holds no valid
    # Delta log. It replaces a bare ``except:`` around dbutils.fs.ls, which
    # also swallowed unrelated failures and interrupts.
    if DeltaTable.isDeltaTable(spark, full_path):
        print("✔ La tabla Delta existe (carpeta con _delta_log).")
        print("✔ Ejecutando MERGE...")

        target = DeltaTable.forPath(spark, full_path)
        # Parenthesize merge_key so an OR inside it cannot bind against the
        # appended partition predicate.
        condition = f"({merge_key}) and tgt.{partition_col} = src.{partition_col}"
        (
            target.alias("tgt")
            .merge(df.alias("src"), condition)
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )

        print("✔ MERGE finalizado.")
    else:
        print("✔  La tabla NO existe. Creándola desde cero...")

        # Store the data at full_path explicitly. Without the "path" option,
        # saveAsTable puts the table in the database's default location,
        # which need not be full_path. In that case the existence check above
        # never succeeds, and every run overwrites the table instead of merging.
        (
            df.write.format("delta")
            .mode("overwrite")
            .partitionBy(partition_col)
            .option("path", full_path)
            .saveAsTable(full_table)
        )


