In [0]:
import dlt
from pyspark.sql import functions as F
from pyspark.sql.functions import col,expr,lit
from pyspark.sql.types import StructType, StringType, StructField, IntegerType, DateType, DoubleType
from pyspark.sql.session import SparkSession
from pyspark.sql import Row
import json

In [0]:
from pyspark.sql.types import TimestampType

def generate_schema(input_df):
    """
    Build a PySpark StructType from a schema-configuration DataFrame.

    Parameters:
     - input_df: PySpark DataFrame containing the columns "columndatatype",
       "columnname" and "columnorder". Each distinct combination of the three
       becomes one field of the resulting schema.
    Returns:
        - schema: PySpark StructType whose fields are all nullable and are
          ordered by "columnorder".
    Raises:
        - KeyError: if a "columndatatype" value has no entry in the data type
          mapping below. The message names the offending column and lists the
          supported type names.

    """
    # Maps the type names stored in the configuration to PySpark types.
    data_type_mapping = {
        "StringType": StringType(),
        "IntegerType": IntegerType(),
        "DoubleType": DoubleType(),
        "DateType": DateType(),
        "datetime": TimestampType(),
    }

    # Collect the distinct (type, name, order) triples to the driver.
    column_rows = (
        input_df.select("columndatatype", "columnname", "columnorder").distinct().collect()
    )

    # Field order in the schema follows "columnorder".
    column_rows = sorted(column_rows, key=lambda r: r.columnorder)

    schema_fields = []
    for row in column_rows:
        if row.columndatatype not in data_type_mapping:
            # Fail with enough context to fix the configuration row, instead of
            # a bare KeyError that only shows the unknown type name.
            raise KeyError(
                f"Unsupported columndatatype {row.columndatatype!r} for column "
                f"{row.columnname!r}; supported values: {sorted(data_type_mapping)}"
            )
        schema_fields.append(
            StructField(row.columnname, data_type_mapping[row.columndatatype], True)
        )

    return StructType(schema_fields)


In [0]:
def dlt_ingestion_metadata_function(row:Row, schema: StructType, keys=None):
    """
    Register the bronze, staging-silver, silver and error DLT datasets for the
    source described by one metadata row.

    Parameters:
     - row: metadata Row. The fields 'sourcetablename', 'sourcedataquality',
       'sequenceby', 'sourcefilepath' and 'sourcefileoptions' are read.
     - schema: StructType applied when reading the source CSV files.
     - keys: optional list of key column names passed to dlt.apply_changes.
       Defaults to ["circuitId"] when omitted.
    Returns:
        - None. The datasets are registered through the dlt decorators and
          calls made in the body.

    """
    table_name = row['sourcetablename']

    # 'sourcedataquality' is parsed as JSON and used as the
    # {expectation name: SQL condition} dict given to dlt.expect_all.
    # A missing/empty value is treated as "no rules".
    raw_checks = row['sourcedataquality']
    checks = json.loads(raw_checks) if raw_checks else {}

    keys = ["circuitId"] if keys is None else list(keys)
    sequence_by = row['sequenceby']
    file_path = row['sourcefilepath']

    # NOTE(review): eval() executes whatever text is stored in the metadata
    # table. If 'sourcefileoptions' is always a plain dict literal, consider
    # ast.literal_eval or JSON instead - confirm the stored format first.
    cloud_file_options = eval(row['sourcefileoptions'])

    # Parenthesise every rule before joining so that a rule containing OR
    # cannot change the meaning of the combined predicate. With no rules the
    # predicate is simply "true" (an empty "()" is not a valid expression).
    if checks:
        dq_rules = " and ".join("({0})".format(rule) for rule in checks.values())
    else:
        dq_rules = "true"

    bronze_name = "brz_load_" + table_name
    staging_name = "stag_silver_load_" + table_name
    silver_name = "silver_load_" + table_name
    error_name = "err_silver_load_" + table_name

    # NOTE(review): the bronze table is produced by a batch read while the
    # tables below stream from it - confirm this is intended.
    @dlt.table(name=bronze_name)
    def bronze_load():
        df = spark.read.format("csv").options(**cloud_file_options).schema(schema).load(file_path)
        df = df.withColumn("file_processed_date",F.date_format(F.current_timestamp(),"yyyy-MM-dd HH:mm:ss"))
        return df

    @dlt.table(name=staging_name)
    @dlt.expect_all(checks)
    def stag_silver_table():
        # Only rows for which every rule evaluates to true move on to silver.
        df = dlt.readStream(bronze_name)
        df = df.withColumn("dq_check",F.expr(dq_rules)).filter("dq_check=true")
        return df

    dlt.create_streaming_table(
        name=silver_name
    )

    dlt.apply_changes(
        target=silver_name,
        source=staging_name,
        keys=keys,
        stored_as_scd_type="1",
        sequence_by=sequence_by
    )

    @dlt.table(name=error_name)
    @dlt.expect_all(checks)
    def err_silver_table():
        # A rule evaluated against a NULL value yields NULL, which matches
        # neither "=true" nor "=false". Treat NULL as a failure so such rows
        # land here instead of disappearing from both tables.
        df = dlt.readStream(bronze_name)
        df = df.withColumn("dq_check",F.expr(dq_rules)).filter("dq_check is null or dq_check=false")
        return df



In [0]:
# Drive the pipeline from the metadata table: for every selected source row,
# build its schema from `poc`.`sourceschemaconfig` and register its DLT datasets.
query = "SELECT * from `poc`.`sourcemetadata` where sourcemetadaid = 3"
df = spark.sql(query)
for row in df.collect():
    # Row lookups by field name are case-sensitive, and this file spells the
    # id column both as 'sourcemetadaid' and 'sourcemetadaID'. Resolve it
    # case-insensitively so either stored spelling works.
    row_values = {name.lower(): value for name, value in row.asDict().items()}
    source_metadata_id = row_values["sourcemetadaid"]

    schema_query = f"(select * from `poc`.`sourceschemaconfig` where sourcemetadataid = {source_metadata_id})"
    df_schema = spark.sql(schema_query)
    schema = generate_schema(df_schema)
    dlt_ingestion_metadata_function(row=row, schema=schema)

