In [0]:
import pyspark.sql.functions as F

In [0]:
def normalize_column_value(df, column_name, old_value1, new_value1, old_value2, new_value2):
    """
    Map two known values of a categorical column and default everything else.

    The column is trimmed and upper-cased before it is compared, so
    ``old_value1`` and ``old_value2`` must be supplied already upper-cased
    and without surrounding whitespace. The column is overwritten in the
    returned DataFrame:

        * rows matching ``old_value1`` become ``new_value1``
        * rows matching ``old_value2`` become ``new_value2``
        * every remaining row becomes the literal ``'n/a'``

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        DataFrame that contains ``column_name``.
    column_name : str
        Name of the column to standardize (replaced in place).
    old_value1 : str
        First value to look for, compared against TRIM + UPPER of the column.
    new_value1 : str
        Value written where ``old_value1`` matches.
    old_value2 : str
        Second value to look for, compared against TRIM + UPPER of the column.
    new_value2 : str
        Value written where ``old_value2`` matches.

    Returns
    -------
    pyspark.sql.DataFrame
        A DataFrame in which ``column_name`` holds the standardized values.
    """
    # Build the cleaned comparison expression once and reuse it in both branches.
    cleaned = F.upper(F.trim(F.col(column_name)))

    mapped = (
        F.when(cleaned == old_value1, new_value1)
        .when(cleaned == old_value2, new_value2)
        .otherwise('n/a')
    )
    return df.withColumn(column_name, mapped)


In [0]:
def get_access_data_lake():
    """
    Configure the Spark session for OAuth access to the ``salesdwh`` storage account.

    Reads the client id, tenant id and client secret from the
    ``salesdwhscope`` secret scope and sets the
    ``fs.azure.account.*.salesdwh.dfs.core.windows.net`` OAuth
    client-credentials options on ``spark.conf``.

    Expects ``spark`` and ``dbutils`` to be available as globals (they are
    not defined in this function). Returns nothing.
    """
    secret_scope = "salesdwhscope"
    client_id = dbutils.secrets.get(scope=secret_scope, key="clientid")
    tenant_id = dbutils.secrets.get(scope=secret_scope, key="tenantid")
    client_secret = dbutils.secrets.get(scope=secret_scope, key="clientsecret")

    # Insertion order of the dict is the order in which the options are set.
    oauth_settings = {
        "fs.azure.account.auth.type.salesdwh.dfs.core.windows.net": "OAuth",
        "fs.azure.account.oauth.provider.type.salesdwh.dfs.core.windows.net": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        "fs.azure.account.oauth2.client.id.salesdwh.dfs.core.windows.net": client_id,
        "fs.azure.account.oauth2.client.secret.salesdwh.dfs.core.windows.net": client_secret,
        "fs.azure.account.oauth2.client.endpoint.salesdwh.dfs.core.windows.net": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }
    for conf_key, conf_value in oauth_settings.items():
        spark.conf.set(conf_key, conf_value)

In [0]:
from delta.tables import DeltaTable
def icremental_load(df_input,catalog_name,schema_name,table_name,merge_condition):
    """
    Perform an incremental load (upsert) into a Delta Lake table.

    The target is addressed as ``catalog_name.schema_name.table_name``:
      - If the table exists → a MERGE is executed between the target
        (aliased ``tgt``) and ``df_input`` (aliased ``src``) using
        ``merge_condition``.
          * Matching rows are updated (Update All).
          * Non-matching rows are inserted (Insert All).
      - If the table does not exist → ``df_input`` is written as a new
        Delta table (initial load).

    Expects ``spark`` to be available as a global.

    Parameters
    ----------
    df_input : pyspark.sql.DataFrame
        The source DataFrame containing new or updated records.

    catalog_name : str
        The catalog name used as the first part of the table identifier.

    schema_name : str
        The schema (database) name where the Delta table resides.

    table_name : str
        The target Delta table name.

    merge_condition : str
        SQL-style merge condition string written against the ``tgt`` and
        ``src`` aliases (e.g. "tgt.id = src.id").

    Returns
    -------
    str
        A human-readable status message: either the merge row counts
        (affected / updated / inserted), a note that the merge finished
        without metrics being available, or a note that the table was
        created by this call.
    """
    # Build the fully qualified name once; it is used for the existence
    # check, the lookup/creation and the status message.
    full_table_name = f"{catalog_name}.{schema_name}.{table_name}"

    if not spark.catalog.tableExists(full_table_name):
        # Initial load: no target yet, so simply materialize the input.
        df_input.write.format("delta").saveAsTable(full_table_name)
        return f" Table {full_table_name}  is created the first one."

    target_table = DeltaTable.forName(spark, full_table_name)
    merge_result = (
        target_table.alias("tgt")
        .merge(df_input.alias("src"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

    # The metrics come back as a single-row DataFrame; read that row directly
    # instead of converting to pandas. Some Delta Lake releases may return
    # None from execute(), in which case the merge has still been applied but
    # no row counts can be reported.
    metrics = merge_result.first() if merge_result is not None else None
    if metrics is None:
        return " Merge completed: [merge metrics are not available] "

    affected = metrics["num_affected_rows"]
    updated = metrics["num_updated_rows"]
    inserted = metrics["num_inserted_rows"]

    if updated == 0 and inserted == 0 and affected == 0:
        return f" No changes applied :[The affected rows = {affected}]-----[The updated rows = {updated}]----[The inserted rows = {inserted}] "
    return f" Merge completed: [The affected rows = {affected}]-----[The updated rows = {updated}]----[The inserted rows ={inserted}] "


In [0]:
def clean_date_column(df, col_name):
    """
    Convert a ``yyyyMMdd``-encoded column into a date column.

    Rules applied per row:
    - The value equals 0, or its string form is not exactly 8 characters
      long → the result is NULL.
    - Anything else → the value is cast to string and parsed with the
      pattern 'yyyyMMdd'.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        DataFrame that contains ``col_name``.
    col_name : str
        Name of the column to clean; it is overwritten with the parsed date.

    Returns
    -------
    pyspark.sql.DataFrame
        A DataFrame in which ``col_name`` has been replaced by the cleaned,
        parsed date expression.
    """
    raw_value = F.col(col_name)
    as_text = raw_value.cast("string")

    # Rows that cannot represent an 8-digit date are blanked out.
    is_unusable = (raw_value == 0) | (F.length(as_text) != 8)
    parsed_date = F.to_date(as_text, "yyyyMMdd")

    cleaned = F.when(is_unusable, None).otherwise(parsed_date)
    return df.withColumn(col_name, cleaned)

