In [0]:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
from  pyspark.sql.functions import input_file_name as N
from pyspark.sql.functions import current_timestamp
from pyspark.sql import functions as F
import json
#from pyspark.sql.functions import md5

def dlt_framework(source_table, file_location, file_type, schema, lookup_keys, source_keys, source_sequence, drop_expectations,
                  raw_data_root="/Volumes/hls_de_workshop_dev/bronze/raw_data",
                  lookups_table="hls_de_workshop_dev.bronze.lookups"):
    """Register a metadata-driven bronze -> silver DLT flow for one source table.

    Three pipeline objects are declared:

    * ``framework_bronze.<source_table>``: Auto Loader (``cloudFiles``) stream over
      ``<raw_data_root>/<file_location>``, selecting all columns plus ``_metadata``.
    * ``framework_silver.<source_table>_insert``: the bronze stream with an
      ``insert_timestamp`` column, columns renamed/cast per ``schema``, and (when
      ``lookup_keys`` is given) code columns replaced by labels from ``lookups_table``.
    * ``framework_silver.<source_table>``: streaming target maintained from the
      ``_insert`` table by ``apply_changes`` as SCD type 1.

    Args:
        source_table: Base name used for every table declared here.
        file_location: Sub-path under ``raw_data_root``; a leading "/" is optional.
        file_type: Value passed as the Auto Loader ``cloudFiles.format`` option.
        schema: JSON array of objects with ``source_column``, ``new_column`` and
            ``data_type`` keys. Empty or None means no renames/casts.
        lookup_keys: JSON array of objects mapping a source column name to the name of
            the output column that receives its looked-up ``label``; the source column
            is dropped. Empty or None means no lookups.
        source_keys: Key column name for ``apply_changes``; when lookups run it is also
            hashed into a ``<source_table>_key`` column.
        source_sequence: Column name used to order changes in ``apply_changes``.
        drop_expectations: Currently unused; accepted for caller compatibility.
        raw_data_root: Root directory of the raw landing files.
        lookups_table: Fully qualified name of the lookups table read for label joins.
    """
    # Join root and sub-path with exactly one "/" so "dir" and "/dir" both resolve
    # to the same location (plain concatenation silently broke the former).
    source_path = f"{raw_data_root.rstrip('/')}/{file_location.lstrip('/')}"

    @dlt.table(name=f"framework_bronze.{source_table}")
    def bronze():
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", file_type)
            .load(source_path)
            .select("*", "_metadata")
        )

    @dlt.table(name=f"framework_silver.{source_table}_insert")
    def silver_insert():
        silver_insert_df = (
            dlt.readStream(f"framework_bronze.{source_table}")
            .withColumn("insert_timestamp", F.current_timestamp())
        )

        # Rename each mapped column, then cast it to its configured type.
        # A NULL/empty mapping from the metadata table means "no mapping".
        mappings = json.loads(schema) if schema else []
        for mapping in mappings:
            new_name = mapping["new_column"]
            silver_insert_df = silver_insert_df.withColumnRenamed(mapping["source_column"], new_name)
            silver_insert_df = silver_insert_df.withColumn(new_name, silver_insert_df[new_name].cast(mapping["data_type"]))

        # Replace code columns with their labels from the lookups table.
        # Truthiness check: the metadata column can be NULL, not just "".
        if lookup_keys:
            lookups_df = spark.read.table(lookups_table).drop("insert_timestamp")
            for lookup in json.loads(lookup_keys):
                for key, value in lookup.items():
                    silver_insert_df = (
                        silver_insert_df.join(
                            lookups_df,
                            (silver_insert_df[key] == lookups_df.code) & (lookups_df.variable == key),
                            "left",
                        )
                        .withColumn(f"{value}", lookups_df.label)
                        # md5 accepts only binary/string input; cast so numeric keys
                        # (e.g. after a schema cast) hash too. No-op for string keys.
                        # NOTE(review): recomputed on every lookup pass; kept here to
                        # preserve the existing column order of the output table.
                        .withColumn(f"{source_table}_key", F.md5(F.col(source_keys).cast("string")))
                        # Drop the joined lookup columns and the original code column.
                        .drop(lookups_df.code, lookups_df.variable, lookups_df.label,
                              lookups_df._rescued_data, lookups_df._metadata, silver_insert_df[key])
                    )

        return silver_insert_df

    # Declare the target first; apply_changes then upserts into it (SCD type 1).
    # create_streaming_table replaces the deprecated create_streaming_live_table.
    dlt.create_streaming_table(f"framework_silver.{source_table}")

    dlt.apply_changes(
        target=f"framework_silver.{source_table}",
        source=f"framework_silver.{source_table}_insert",
        keys=[source_keys],
        sequence_by=F.col(source_sequence),
        stored_as_scd_type=1,
    )



In [0]:
# Register one DLT flow per active entry in the metadata table. Each metadata
# column is passed by name to the matching dlt_framework parameter.
df = spark.sql("select * from hls_de_workshop_dev.metadata_framework.metadata where is_active = 1")

for row in df.collect():
    dlt_framework(
        source_table=row["table"],
        file_location=row["file_location"],
        file_type=row["file_type"],
        schema=row["schema_mapping"],
        lookup_keys=row["lookups"],
        source_keys=row["keys"],
        source_sequence=row["sequence"],
        drop_expectations=row["drop_expectations"],
    )