## Silver DLT Pipeline Task 



### Global Imports

In [0]:
import dlt

from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Import the required modules from PySpark SQL
from pyspark.sql import SparkSession, Row
# from pyspark.sql.functions import col, lit, expr, when, upper, create_map, row_number
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DoubleType, TimestampType, DateType, BooleanType, MapType, ShortType
# from pyspark.sql.utils import AnalysisException
from collections import defaultdict

import os
import sys
import re
from datetime import datetime


### Include folder path to include framework python files

#### Auto merges schema mismatch issues. 

In [0]:
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

### Parameters required for this DLT

In [0]:
"""
     
    1. Fetch the run_date based on the dimension_table and status = 'IN_PROGRESS' from the workflow_runtime_audit table.
    2. Only 1 DLT pipeline per dimension_table can be run at a time, so you should always see 1 workflow IN_PROGRESS for any given source and run_date
    # E.g. DF_GROUP_FACETS, DF_GROUP_VHP
"""

dimension_table = spark.conf.get("dimension_table")

catalog = spark.conf.get("catalog")
schema = spark.conf.get("schema")

# This parameter is used only for testing purpose, for multiple people working on their own DLT pipleines. This is not used in the production.
# Note: When using Databricks Asset Bundle, this can be handled automatically as a configuration
table_suffix = spark.conf.get("table_suffix")
if table_suffix == "N/A":
    table_suffix = ""
else:
    # only set for testing or individual developer testing their DLT's
    table_suffix = f"_{table_suffix}"

test_mode = spark.conf.get("test_mode")

# This is the time when the DLT pipeline is run. 
# TODO: Rename to dlt_start_time
processing_time = int(datetime.now().timestamp())
print(f"Parameters passed from the DLT pipeline: catalog:{catalog}, schema:{schema}, processing_time::{processing_time}")


### Framework specific imports

In [0]:
import sys, os
# This is required as there is no way to pass a libary to DLT right now.
sys.path.append(os.path.abspath('../../src'))

from framework.utils import *
from framework.dlt_utils import *
from framework.udf_utils import *


import importlib

module_name = f"framework.{dimension_table.lower()}.{dimension_table.lower()}_silver"
module = importlib.import_module(module_name)

### Static Variables

In [0]:
business_date_format="yyyyMMdd"
dlt_scd_sequence_column_name = "business_date"
# "loadtimestamp"
# "processing_time"

dlt_runtime_config_table = "dlt_runtime_config"

dlt_metadata_source_config = "dlt_meta_source_config"
dlt_metadata_cdc_config = "dlt_meta_cdc_config"
dlt_metadata_dq_config  = "dlt_meta_dq_config"
dlt_metadata_transform_config = "dlt_meta_transform_config"

dlt_sink_audit_table_name = f"dlt_dq_audit{table_suffix}"

dlt_stream_landing_table_name = f"dlt_landing_[TABLE_NAME]{table_suffix}"
dlt_stream_bronze_table_name = f"dlt_brz_[TABLE_NAME]{table_suffix}"
dlt_stream_silver_dq_table_name = f"dlt_slv_clean_[TABLE_NAME][STEP_NAME]{table_suffix}"
dlt_stream_silver_transform_table_name = f"dlt_slv_transform_[TABLE_NAME]{table_suffix}"
dlt_stream_silver_mapped_table_name = f"dlt_slv_mapped_[TABLE_NAME]{table_suffix}"
dlt_stream_silver_cdc_table_name = f"dlt_slv_scd_[TABLE_NAME]{table_suffix}"
# dlt_materialized_silver_target_table_name = f"[TABLE_NAME]{table_suffix}"
# dlt_materialized_gold_target_table_name = f"[TABLE_NAME]{table_suffix}"
dlt_stream_silver_scd_target_table = f"dlt_slv_scd_[TABLE_NAME]{table_suffix}"
dlt_stream_silver_target_table = f"dlt_[TABLE_NAME]{table_suffix}"


### DLT Logic starts here
This API  is responsible for creating the DLT pipelines for particular dimension table and target table

In [0]:
def run_preprocessing_dlt(
    p_spark,
    p_catalog,
    p_schema,
    p_dimension_table,
    p_target_table,
    p_dlt_metadata_source_config,
    p_business_date,
    p_processing_time,
    p_scenario,
    p_source_system,
    p_test_mode
):
    """
    Runs the Delta Live Tables (DLT) pipeline for the specified dimension table and target table,
    including Change Data Capture (CDC) processing at the end.

    Parameters:
    - p_dimension_table (str): The dimension table to process. E.g: D_GROUP
    - p_target_table (str): The target table to create or update. DF_GROUP, DF_GROUP_COUNT, DF_GROUP_HISTORY
    - p_business_date (str): The business date for the data.

    Returns:
    - None: This function does not return any value. It processes the DLT pipeline.
    """
    print(
        f"run_dlt:: for dimension_table:{p_dimension_table} and cdm table :{p_target_table} for business_date:{p_business_date} at  processing_time:{p_processing_time}"
    )

    # fetch the metadata from the metadata table
    source_metadata = get_source_metadata(
        p_spark=p_spark,
        p_catalog=p_catalog,
        p_schema=p_schema,
        p_dimension_table=p_dimension_table,
        p_source_metadata_table=p_dlt_metadata_source_config,
        p_target_table=p_target_table,
        p_business_date=p_business_date,
        p_scenario=p_scenario,
        p_source_system=p_source_system,
        p_test_mode = p_test_mode
    )

    print(f"run_dlt:: metadata_source_config::{source_metadata}")
    if source_metadata is None:
        return
    # source metadata row
    row = source_metadata
    # source name (E.g. DF_GROUP_FACETS, DF_GROUP_VHP)
    source = row.source_system 
    # row.source
    # source name (E.g. dbo, audit, cdc)
    source_schema = row.source_schema

    derived_input_table_list = [
        item.strip() for item in row.derived_input_tables.split(",")
    ]

    derived_target_table_list = [
        item.strip() for item in row.derived_target_tables.split(",")
    ]
    is_active = row.is_active
    print(       
        f"source_schema::{source_schema}\n"
        f"derived_input_table_lgenerate_bronzetablesist::{derived_input_table_list}\n"
        f"derived_target_table_list::{derived_target_table_list}"
        f"is_active::{is_active}"
    )

    for index, input_table in enumerate(derived_input_table_list):
        print(
            f"run_dlt:: for input_table:{input_table}, target_table_list:{derived_target_table_list[index]}"
        )
        # actual DF_GROUP, DF_GROUP_COUNT, DF_GROUP_HISTORY
        target_table = derived_target_table_list[index].lower()

        # Step 1: Handle Bronze
        dlt_source_table = get_table_name(dlt_stream_landing_table_name, input_table)
        dlt_target_table = get_table_name(dlt_stream_bronze_table_name, input_table)

        # TODO: The code fails if the landing zone delta table does not exist. Create an empty schema table in generate_bronze to handle this use case
        # try:
        #     spark.table(dlt_source_table)
        # except Exception:
        #     continue
        bronze_table_to_column_map = module.get_bronze_table_to_column_map()

        global_cols = get_bronze_table_columns(bronze_table_to_column_map, "GLOBALS", "all", "all", "global_columns")
        print(f"generate_bronzetables:: global_cols ::{global_cols}")
        required_cols = get_bronze_table_columns(
            bronze_table_to_column_map, source, source_schema, input_table, "required_columns"
        )
        additional_cols = get_bronze_table_columns(
            bronze_table_to_column_map, source, source_schema, input_table, "additional_columns"
        )
        print(f"generate_bronzetables:: additional_cols ::{additional_cols}")
        df = generate_bronzetables(
            p_spark=p_spark,
            p_catalog=p_catalog,
            p_schema=p_schema,
            p_dimension_table=p_dimension_table,
            p_input_table=input_table,
            p_dlt_source_table=dlt_source_table,
            p_dlt_target_table=dlt_target_table,
            p_source=source,
            p_source_schema=source_schema,
            p_business_date=p_business_date,
            p_business_date_format=business_date_format,
            p_processing_time=p_processing_time,
            p_global_cols=global_cols,
            p_required_cols=required_cols,
            p_additional_cols=additional_cols
        )

        # print(f"=====%%%%%%%%========bronzetables df::{df.collect()}")

        # Step 3: Data Quality validations
        # Step 1: FAIL
        dlt_source_table = dlt_target_table
        dlt_target_table = get_table_name(
            dlt_stream_silver_dq_table_name, input_table, "fail_step"
        )
        generate_dq_table(
            p_spark=p_spark,
            p_catalog=p_catalog,
            p_schema=p_schema,
            p_dimension_table=p_dimension_table,
            p_source_system = source,
            p_target_table=target_table,
            p_dlt_source_table=dlt_source_table,
            p_dlt_target_table=dlt_target_table,
            p_dlt_audit_table=dlt_sink_audit_table_name,
            p_dq_type="FAIL",
            p_dlt_metadata_dq_config=dlt_metadata_dq_config
        )

        # Step 2: QUARANTINE
        dlt_source_table = dlt_target_table
        dlt_target_table = get_table_name(
            dlt_stream_silver_dq_table_name, input_table, "quarantine_step"
        )
        generate_dq_table(
            p_spark=p_spark,
            p_catalog=p_catalog,
            p_schema=p_schema,
            p_dimension_table=p_dimension_table,
            p_source_system = source,
            p_target_table=target_table,
            p_dlt_source_table=dlt_source_table,
            p_dlt_target_table=dlt_target_table,
            p_dlt_audit_table=dlt_sink_audit_table_name,
            p_dq_type="QUARANTINE",
            p_dlt_metadata_dq_config=dlt_metadata_dq_config
        )

        # Step 3: WARN
        dlt_source_table = dlt_target_table
        dlt_target_table = get_table_name(
            dlt_stream_silver_dq_table_name, input_table, "warn_step"
        )
        generate_dq_table(
            p_spark=p_spark,
            p_catalog=p_catalog,
            p_schema=p_schema,
            p_dimension_table=p_dimension_table,
            p_source_system = source,
            p_target_table=target_table,
            p_dlt_source_table=dlt_source_table,
            p_dlt_target_table=dlt_target_table,
            p_dlt_audit_table=dlt_sink_audit_table_name,
            p_dq_type="WARN",
            p_dlt_metadata_dq_config=dlt_metadata_dq_config
        )

        # Apply simple transformation rules.
        dlt_source_table = dlt_target_table
        dlt_target_table = get_table_name(
            dlt_stream_silver_transform_table_name, input_table
        )

        generate_tranformation_rules_df(
            p_spark=p_spark,
            p_catalog=p_catalog,
            p_schema=p_schema,
            p_dimension_table=p_dimension_table,
            p_source_system = source,
            p_target_table=target_table,
            p_dlt_source_table=dlt_source_table,
            p_dlt_target_table=dlt_target_table,
            p_dlt_metadata_transform_config=dlt_metadata_transform_config
        )

        dlt_source_table = dlt_target_table
        dlt_target_table = get_table_name(
            f"{dlt_stream_silver_mapped_table_name}_{p_source_system.lower()}", target_table.lower()
        )
        generate_mapped_tables_by_source_system(
            p_spark=p_spark,
            p_catalog=p_catalog,
            p_schema=p_schema,
            p_source=source,
            p_dimension_table=p_dimension_table,
            p_input_table=input_table,
            p_target_table=target_table,
            p_dlt_source_table=dlt_source_table,
            p_dlt_target_table=dlt_target_table,
            p_business_date=p_business_date,
            
        )

        


        




In [0]:
def run_postprocessing_dlt(
    p_spark,
    p_catalog,
    p_schema,
    p_dlt_metadata_source_config,
    p_dimension_table,
    p_sources_metadata,
    scenario
):
    print("Starting run_postprocessing_dlt")
    # Final step
    target_tables_dict = get_target_tables_for_run(
        p_spark,
        catalog,
        schema,
        dlt_metadata_source_config,
        dimension_table,
        p_sources_metadata,
        scenario
    )

    for target_table, source_system_list in target_tables_dict.items():
        print(
            f"run_postprocessing_dlt: processing target_table:{target_table} and source_system_list:{source_system_list}"
        )
        dlt_target_table = get_table_name(
            dlt_stream_silver_mapped_table_name, target_table.lower()
        )
        dlt.create_streaming_table(dlt_target_table)

        #  Code to generate df tables from multiple source system
        for source_system in source_system_list:
            dlt_source_table = get_table_name(
                f"{dlt_stream_silver_mapped_table_name}_{source_system.lower()}",
                target_table.lower(),
            )
            generate_df_append_tables(
                p_spark=p_spark,
                p_catalog=catalog,
                p_schema=schema,
                p_dlt_source_table=dlt_source_table,
                p_dlt_target_table=dlt_target_table,
            )

        scd_row = get_scd_attributes(
            p_spark=p_spark,
            p_catalog=p_catalog,
            p_schema=p_schema,
            p_dlt_metadata_cdc_config=dlt_metadata_cdc_config,
            p_target_table=target_table,
        )
        print(f"run_postprocessing_dlt:: scd_row::{scd_row}")
        # For History Load , no CDC
        if scd_row is not None:
            dlt_source_table = dlt_target_table
            dlt_target_table = get_table_name(
                f"{dlt_stream_silver_scd_target_table}",
                target_table.lower(),
            )

            key_attr = scd_row.key_attr
            scd_type = scd_row.scd_type
            exclude_columns_list = scd_row.exclude_columns
            sequence_column_name = scd_row.sequence_col
            df = generate_scd_tables(
                p_target_table=target_table,
                p_dlt_source_table=dlt_source_table,
                p_dlt_target_table=dlt_target_table,
                p_dlt_stream_silver_target_table=dlt_stream_silver_target_table,
                p_keys=key_attr,
                p_seq_col=sequence_column_name,
                p_scd_type=scd_type,
                p_exclude_column_list=exclude_columns_list
            )

        else:
            raise Exception(
                "run_postprocessing_dlt:: No SCD attributes found for target table::{target_table}. This condition needs to be handled"
            )

### Execution starts here
This section initiates the execution of the defined DLT flow.   
- Reads the data from dim_group_runtime_metadata, which is set from the ControlM job (run_entity_job for POC)
- Runs the DLT pipeline


In [0]:
runtime_params_row = get_runtime_parameters(
    spark, catalog, schema, dlt_runtime_config_table, dimension_table
)

dimension_table = runtime_params_row["dimension_table"]
source_params = runtime_params_row["source_params"]  # this is a dict
scenario = runtime_params_row["scenario"]

print(f"\nüîç Processing Dimension Table: {dimension_table}")

for key, param_value_map in source_params.items():
    target_table = param_value_map["target_table"]
    business_date = param_value_map["business_date"]
    source_system = param_value_map["source_system"]
    # load_type = "incremental"

    # Set to default value for historical load
    # if business_date is None or business_date == "" or business_date == "N/A":
    #     business_date = "1900-01-01"
    #     load_type = "one_time"

    # Custom logic here
    print(
        f"Start: DLT pipeline for : target_table:: {target_table} for business Date: {business_date}"
    )

    run_preprocessing_dlt(
        spark,
        catalog,
        schema,
        dimension_table,
        target_table,
        dlt_metadata_source_config,
        business_date,
        p_processing_time = processing_time,
        p_scenario=scenario,
        p_source_system=source_system,
        p_test_mode=test_mode
    )

run_postprocessing_dlt(
    spark,
    catalog,
    schema,
    dlt_metadata_source_config,
    dimension_table,
    source_params,
    scenario
)
