In [0]:
# -------IMPORTS-------------------------------------------------------------------------------------------------------------------------
from pyspark.sql.functions import col, trim, regexp_replace, concat_ws, when
from delta.tables import DeltaTable

In [0]:
#---UPSERT FUNCTION------------------

def upsert_function(df, catalog_path, adls_path, merge_condition, partition_columns=None):
    """Upsert ``df`` into a Delta table, creating the table on first use.

    When ``catalog_path`` is already registered in the catalog and
    ``adls_path`` holds a Delta table, ``df`` is merged into it: target rows
    satisfying ``merge_condition`` are updated with every source column and
    the remaining source rows are inserted. Otherwise ``df`` is written in
    overwrite mode as a Delta table stored at ``adls_path`` and registered
    under ``catalog_path``.

    Args:
        df: DataFrame holding the rows to upsert.
        catalog_path: Table name used for the catalog lookup and registration.
        adls_path: Storage location of the Delta table.
        merge_condition: SQL predicate written against the aliases ``target``
            (existing table) and ``source`` (``df``).
        partition_columns: Optional column name, or list/tuple of column
            names, used to partition the table when it is first created.
            Not applied when merging into an already existing table.

    NOTE(review): a Delta MERGE fails when several source rows match the same
    target row; confirm callers pass a ``df`` that is unique on the merge key.
    """
    table_is_ready = (
        spark.catalog.tableExists(catalog_path)
        and DeltaTable.isDeltaTable(spark, adls_path)
    )

    if table_is_ready:
        (
            DeltaTable.forName(spark, catalog_path)
            .alias("target")
            .merge(df.alias("source"), merge_condition)
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
        return

    # First load: there is nothing to merge into yet, so create the table.
    writer = df.write.format("delta").mode("overwrite").option("path", adls_path)
    if partition_columns:
        # Accept a bare column name as well as a sequence of names.
        if isinstance(partition_columns, str):
            partition_columns = [partition_columns]
        writer = writer.partitionBy(*partition_columns)
    writer.saveAsTable(catalog_path)

#-----WRITE FUNCTION------------------

def write_function(catalog_path, adls_path, merge_condition):
    """Build a ``foreachBatch`` handler bound to one target table.

    The returned callable takes ``(batch_df, batch_id)``. It does nothing for
    an empty micro-batch and otherwise forwards the batch to
    ``upsert_function`` together with the target captured here.

    Args:
        catalog_path: Catalog name of the target table.
        adls_path: Storage location of the target table.
        merge_condition: Merge predicate forwarded to ``upsert_function``.

    Returns:
        The per-batch handler function.
    """
    def validate_function(batch_df, batch_id):
        # Guard clause: an empty micro-batch has nothing to upsert.
        if batch_df.isEmpty():
            return
        upsert_function(
            df=batch_df,
            catalog_path=catalog_path,
            adls_path=adls_path,
            merge_condition=merge_condition,
        )

    return validate_function

In [0]:
#------------TRANSFORMATIONS--------------

def transform_afiliacion_usuarios(df):
    """Project the input down to the affiliation-user id and user name columns."""
    kept_columns = ("id_usuarioAfiliacion", "nombreUsuario")
    return df.select(*kept_columns)

def transform_sucursales(df):
    """Return the active branches with their zone and region keys.

    ``estatus`` is first rewritten with carriage returns and newlines removed;
    only rows whose cleaned value equals ``"ACTIVO"`` are kept.
    """
    estatus_without_breaks = regexp_replace(col("estatus"), r"\r|\n", "")
    cleaned = df.withColumn("estatus", estatus_without_breaks)
    active = cleaned.filter(col("estatus") == "ACTIVO")
    return active.select("id_sucursal", "descripcionSucursal", "id_zona", "id_region")

def transform_distribuidores(df):
    """Return the active distributors with their prospect key and status.

    ``estatus`` is rewritten with carriage returns and newlines removed before
    filtering, so the ``estatus`` column that is selected carries the cleaned
    value rather than the raw one that still contains the line breaks.

    NOTE(review): this table is matched against lowercase ``"activo"`` while
    the other status filters in this file use ``"ACTIVO"`` -- confirm the
    casing against the bronze data.
    """
    return (
        df.withColumn("estatus", regexp_replace(col("estatus"), r"\r|\n", ""))
          .filter(col("estatus") == "activo")
          .select("id_distribuidor", "id_prospecto", "estatus")
    )

def transform_prospectos(df, since="2024-01-01 00:00:00"):
    """Return recently updated active prospects with a derived full name.

    Args:
        df: Prospect rows; must expose ``estatus``, ``updated_at`` and the
            columns selected below.
        since: Exclusive lower bound compared against ``updated_at``. The
            default keeps the cutoff this pipeline has always used.

    Returns:
        The selected prospect columns plus ``NombreCompleto``. Missing
        ``segundoNombre`` values are replaced with an empty string.
    """
    estatus_without_breaks = regexp_replace(col("estatus"), r"\r|\n", "")

    # when() without otherwise() yields NULL for a blank second name, and
    # concat_ws skips NULL arguments, so no doubled separator is produced.
    segundo_nombre_or_null = when(col("segundoNombre") != "", col("segundoNombre"))
    nombre_completo = concat_ws(
        " ",
        col("primerNombre"),
        segundo_nombre_or_null,
        col("primerApellido"),
        col("segundoApellido"),
    )

    return (
        df.filter(estatus_without_breaks == "ACTIVO")
          .filter(col("updated_at") > since)
          .select(
              "id_prospecto",
              "id_referido",
              "created_at",
              "id_origen",
              "id_perfil",
              "primerNombre",
              "segundoNombre",
              "primerApellido",
              "segundoApellido",
              "id_sucursal",
          )
          .fillna({"segundoNombre": ""})
          .withColumn("NombreCompleto", nombre_completo)
    )

def transform_distribuidores_perfiles(df):
    """Return id and description of the profiles whose ``estatus`` is ``"ACTIVO"``."""
    is_active = col("estatus") == "ACTIVO"
    active_profiles = df.filter(is_active)
    return active_profiles.select("id_perfil", "descripcionPerfil")

def transform_referidos_origenes(df):
    """Return id and description of the referral origins whose ``estatus`` is ``"ACTIVO"``."""
    is_active = col("estatus") == "ACTIVO"
    active_origins = df.filter(is_active)
    return active_origins.select("id_origen", "descripcionOrigen")

def transform_regiones(df):
    """Project the input down to the region id and region description columns."""
    kept_columns = ("id_region", "descripcionRegion")
    return df.select(*kept_columns)

def transform_zonas(df):
    """Return the active zones with their region key.

    A row is kept when ``estatus``, with carriage returns and newlines
    removed, equals ``"ACTIVO"``. The column itself is not rewritten.
    """
    estatus_without_breaks = regexp_replace(col("estatus"), r"\r|\n", "")
    active_zones = df.filter(estatus_without_breaks == "ACTIVO")
    return active_zones.select("id_zona", "descripcionZona", "id_region")

def transform_ventas(df, since="2024-01-01 00:00:00"):
    """Return the sales made after ``since`` with their distributor key.

    Args:
        df: Sales rows exposing ``id_venta``, ``fechaVenta`` and
            ``id_distribuidor``.
        since: Exclusive lower bound compared against ``fechaVenta``. The
            default keeps the cutoff this pipeline has always used.
    """
    return (
        df.filter(col("fechaVenta") > since)
          .select("id_venta", "fechaVenta", "id_distribuidor")
    )

def transform_referidos_asignaciones(df, since="2024-01-01 00:00:00"):
    """Return the referral assignments made after ``since``.

    Args:
        df: Assignment rows exposing ``id_asignacion``, ``fechaAsignacion``,
            ``id_referido`` and ``id_usuarioAfiliacion``.
        since: Exclusive lower bound compared against ``fechaAsignacion``.
            The default keeps the cutoff this pipeline has always used.
    """
    return (
        df.filter(col("fechaAsignacion") > since)
          .select("id_asignacion", "fechaAsignacion", "id_referido", "id_usuarioAfiliacion")
    )

In [0]:
#---CONSTANTS AND PARAMETERS-----

# Storage endpoint shared by every container URI below.
_STORAGE_ENDPOINT = "rohegastorage.dfs.core.windows.net"
_SILVER_ROOT = f"abfss://silvercontainer@{_STORAGE_ENDPOINT}/"

# Bronze files by path; the streaming reader below goes through the catalog instead.
BRONZE_PATH = f"abfss://bronzecontainer@{_STORAGE_ENDPOINT}/pipeline01/"
# Parent folder of the silver tables written by this notebook (one subfolder per table).
SILVER_PATH = f"{_SILVER_ROOT}pipeline01/"
# Parent folder of the per-table streaming checkpoints.
CHECKPOINTS_PATH = f"{_SILVER_ROOT}checkpoints/"

# One entry per table: its name (shared by the bronze source and the silver
# target), the merge predicate on its key column, and its transformation.
# Built from (name, key column, transformation) triples so the key is spelled
# once per table.
table_parameters = [
    {
        "name": name,
        "merge_condition": f"target.{key_column} = source.{key_column}",
        "transformation": transformation,
    }
    for name, key_column, transformation in (
        ("afiliacion_usuarios", "id_usuarioAfiliacion", transform_afiliacion_usuarios),
        ("distribuidores", "id_distribuidor", transform_distribuidores),
        ("distribuidores_perfiles", "id_perfil", transform_distribuidores_perfiles),
        ("prospectos", "id_prospecto", transform_prospectos),
        ("referidos_origenes", "id_origen", transform_referidos_origenes),
        ("regiones", "id_region", transform_regiones),
        ("zonas", "id_zona", transform_zonas),
        ("sucursales", "id_sucursal", transform_sucursales),
        ("ventas", "id_venta", transform_ventas),
        ("referidos_asignaciones", "id_asignacion", transform_referidos_asignaciones),
    )
]

#---READSTREAM AND UPSERT FOR EACH TABLE---------

for parameter in table_parameters:
    table_name = parameter["name"]

    # Incremental read of the bronze table, resolved through the catalog.
    df = spark.readStream.table(f"alex_catalog.bronze.{table_name}")

    transformed_df = parameter["transformation"](df)

    write_table = write_function(
        f"alex_catalog.silver.{table_name}",
        f"{SILVER_PATH}{table_name}",
        parameter["merge_condition"],
    )

    # No .format(...) on the writer: foreachBatch is the sink, and the Delta
    # write itself happens inside the per-batch handler.
    # availableNow=True processes everything available when the query starts
    # and then stops, like the deprecated once=True, but is allowed to split
    # a large backlog into several micro-batches.
    query = (
        transformed_df.writeStream
        .foreachBatch(write_table)
        .option("checkpointLocation", f"{CHECKPOINTS_PATH}{table_name}_checkpoint")
        .trigger(availableNow=True)
        .start()
    )

    # Tables are processed one at a time: wait for this one before the next.
    query.awaitTermination()