This notebook reads data from the bronze tables, transforms it, and populates the silver tables.

Transformations:
- Make sure dimension tables don't have duplicates; only the most recent version of each record is stored.
- Make sure there aren't any null values in high-priority (primary-key) columns.

Fact tables: make sure they don't have nulls (in primary-key columns); duplicates might not be an issue.


* Only data newly ingested into bronze is transformed. The `loaded_ts` column is used to identify it, because COPY INTO does not support Delta as a source format (the bronze layer data is stored in Delta format).

In [0]:
class silver():
    """Promote newly ingested bronze Delta tables into the silver ``edw`` layer.

    Dimension tables are upserted on their primary key (SCD type 1, the row with
    the newest ``loaded_ts`` per key wins). Fact tables get an insert-only merge
    keyed on the primary key plus ``loaded_ts``.

    Only bronze rows whose ``loaded_ts`` is newer than the latest ``loaded_ts``
    in the matching silver catalog table are processed.

    Relies on the Databricks notebook globals ``spark`` and ``dbutils``.
    """

    def __init__(self):
        self.silver_home = 'abfss://silver@datalakeselectivaproject.dfs.core.windows.net/edw'
        self.bronze_home = 'abfss://bronze@datalakeselectivaproject.dfs.core.windows.net/'

        # Table directory names as listed by dbutils.fs.ls (trailing '/').
        self.expectedDims = ['channels/', 'customers/', 'products/', 'promotions/',
                             'supplementary_demographics/', 'times/']
        self.expectedFacts = ['costs/', 'sales/']
        # Primary key(s) per table; composite keys are comma-separated.
        self.pk_dict = {'channels':'channel_id', 
           'costs': 'PROD_ID, CHANNEL_ID, TIME_ID, PROMO_ID', 
           'customers': 'CUST_ID', 
           'products': 'product_id', 
           'promotions': 'PROMO_ID', 
           'times': 'TIME_ID',
           'sales': 'PROD_ID, CUST_ID, TIME_ID, CHANNEL_ID, PROMO_ID',
           'supplementary_demographics': 'CUST_ID'
           }

    # ---------------------------------------------------------------- helpers
    @staticmethod
    def _table_name(table):
        """Return the bare table name, e.g. 'channels/' -> 'channels'."""
        return table.rstrip('/')

    def _pk_columns(self, table):
        """Return the primary-key column names for a (directory-style) table name.

        ``pk_dict`` values may be a comma-separated string or a list of names.
        """
        pk = self.pk_dict[self._table_name(table)]
        if isinstance(pk, str):
            pk = pk.split(',')
        return [c.strip() for c in pk]

    @staticmethod
    def _merge_condition(columns):
        """Build an AND-ed equality condition between the ``src`` and ``tgt`` aliases."""
        return " AND ".join(f"src.{c} = tgt.{c}" for c in columns)

    # ------------------------------------------------------------------ reads
    def get_table_list(self, layer):
        """List table directory names in ``layer`` ('silver' or 'bronze').

        Raises:
            ValueError: if ``layer`` is not 'silver' or 'bronze'.
        """
        homes = {'silver': self.silver_home, 'bronze': self.bronze_home}
        if layer not in homes:
            raise ValueError(f'Invalid layer! cant get Table List for layer {layer}')
        return [t.name for t in dbutils.fs.ls(homes[layer])]

    def get_recent_ts(self, table):
        """Return the max ``loaded_ts`` of the silver catalog table for ``table``.

        Returns None when the catalog table does not exist or is empty.
        """
        from pyspark.sql import functions as F

        tgtTable = '`selectiva-project`.edw.' + self._table_name(table) + '_edw'
        print(f"\tGetting most recent loaded_ts for: {tgtTable}")

        if not spark._jsparkSession.catalog().tableExists(f"{tgtTable}"):
            return None
        # A global max() over an empty table yields one NULL row -> None,
        # so no separate count() scan is needed.
        return spark.read.table(tgtTable).agg(F.max("loaded_ts")).collect()[0][0]

    def get_bz_table(self, table):
        """Load the bronze Delta table stored under ``bronze_home``/``table``."""
        bzTablePath = self.bronze_home + table
        print(f"\tLoading Bronze Table at: {bzTablePath}")
        return spark.read.format('delta').load(bzTablePath)

    # ------------------------------------------------------------- transforms
    def filter_bz_table(self, bzDf, ts):
        """Keep only bronze rows with ``loaded_ts`` > ``ts`` (all rows if ``ts`` is None)."""
        from pyspark.sql.functions import col
        if ts is not None:
            print(f"\tFiltering Bronze Data on loaded_ts")
            return bzDf.filter(col("loaded_ts") > ts)
        print("\tMost recent ts is Null")
        return bzDf

    def clean_df(self, df, table):
        """Drop exact-duplicate rows and rows with a null in any primary-key column."""
        return (df.dropDuplicates()
                  .na.drop(subset=self._pk_columns(table)))

    # ----------------------------------------------------------------- writes
    def write_dim(self, df, table):
        """SCD type 1 upsert of ``df`` into the silver dimension table.

        Several loads of the same key can appear in one incremental batch.
        Delta MERGE fails when multiple source rows match one target row, so
        only the row with the newest ``loaded_ts`` per key is merged.

        Returns:
            The DataFrame that was merged, or None if ``table`` is not a known dimension.
        """
        if table not in self.expectedDims:
            return None

        from delta import DeltaTable
        from pyspark.sql import Window
        from pyspark.sql import functions as F

        tableName = self._table_name(table)
        pk_cols = self._pk_columns(table)

        newestFirst = Window.partitionBy(*pk_cols).orderBy(F.col("loaded_ts").desc())
        latestDf = (df.withColumn("__silver_rn", F.row_number().over(newestFirst))
                      .filter(F.col("__silver_rn") == 1)
                      .drop("__silver_rn"))

        print(f"\tAdding {tableName} to {self.silver_home}/{table}")
        silverDim = DeltaTable.forPath(spark, f"{self.silver_home}/{table}")
        (silverDim.alias("tgt")
                .merge(latestDf.alias("src"), self._merge_condition(pk_cols))
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute()
        )
        return latestDf

    def write_fact(self, df, table):
        """Insert-only merge of ``df`` into the silver fact table.

        Rows whose primary key and ``loaded_ts`` already exist in silver are skipped.

        Returns:
            The DataFrame that was merged, or None if ``table`` is not a known fact.
        """
        if table not in self.expectedFacts:
            return None

        from delta import DeltaTable

        tableName = self._table_name(table)
        merge_condition = self._merge_condition(self._pk_columns(table) + ["loaded_ts"])

        print(f"\tAdding {tableName} to {self.silver_home}/{table}")
        silverFact = DeltaTable.forPath(spark, f"{self.silver_home}/{table}")
        (silverFact.alias("tgt")
                .merge(df.alias("src"), merge_condition)
                .whenNotMatchedInsertAll()
                .execute()
        )
        return df

    # ----------------------------------------------------------------- driver
    def process(self):
        """Incrementally load each expected bronze table that already exists in silver."""
        bronzeList = self.get_table_list('bronze')
        silverList = set(self.get_table_list('silver'))

        for table in bronzeList:
            if table in silverList and table in self.expectedDims:
                kind, writer = "Dim", self.write_dim
            elif table in silverList and table in self.expectedFacts:
                kind, writer = "Fact", self.write_fact
            else:
                # Skip explicitly so the summary below never reports a DataFrame
                # left over from an earlier iteration (or an undefined one).
                print(f"Skipping {table}: not an expected table present in silver\n")
                continue

            print(f"{kind} table: {table}")
            bronzeDf = self.get_bz_table(table)
            # newest loaded_ts already in silver; bronze rows at or before it are skipped
            recentTs = self.get_recent_ts(table)
            filteredBronzeDf = self.filter_bz_table(bronzeDf, recentTs)
            cleanedFilteredDf = self.clean_df(filteredBronzeDf, table)
            writtenDf = writer(cleanedFilteredDf, table)

            added = writtenDf.count() if writtenDf is not None else 0
            print(f"Finished processing {table}. Added {added} records!\n")




In [0]:
# Run the bronze -> silver load for every expected dimension and fact table.
# NOTE(review): `s` is kept as-is in case later cells reference it.
s = silver()
s.process()

Dim table: channels/
	Loading Bronze Table at: abfss://bronze@datalakeselectivaproject.dfs.core.windows.net/channels/
	Getting most recent loaded_ts for: `selectiva-project`.edw.channels_edw
	Filtering Bronze Data on loaded_ts
	Adding channels to abfss://silver@datalakeselectivaproject.dfs.core.windows.net/edw/channels/
Finished processing channels/. Added 0 records!

Fact table: costs/
	Loading Bronze Table at: abfss://bronze@datalakeselectivaproject.dfs.core.windows.net/costs/
	Getting most recent loaded_ts for: `selectiva-project`.edw.costs_edw
	Filtering Bronze Data on loaded_ts
	Adding costs to abfss://silver@datalakeselectivaproject.dfs.core.windows.net/edw/costs/
Finished processing costs/. Added 0 records!

Dim table: customers/
	Loading Bronze Table at: abfss://bronze@datalakeselectivaproject.dfs.core.windows.net/customers/
	Getting most recent loaded_ts for: `selectiva-project`.edw.customers_edw
	Filtering Bronze Data on loaded_ts
	Adding customers to abfss://silver@datalakes