In [0]:
import dlt
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Step 0: Schema enforced on the customer CSV files.
# Each pair is (column name, Spark type); every column is declared nullable.
customer_sch = StructType([
    StructField(col_name, col_type, True)
    for col_name, col_type in (
        ("customer_id", StringType()),
        ("customer_unique_id", StringType()),
        ("customer_zip_code_prefix", IntegerType()),
        ("customer_city", StringType()),
        ("customer_state", StringType()),
    )
])

def _apply_config_rule(df, rule):
    """Apply one row of the sources_config table to ``df`` and return the result.

    The rule is read through the keys ``column_name``, ``alias_name``,
    ``expression``, ``validation_rule``, ``is_active``, ``default_value`` and
    ``data_type``. Steps, in order:

    1. Create the target column from ``expression``; if there is no
       expression, rename ``column_name`` to the target name.
    2. Cast the target column to ``data_type`` when one is given.
    3. Fill nulls with ``default_value`` when one is given; otherwise, when
       ``is_active`` is truthy, keep only rows where the target is not null.
    4. When ``validation_rule`` starts with ``REGEX:``, keep only rows whose
       target value matches the pattern after the prefix.

    A rule whose target column does not exist after step 1 is skipped, so a
    rule pointing at a column missing from the stream does not fail the whole
    pipeline with an unresolved-column error.
    """
    src = rule['column_name']
    # Without an alias the column keeps its source name; passing None/"" as a
    # column name to withColumn / withColumnRenamed is not usable.
    tgt = rule['alias_name'] or src
    transformation = rule['expression']
    validation = rule['validation_rule']
    # NOTE(review): the 'is_active' flag is what drives the not-null filter
    # below -- confirm that this is the intended meaning of the column.
    is_required = rule['is_active']
    default_value = rule['default_value']
    dtype = rule['data_type']

    if not tgt:
        # Neither an alias nor a source column name: nothing to operate on.
        return df

    # Apply transformation if expression is provided
    if transformation and transformation.strip():
        df = df.withColumn(tgt, F.expr(transformation))
    elif src in df.columns and src != tgt:
        # Rename column if no transformation expression
        df = df.withColumnRenamed(src, tgt)

    # The steps below all reference the target column. Skip them when it is
    # absent (no expression and the source column is not in the stream).
    # The comparison ignores case so that a column Spark's default
    # case-insensitive resolver would still find is never skipped.
    existing = {name.lower() for name in df.columns}
    if tgt.lower() not in existing:
        return df

    # Cast to desired data type if provided
    if dtype:
        df = df.withColumn(tgt, F.col(tgt).cast(dtype))

    # Fill default values or filter required fields
    if default_value is not None and default_value != "":
        df = df.fillna({tgt: default_value})
    elif is_required:
        df = df.filter(F.col(tgt).isNotNull())

    # Apply regex validation if specified
    if validation and validation.startswith('REGEX:'):
        pattern = validation.split(':', 1)[1]
        df = df.filter(F.col(tgt).rlike(pattern))

    return df


@dlt.table(
    name="bronze_raw",
    comment="Streaming ingestion of customer data from source1 and source2"
)
def customers_raw():
    """Build the ``bronze_raw`` streaming table from the two customer CSV sources.

    Reads both storage containers as ``cloudFiles`` CSV streams using
    ``customer_sch``, unions them by column name, then applies every row of
    the ``sources_config`` table as a column rule (see ``_apply_config_rule``).

    Returns:
        The streaming DataFrame after all configured rules have been applied.
    """
    # Step 1: Load static configuration table for transformations
    config_df = spark.table("`unity-veersa`.config_tables.sources_config")

    # Step 2: Read streaming sources
    input_df1 = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .schema(customer_sch)
        .load("abfss://load-1@veersastorage.dfs.core.windows.net/")
    )

    input_df2 = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .schema(customer_sch)
        .load("abfss://load-2@veersastorage.dfs.core.windows.net/")
    )

    # Step 3: Union the streams
    input_df = input_df1.unionByName(input_df2, allowMissingColumns=True)

    # Step 4: Apply config-driven transformations
    # Using a collect on config_df is OK because config should be small.
    # NOTE(review): collect() returns rows in no guaranteed order; if rules
    # depend on one another, the config table needs an explicit ordering.
    for rule in config_df.collect():
        input_df = _apply_config_rule(input_df, rule)

    return input_df

