In [ ]:
import requests
import json
import uuid
from requests.auth import HTTPBasicAuth
import logging
import csv
import pandas as pd
from io import StringIO
from pyspark.sql.window import Window as W

from datetime import datetime, timedelta
from notebookutils import mssparkutils
import threading
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import concurrent.futures
from itertools import repeat
import time
from pyspark.sql import Window
from pyspark.sql.functions import row_number, desc

logger = logging.getLogger('EdFiClient')

In [ ]:
%run OEA/modules/Ed-Fi/v0.7/src/utilities/edfi_v0_7_oea_py

### EdFi Extended Module of OEA

In [ ]:
class EdFiOEAChild(OEA):
    """ 
    NOTE: This class inherits features from the base class OEA and therefore,
    should be created / executed after running the notebook OEA_py
    """
    def __init__(self, workspace='dev', logging_level=logging.INFO, storage_account=None, keyvault=None, timezone=None):
        """Set up the inherited OEA state, then start with both Ed-Fi feature toggles disabled.

        All arguments are forwarded positionally, in declaration order, to the
        base-class constructor.
        """
        super().__init__(workspace, logging_level, storage_account, keyvault, timezone)
        # Toggles consulted by get_latest_changes (history de-duplication) and
        # upsert / delete_then_insert (surrogate key generation).
        self.ingestionHistoryMode = self.surrogateKeyMode = False
    
    def get_latest_changes(self, source_path, sink_path, filtering_date = 'LastModifiedDate',primary_key = ['id'],debugMode = False):
        """ Returns a dataframe with the source rows that are newer than what the sink already holds.

            The watermark is max(filtering_date) read from the sink. If the sink cannot be queried
            (AnalysisException, i.e. no delta table at sink_path yet) or the watermark is empty, all of the data
            from source_path is returned (the assumption is that the sink delta table is being created for the first time).
            eg, get_latest_changes('stage2/Ingested/contoso/v0.1/students', 'stage2/Refined/contoso/v0.1/students')

            Args:
                source_path: path loaded with self.load().
                sink_path: path queried with self.query() to obtain the watermark.
                filtering_date: column used both for the watermark and for the source filter.
                primary_key: key columns used for de-duplication when self.ingestionHistoryMode is on.
                debugMode: when truthy, the watermark filter is skipped and all source rows are kept.
        """
        maxdatetime = None
        try:
            sink_df = self.query(sink_path, f'select max({filtering_date}) maxdatetime')
            maxdatetime = sink_df.first()['maxdatetime']
        except AnalysisException:
            # This means that there is no delta table at the sink_path yet.
            # We'll assume that the sink delta table is being created for the first time, meaning that all of the source data should be returned.
            pass

        changes_df = self.load(source_path)
        if maxdatetime and not debugMode:
            # filter the source table for the latest changes (using the max filtering_date in the destination table as the watermark)
            changes_df = changes_df.where(f"{filtering_date} > '{maxdatetime}'")

        if self.ingestionHistoryMode:
            # Keep only the most recent row per primary key.
            changes_df = self.get_deduplicated_records_by_datetime(df = changes_df,
                                                                    primary_key = primary_key,
                                                                    date_col = filtering_date)
        return changes_df
    
    def get_deduplicated_records_by_datetime(self, df, primary_key = ['lakeId'], date_col = 'rundate'):
        """ Keeps, for every primary-key value, only the row with the greatest date_col.

            Args:
                df: dataframe to de-duplicate.
                primary_key: column name or list of column names identifying a record.
                date_col: column used to order the rows of one key, newest first.

            Returns:
                The dataframe restricted to the first-ranked row of each key (the helper column 'row' is dropped again).
        """
        # A bare string would otherwise be splatted into single characters by partitionBy(*...).
        key_columns = [primary_key] if isinstance(primary_key, str) else list(primary_key)
        # Uses Window / row_number / desc imported at the top of this notebook rather than a functions alias.
        window = Window.partitionBy(*key_columns).orderBy(desc(date_col))
        ranked = df.withColumn('row', row_number().over(window))
        return ranked.filter(ranked['row'] == 1).drop('row')

    def process(self, source_path,foreach_batch_function, batch_type, natural_key = None,landingDateTimeFormat = 'yyyyMMddHHmmss',options={}):
        # FIXME: 2024-02-08 (Under Dev for high granularity and de-dup)
        """ This simplifies the process of using structured streaming when processing transformations.
            Provide a source_path and a function that receives a dataframe and a batch id (the dataframe holds data from the given source_path).
            Use it like this...
            def refine_contoso_dataset(df_source, batch_id):
                metadata = oea.get_metadata_from_url('https://raw.githubusercontent.com/microsoft/OpenEduAnalytics/gene/v0.7dev/modules/module_catalog/Student_and_School_Data_Systems/metadata.csv')
                df_pseudo, df_lookup = oea.pseudonymize(df_source, metadata['studentattendance'])
                oea.upsert(df_pseudo, 'stage2/Refined/contoso_sis/v0.1/studentattendance/general')
                oea.upsert(df_lookup, 'stage2/Refined/contoso_sis/v0.1/studentattendance/sensitive')
            oea.process('stage2/Ingested/contoso_sis/v0.1/studentattendance', refine_contoso_dataset, 'upsert')

            Args:
                source_path: path to stream from; must exist.
                foreach_batch_function: callable invoked as foreach_batch_function(df, batch_id) for every micro-batch.
                batch_type: any value other than 'delete' adds a rowIsActive=True column to the batch.
                natural_key: optional list of columns hashed (sha2-256) into NATURAL_KEY_HASH.
                landingDateTimeFormat: format used to parse the 'rundate' column into a timestamp.
                options: extra keyword options forwarded to spark.readStream.load (not modified here).

            Returns:
                numInputRows from the last progress report of the query, or 0 when no progress was reported.

            Raises:
                ValueError: if source_path does not exist.
        """
        if not self.path_exists(source_path):
            raise ValueError(f'The given path does not exist: {source_path} (which resolves to: {self.to_url(source_path)})') 

        natural_key_expr = None
        if natural_key is not None:
            natural_key_expr = [F.col(key_component).cast('string') for key_component in natural_key]
        
        def wrapped_function(df, batch_id):
            current_timestamp = datetime.now()
            df = df.withColumn('LastModifiedDate', F.lit(current_timestamp))
            df = df.withColumn("rundate", F.to_timestamp(F.col("rundate").cast('string'), landingDateTimeFormat))
            if natural_key_expr is not None:
                # Each key component is null-safe and suffixed with '_' before hashing.
                df = df.withColumn("NATURAL_KEY_HASH",F.sha2(F.concat(*[F.concat(F.coalesce(column, F.lit('')), F.lit('_')) for column in natural_key_expr]), 256))
            if batch_type != 'delete':
                df = df.withColumn("rowIsActive", F.lit(True))
            
            df.persist() # cache the df so it doesn't get read in multiple times when we write to multiple destinations. See: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch
            try:
                foreach_batch_function(df, batch_id)
            finally:
                # Release the cached batch even when the callback fails.
                df.unpersist()

        spark.sql("set spark.sql.streaming.schemaInference=true")
        print(f"source_path is: {source_path}")
        streaming_df = spark.readStream.load(self.to_url(source_path), **options)
        streaming_df = streaming_df.withColumn('stage1_source_url', F.input_file_name())

        # for more info on append vs complete vs update modes for structured streaming: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts
        query = streaming_df.writeStream.format('delta').outputMode('append').trigger(once=True).option('checkpointLocation', self.to_url(source_path) + '/_checkpoints').foreachBatch(wrapped_function).start()
        query.awaitTermination()   # block until query is terminated, with stop() or with error; A StreamingQueryException will be thrown if an exception occurs.
        # lastProgress is None when the query produced no progress update (e.g. nothing new to read).
        last_progress = query.lastProgress
        number_of_new_inbound_rows = last_progress["numInputRows"] if last_progress else 0
        logger.info(f'[EDFIOEACHILD INGESTION STRUCTURED STREAMING PROCESS]: Number of new inbound rows processed: {number_of_new_inbound_rows}')
        logger.debug(last_progress)
        return number_of_new_inbound_rows
    
    def return_pk_statement(self, pk_columns):
        """ Builds the merge condition matching sink and updates rows on every column in pk_columns.

            eg, return_pk_statement(['a', 'b']) -> "sink.a = updates.a AND sink.b = updates.b"
            An empty pk_columns yields an empty string.
        """
        return " AND ".join(f"sink.{column} = updates.{column}" for column in pk_columns)

    def return_upsert_cols(self,
                       columns, 
                       partitioning_cols, 
                       primary_key,
                       upsert_type,
                       skey = None):
        """ Builds the {"sink.<col>": "updates.<col>"} mapping used by a delta merge.

            Args:
                columns: all columns of the incoming dataframe.
                partitioning_cols: partition columns; always left out of the mapping.
                primary_key: column name or list of names; left out for 'update'.
                upsert_type: 'update' or 'insert'.
                skey: optional surrogate key column name or list of names; left out for 'update'.

            Returns:
                The mapping, in the order the columns appear in `columns`;
                None for any other upsert_type.
        """
        if not isinstance(primary_key, list):
            primary_key = [primary_key]

        if upsert_type == 'update':
            # Keys identify the row and must not be rewritten by the update.
            excluded = set(partitioning_cols) | set(primary_key)
            if skey is not None:
                excluded |= set(skey if isinstance(skey, list) else [skey])
        elif upsert_type == 'insert':
            excluded = set(partitioning_cols)
        else:
            return None

        # Iterating `columns` (rather than a set difference) keeps the result order deterministic.
        return {f"sink.{column}": f"updates.{column}" for column in columns if column not in excluded}

    def return_upsert_metrics(self, numOutputRows, numTargetRowsInserted, numTargetRowsUpdated, **kwargs):
        """ Reports (numInputRows, numOutputRows, numTargetRowsInserted, numTargetRowsUpdated) for a write.

            kwargs['df'] is always counted to obtain numInputRows.
            When all three metric arguments are None (first write of a table) every input row is
            reported as output and inserted, with zero updates.
            Otherwise the metrics are read from the most recent history entry of kwargs['delta_table_sink'];
            metrics missing from that entry are reported as 0.
        """
        numInputRows = kwargs['df'].count()

        first_run = numOutputRows is None and numTargetRowsInserted is None and numTargetRowsUpdated is None
        if first_run:
            # NOTE: For First Run - nothing existed before, so every input row counts as inserted.
            return numInputRows, numInputRows, numInputRows, 0

        # NOTE: For subsequent calls to upsert (not initial write)
        latest_operation = kwargs['delta_table_sink'].history(1).select(
            'operationMetrics.numOutputRows',
            'operationMetrics.numTargetRowsInserted',
            'operationMetrics.numTargetRowsUpdated'
        ).first()

        def as_int_or_zero(metric):
            return 0 if metric is None else int(metric)

        return (numInputRows,
                as_int_or_zero(latest_operation[0]),
                as_int_or_zero(latest_operation[1]),
                as_int_or_zero(latest_operation[2]))

    def are_partition_cols_pure(self, df, partitioning_cols):
        """ Tells whether every partitioning column holds exactly one distinct value in df.

            Returns True when there are no partitioning columns or when df is empty.
            Evaluation stops at the first column with more (or fewer) than one distinct value.
        """
        if not partitioning_cols:
            # Nothing to check; avoids a full count of the dataframe.
            return True
        if df.count() == 0:
            return True
        return all(
            df.select(partitioning_col).distinct().count() == 1
            for partitioning_col in partitioning_cols
        )
    
    def construct_partitioned_url(self, incomingPartitionsCache, partitioning_cols, destination_path):
        # FIXME: 2024-02-07 Under Refactoring
        """ Builds the URL of the partition directory that the incoming rows belong to.

            Args:
                incomingPartitionsCache: mapping (or Row) giving the value of each partitioning column.
                partitioning_cols: partition columns, outermost first.
                destination_path: path of the delta table.

            Returns:
                self.to_url() of "<destination_path>/<col1>=<val1>/<col2>=<val2>/...";
                with no partitioning columns this is the URL of destination_path itself.
        """
        destination_partition_path = destination_path
        # dict.fromkeys drops repeated column names while keeping their order.
        for partitioning_col in dict.fromkeys(partitioning_cols):
            partition_value = incomingPartitionsCache[partitioning_col]
            # Extend the path built so far so that every partition level is kept, not only the last one.
            destination_partition_path = f"{destination_partition_path}/{partitioning_col}={partition_value}"
        return self.to_url(destination_partition_path)

    def get_df_deDuplicated_records(self, df, partitioning, primary_key, partitioning_cols):
        """ Drops duplicate rows of df, comparing on the primary key (plus the partition columns when partitioning).

            Args:
                df: dataframe to de-duplicate.
                partitioning: when truthy, partitioning_cols are part of the comparison.
                primary_key: column name or list of column names.
                partitioning_cols: list of partition columns.
        """
        key_columns = primary_key if isinstance(primary_key, list) else [primary_key]
        if partitioning:
            return df.dropDuplicates(key_columns + partitioning_cols)
        return df.dropDuplicates(key_columns)

    def get_df_latest_records_by_join(self, df, destination_path, primary_key = 'NATURAL_KEY_HASH',func_enabled = False):
        """ Keeps only the incoming rows that are new or whose RECORD_HASH differs from the destination.

            Args:
                df: incoming dataframe; must carry RECORD_HASH and the key column(s).
                destination_path: path of the delta table to compare against (read with self.load()).
                primary_key: column name, or list of column names, joined on.
                func_enabled: when falsy the comparison is skipped and df is returned untouched.

            Returns:
                df itself when disabled; otherwise the changed/new rows with RECORD_VERSION set to the
                destination's RECORD_VERSION + 1.

            Raises:
                ValueError: if primary_key is an empty list.
        """
        if not func_enabled:
            return df

        logger.info('[EDFIOEACHILD REFINEMENT RECORD HASHING] JOIN BASED COMPARSIONS / DELTA COMPARISONS BEFORE UPSERT IS ENABLED')

        # A single column name and a list of names are handled alike; previously a list left
        # the SQL text undefined and the call failed with an unbound local variable.
        key_columns = [primary_key] if isinstance(primary_key, str) else list(primary_key)
        if not key_columns:
            raise ValueError('primary_key must name at least one column')
        join_condition = ' AND '.join(
            f"temp_vw_df_source_table.{key_column} = temp_vw_df_destination_table.{key_column}"
            for key_column in key_columns
        )

        # The incoming version column is set aside; the destination's version is the one incremented below.
        df = df.withColumnRenamed('RECORD_VERSION', 'RECORD_VERSION_LEFT')

        df_destination = self.load(destination_path)            
        df.createOrReplaceTempView('temp_vw_df_source_table')
        df_destination.createOrReplaceTempView('temp_vw_df_destination_table')

        query = f"""SELECT temp_vw_df_source_table.*, 
                        temp_vw_df_destination_table.RECORD_VERSION
                    FROM temp_vw_df_source_table 
                    LEFT JOIN temp_vw_df_destination_table 
                        ON {join_condition}
                    WHERE (temp_vw_df_source_table.RECORD_HASH != temp_vw_df_destination_table.RECORD_HASH)
                    OR (temp_vw_df_destination_table.RECORD_HASH IS NULL)
                """

        df_joined = spark.sql(query)
        # NOTE(review): rows without a destination match have a NULL RECORD_VERSION here, so NULL + 1 stays NULL - confirm this is intended.
        df_joined = df_joined.withColumn('RECORD_VERSION', F.col('RECORD_VERSION') + 1)
        df_joined = df_joined.drop('RECORD_VERSION_LEFT')

        logger.info(f"[EDFIOEACHILD REFINEMENT RECORD HASHING] --- NUM ROWS (SOURCE DELTA LAKE) - {df.count()}")
        logger.info(f"[EDFIOEACHILD REFINEMENT RECORD HASHING] --- NUM ROWS (DESTINATION DELTA LAKE) - {df_destination.count()}")
        logger.info(f'[EDFIOEACHILD REFINEMENT RECORD HASHING] --- NUM ROWS (ACTUALLY MODIFIED) - {df_joined.count()}')
        return df_joined
    
    def add_and_set_record_version1(self, df, etl_table):
        """ Returns df with a RECORD_VERSION column set to 1 when etl_table is truthy; otherwise df unchanged. """
        # FIXME: 2024-01-25: Temporary fix - Need to be refactor
        if not etl_table:
            return df
        # FIXME: 2024-01-25: Under Review
        return df.withColumn('RECORD_VERSION', F.lit(1))
    
    def add_surrogate_key_to_df(self, df, primary_key, surrogate_key = False,func_enabled = False):
        """ Derives surrogate key column name(s) from the primary key and, when requested, fills them.

            The surrogate column name is the primary key name with its last four characters replaced by 'SKey'.
            For a single (non-list) primary key whose name contains 'hkey' (case-insensitive), surrogate key
            generation is switched on regardless of the surrogate_key argument.

            When generation is on, each surrogate column becomes -1 where the primary key is null or the
            surrogate column already holds -1, and a 1-based row number otherwise.
            The surrogate column(s) must therefore already exist in df.

            Args:
                df: incoming dataframe.
                primary_key: column name or list of column names.
                surrogate_key: whether to generate surrogate key values.
                func_enabled: master switch; when falsy nothing is derived or changed.

            Returns:
                (df, skey, surrogate_key) where skey is None when disabled, a list for a list primary key,
                and a string otherwise.
        """
        if not func_enabled:
            return df, None, surrogate_key

        if isinstance(primary_key, list):
            skey = [pk_component[:-4] + 'SKey' for pk_component in primary_key]
            key_pairs = list(zip(primary_key, skey))
        else:
            skey = primary_key[:-4] + 'SKey'
            if 'hkey' in primary_key.lower():
                surrogate_key = True
            key_pairs = [(primary_key, skey)]

        if surrogate_key:
            for pk_component, sk_component in key_pairs:
                # monotonically_increasing_id gives an ordering; row_number over it gives consecutive ids.
                df = df.withColumn('row_id_label', (F.monotonically_increasing_id()))
                windowSpec = W.orderBy("row_id_label")
                df = df.withColumn("row_id_label", F.row_number().over(windowSpec))

                df = df.withColumn(sk_component, F.when((F.col(pk_component).isNull()) | (F.col(sk_component) == -1), -1).otherwise(F.col('row_id_label')))
                df = df.drop('row_id_label')
        return df, skey, surrogate_key

    def upsert(self, 
               df, 
               destination_path, 
               primary_key='id', 
               partitioning=False, 
               partitioning_cols = [], 
               surrogate_key = False,
               join_based_upsert = False, 
               de_duplicate = True):
        # FIXME: Re-check Skey logic when maxSkey is None (not Int)
        # FIXME: Current Implementation may not work when partitioned df has mix of partitions
        """ Writes the given dataframe to the delta table at destination_path and returns write metrics.

            NOTE(review): the local flag `overwrite` is hard-coded to True below. As a consequence the merge
            branch (`... and not overwrite`) and the final "create table" branch are never reached: unless the
            table already exists and df is empty, the destination is always written with mode('overwrite').

            Args:
                df: incoming dataframe (column names are normalised with self.fix_column_names).
                destination_path: path of the delta table; resolved with self.to_url.
                primary_key: column name or list of column names identifying a record.
                partitioning: when falsy, partitioning_cols is ignored.
                partitioning_cols: partition columns; each must hold a single distinct value in df.
                surrogate_key: when truthy, surrogate keys are added instead of de-duplicating.
                join_based_upsert: forwarded to get_df_latest_records_by_join in the merge branch.
                de_duplicate: when truthy (and surrogate_key is falsy), duplicates on the key are dropped first.

            Returns:
                (numInputRows, numOutputRows, numTargetRowsInserted, numTargetRowsUpdated)

            Raises:
                Exception: if a partitioning column holds more than one distinct value.
        """
        overwrite = True
        if partitioning:
            partitioning_cols = partitioning_cols
        else:
            partitioning_cols = []
        
        if not(self.are_partition_cols_pure(df, partitioning_cols)):
            raise Exception("[EDFIOEACHILD UPSERT] Partition column with non-unique values is present - current implementation does not support this")
    
        skey = None
        destination_url = self.to_url(destination_path)
        df = self.fix_column_names(df)
        # Merge condition of the form "sink.<pk> = updates.<pk> [AND ...]".
        if type(primary_key) == list:
            pk_statement = self.return_pk_statement(primary_key)
        else:
            pk_statement = self.return_pk_statement([primary_key])
        if surrogate_key:
            # Surrogate keys are only derived when self.surrogateKeyMode is on; otherwise skey comes back as None.
            df, skey, surrogate_key = self.add_surrogate_key_to_df(df = df, 
                                                                   primary_key = primary_key, 
                                                                   surrogate_key = surrogate_key,
                                                                   func_enabled = self.surrogateKeyMode)
        else:
            # NOTE: Do not De-Duplicate when surrogate key is present
            if de_duplicate:
                df = self.get_df_deDuplicated_records(df, partitioning, primary_key, partitioning_cols)

        # Case 1: table exists but there is nothing to write - report zero for every metric.
        if DeltaTable.isDeltaTable(spark, destination_url) and df.count() <= 0:
            logger.info("[EDFIOEACHILD UPSERT] No Ingress Records")
            numInputRows = numOutputRows = numTargetRowsInserted = numTargetRowsUpdated = 0
        #FIX -2024-01-31 - 2024-03-07 Overwrite implementation
        #elif DeltaTable.isDeltaTable(spark, destination_url) and df.count() > 0:
        
        # Case 2: merge into the existing table. Unreachable while `overwrite` is True.
        elif DeltaTable.isDeltaTable(spark, destination_url) and not overwrite:
            delta_table_sink = DeltaTable.forPath(spark, destination_url)
            if surrogate_key:
                # Shift the freshly generated surrogate keys past the current maximum in the sink; -1 markers are kept.
                if type(primary_key) == list:
                    for index, pk_component in enumerate(primary_key):
                        sk_component = skey[index]
                        sink_df = self.query(destination_path, f'select max({sk_component}) max_skey')
                        max_skey = int(sink_df.first()['max_skey'])
                        # df = df.withColumn(skey, F.col(skey) + max_skey)
                        df = df.withColumn(sk_component, F.when(F.col(sk_component) == -1, F.col(sk_component)).otherwise(F.col(sk_component) + max_skey))
                else:
                    sink_df = self.query(destination_path, f'select max({skey}) max_skey')
                    max_skey = int(sink_df.first()['max_skey'])
                    # df = df.withColumn(skey, F.col(skey) + max_skey)
                    df = df.withColumn(skey, F.when(F.col(skey) == -1, F.col(skey)).otherwise(F.col(skey) + max_skey))
            
            # FIXME: 2024-02-07 Under Dev (Renewed Logic)
            # First distinct combination of partition values in the incoming data.
            incomingPartitionsCache = df.select(*partitioning_cols).distinct().first()
            destination_partition_url = self.construct_partitioned_url(incomingPartitionsCache,partitioning_cols, destination_path)
            if DeltaTable.isDeltaTable(spark, destination_partition_url):
                # NOTE: MERGE into a partition if it already exists
                logger.info('[EDFIOEACHILD UPSERT] Upsert by Partitions + PK Cols')
                if surrogate_key:
                    # Updates leave partition, primary key and surrogate key columns untouched; inserts set everything but partition columns.
                    update_cols = self.return_upsert_cols(df.columns, partitioning_cols, primary_key, 'update', skey)
                    insert_cols = self.return_upsert_cols(df.columns, partitioning_cols, primary_key, 'insert', None)
                    delta_table_sink.alias('sink').merge(df.alias('updates'), pk_statement).whenMatchedUpdate(set = update_cols).whenNotMatchedInsert(values = insert_cols).execute()
                else:
                    logger.info('[EDFIOEACHILD UPSERT] TRUE UPSERT')
                    df = self.get_df_latest_records_by_join(df, destination_path, primary_key = primary_key, func_enabled = join_based_upsert)
                    if df.count() > 0: 
                        delta_table_sink.alias('sink').merge(df.alias('updates'), pk_statement).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()    
            else:
                # NOTE: Dynamically create a new partition if it does not exist
                # FIXME: ELSE CONDITION RELEVANCE AND EFFECT TO BE REVIEWED
                # FIXME: If partitioning_dict does not have a valid a dictionary; this may execute and corrupt data by overwriting partitions
                logger.info('[EDFIOEACHILD UPSERT] Dynamically over-write the partition')
                spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
                
                # df = self.add_and_set_record_version1(df, etl_table = etl_table)
                df.write.format('delta').mode('overwrite').partitionBy(*partitioning_cols).save(destination_url)
            # Non-None metric arguments make return_upsert_metrics read the values from the table history.
            numInputRows, numOutputRows, numTargetRowsInserted, numTargetRowsUpdated = self.return_upsert_metrics(numOutputRows = 0, 
                                                                                                                  numTargetRowsInserted = 0, 
                                                                                                                  numTargetRowsUpdated = 0, 
                                                                                                                  df = df,
                                                                                                                  delta_table_sink = delta_table_sink)

        # Case 3: full overwrite of the destination. Always taken when case 1 does not apply.
        elif overwrite:
            logger.info('Overwriting existing delta table found')
            if not(partitioning):
                logger.info('Writing unpartitioned delta lake')
                df.write.format('delta').mode('overwrite').save(destination_url)
            else:
                if partitioning and len(partitioning_cols) == 0:
                    logger.info('Partitioning columns absent - defaulting to DistrictId and SchoolYear as partitioning columns')
                    df.write.format('delta').mode('overwrite').partitionBy('DistrictId', 'SchoolYear').save(destination_url)
                else:
                    partitioning_str = ', '.join(partitioning_cols)
                    logger.info(f'Writing partitioned delta lake - partitioned by - {partitioning_str}')
                    df.write.format('delta').mode('overwrite').partitionBy(*partitioning_cols).save(destination_url)

            # first_row is a placeholder of Nones, so the metrics below always fall back to df.count() / 0.
            first_row = [None, None, None]
            numOutputRows = int(first_row[0]) if first_row[0] is not None else df.count()
            numTargetRowsInserted = int(first_row[1]) if first_row[1] is not None else df.count()
            numTargetRowsUpdated = int(first_row[2]) if first_row[2] is not None else 0

            numInputRows = df.count()
            return numInputRows, numOutputRows, numTargetRowsInserted, numTargetRowsUpdated


        # Case 4: create the table for the first time. Unreachable while `overwrite` is True.
        else:
            logger.debug('[EDFIOEACHILD UPSERT] No existing delta table found. Creating delta table.')
            # df = self.add_and_set_record_version1(df, etl_table = etl_table)
            if not(partitioning):
                logger.info('[EDFIOEACHILD UPSERT] Writing unpartitioned delta lake')
                df.write.format('delta').save(destination_url)
            else:
                if partitioning and len(partitioning_cols) == 0:
                    logger.info('[EDFIOEACHILD UPSERT] Partitioning columns absent - defaulting to DistrictId and SchoolYear as partitioning columns')
                    df.write.format('delta').partitionBy('DistrictId', 'SchoolYear').save(destination_url)
                else:
                    partitioning_str = ', '.join(partitioning_cols)
                    logger.info(f'[EDFIOEACHILD UPSERT] Writing partitioned delta lake - partitioned by - {partitioning_str}')
                    df.write.format('delta').partitionBy(*partitioning_cols).save(destination_url)

            # All-None metric arguments mark a first write: every input row is reported as inserted.
            numInputRows, numOutputRows, numTargetRowsInserted, numTargetRowsUpdated = self.return_upsert_metrics(numOutputRows = None, 
                                                                                                                  numTargetRowsInserted = None, 
                                                                                                                  numTargetRowsUpdated = None, 
                                                                                                                  df = df)
        return numInputRows, numOutputRows, numTargetRowsInserted, numTargetRowsUpdated

    def delete_then_insert(self, df, destination_path, primary_key='id', partitioning=False, partitioning_cols = [], surrogate_key = False):
        # FIXME: Re-check Skey logic when maxSkey is None (not Int)
        """ Replaces the rows of the destination delta table that match the incoming primary key(s).

            Behaviour by situation:
              - table exists, df empty: nothing is written, all metrics are 0.
              - table exists, df not empty, surrogate keys on: merge (update matched rows, insert the others).
              - table exists, df not empty, surrogate keys off: matched rows are deleted from the sink, then the
                incoming rows are appended (with mergeSchema enabled).
              - the partition path is not recognised as a delta table: the data is written with mode('overwrite')
                under dynamic partition overwrite.
              - no table yet: the table is created from df.

            For a single (non-list) primary key whose name contains 'hkey' (case-insensitive), surrogate key
            handling is switched on regardless of the surrogate_key argument.

            Args:
                df: incoming dataframe (column names are normalised with self.fix_column_names).
                destination_path: path of the delta table; resolved with self.to_url.
                primary_key: column name or list of column names identifying a record.
                partitioning: when falsy, partitioning_cols is ignored.
                partitioning_cols: partition columns used when writing.
                surrogate_key: whether surrogate keys are maintained.

            Returns:
                (numInputRows, numOutputRows, numTargetRowsInserted, numTargetRowsUpdated)
        """
        if partitioning:
            partitioning_cols = partitioning_cols
        else:
            partitioning_cols = []
        
        skey = None
        destination_url = self.to_url(destination_path)
        df = self.fix_column_names(df)
        # Derive the merge condition and the surrogate key name(s): last four characters of each key replaced by 'SKey'.
        if type(primary_key) == list:
            pk_statement = self.return_pk_statement(primary_key)
            skey = list()
            for pk_component in primary_key:
                sk_component = pk_component[:-4] + 'SKey'
                skey.append(sk_component)
        else:
            skey = primary_key[:-4] + 'SKey'
            if 'hkey' in primary_key.lower():
                surrogate_key = True
            pk_statement = self.return_pk_statement([primary_key])
        
        # Snapshot of the incoming rows; this is what gets appended in the delete-then-insert branch.
        df_original = df
        
        if surrogate_key:
            # NOTE(review): when self.surrogateKeyMode is off, add_surrogate_key_to_df returns skey = None,
            # which replaces the name(s) derived above - confirm the later max(skey) queries cope with that.
            df, skey, surrogate_key = self.add_surrogate_key_to_df(df = df, 
                                                                   primary_key = primary_key, 
                                                                   surrogate_key = surrogate_key,
                                                                   func_enabled = self.surrogateKeyMode)
            df_original = df
        else:
            pass
        
        # Case 1: table exists but there is nothing to write - report zero for every metric.
        if DeltaTable.isDeltaTable(spark, destination_url) and df.count() <= 0:
            logger.info("[EDFIOEACHILD DELETE THEN INSERT] No Ingress Records")
            numInputRows = numOutputRows = numTargetRowsInserted = numTargetRowsUpdated = 0

        # Case 2: table exists and there are incoming rows.
        elif DeltaTable.isDeltaTable(spark, destination_url) and df.count() > 0:
            delta_table_sink = DeltaTable.forPath(spark, destination_url)
            if surrogate_key:
                # Shift the freshly generated surrogate keys past the current maximum in the sink; -1 markers are kept.
                if type(primary_key) == list:
                    for index, pk_component in enumerate(primary_key):
                        sk_component = skey[index]
                        sink_df = self.query(destination_path, f'select max({sk_component}) max_skey')
                        max_skey = int(sink_df.first()['max_skey'])
                        # df = df.withColumn(skey, F.col(skey) + max_skey)
                        df = df.withColumn(sk_component, F.when(F.col(sk_component) == -1, F.col(sk_component)).otherwise(F.col(sk_component) + max_skey))
                else:
                    sink_df = self.query(destination_path, f'select max({skey}) max_skey')
                    max_skey = int(sink_df.first()['max_skey'])
                    # df = df.withColumn(skey, F.col(skey) + max_skey)
                    df = df.withColumn(skey, F.when(F.col(skey) == -1, F.col(skey)).otherwise(F.col(skey) + max_skey))

            # FIXME: 2024-02-07 Under Dev (Renewed Logic)
            # First distinct combination of partition values in the incoming data.
            incomingPartitionsCache = df.select(*partitioning_cols).distinct().first()
            destination_partition_url = self.construct_partitioned_url(incomingPartitionsCache,partitioning_cols, destination_path)
            if DeltaTable.isDeltaTable(spark, destination_partition_url):
                # NOTE: MERGE into a partition if it already exists
                logger.info('[EDFIOEACHILD DELETE THEN INSERT] Upsert by Partitions + PK Cols')
                if surrogate_key:
                    # Updates leave partition, primary key and surrogate key columns untouched; inserts set everything but partition columns.
                    update_cols = self.return_upsert_cols(df.columns, partitioning_cols, primary_key, 'update', skey)
                    insert_cols = self.return_upsert_cols(df.columns, partitioning_cols, primary_key, 'insert', None)
                    delta_table_sink.alias('sink').merge(df.alias('updates'), pk_statement).whenMatchedUpdate(set = update_cols).whenNotMatchedInsert(values = insert_cols).execute()
                else:
                    logger.info('[EDFIOEACHILD DELETE THEN INSERT] DELETE THEN INSERT')
                    # Step 1: remove every sink row whose key is present in the incoming data.
                    delta_table_sink.alias('sink').merge(df.alias('updates'), pk_statement).whenMatchedDelete().execute()#whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
                    # Step 2: append all incoming rows, allowing new columns through mergeSchema.
                    df_original.write.format('delta').mode('append').option("mergeSchema", "true").partitionBy(*partitioning_cols).save(destination_url) 
            else:
                # NOTE: Dynamically create a new partition if it does not exist
                # FIXME: ELSE CONDITION RELEVANCE AND EFFECT TO BE REVIEWED
                logger.info('[EDFIOEACHILD DELETE THEN INSERT] Dynamically over-write the partition')
                spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
                df.write.format('delta').mode('overwrite').partitionBy(*partitioning_cols).save(destination_url)
            # Non-None metric arguments make return_upsert_metrics read the values from the table history.
            numInputRows, numOutputRows, numTargetRowsInserted, numTargetRowsUpdated = self.return_upsert_metrics(numOutputRows = 0, 
                                                                                                                  numTargetRowsInserted = 0, 
                                                                                                                  numTargetRowsUpdated = 0, 
                                                                                                                  df = df,
                                                                                                                  delta_table_sink = delta_table_sink)
        # Case 3: no delta table at the destination yet - create it.
        else:
            logger.debug('[EDFIOEACHILD DELETE THEN INSERT] No existing delta table found. Creating delta table.')
            if not(partitioning):
                logger.info('[EDFIOEACHILD DELETE THEN INSERT] Writing unpartitioned delta lake')
                df.write.format('delta').save(destination_url)
            else:
                if partitioning and len(partitioning_cols) == 0:
                    logger.info('[EDFIOEACHILD DELETE THEN INSERT] Partitioning columns absent - defaulting to DistrictId and SchoolYear as partitioning columns')
                    df.write.format('delta').partitionBy('DistrictId', 'SchoolYear').save(destination_url)
                else:
                    partitioning_str = ', '.join(partitioning_cols)
                    logger.info(f'[EDFIOEACHILD DELETE THEN INSERT] Writing partitioned delta lake - partitioned by - {partitioning_str}')
                    df.write.format('delta').partitionBy(*partitioning_cols).save(destination_url)

            # All-None metric arguments mark a first write: every input row is reported as inserted.
            numInputRows, numOutputRows, numTargetRowsInserted, numTargetRowsUpdated = self.return_upsert_metrics(numOutputRows = None, 
                                                                                                                  numTargetRowsInserted = None, 
                                                                                                                  numTargetRowsUpdated = None, 
                                                                                                                  df = df)
        return numInputRows, numOutputRows, numTargetRowsInserted, numTargetRowsUpdated

    def overwrite(self,
                  df,
                  destination_path,
                  primary_key='id',
                  partitioning=False,
                  partitioning_cols = [],
                  surrogate_key = False,
                  de_duplicate = True):
        """ Overwrites the delta table at destination_path with the contents of the given dataframe.
            If a delta table already exists at the destination and the dataframe has no rows, nothing is written
            and all four returned metrics are 0. Otherwise the table is (re)written in 'overwrite' mode, either
            unpartitioned, partitioned by partitioning_cols, or - when partitioning is requested without columns -
            partitioned by DistrictId and SchoolYear.
            Records are de-duplicated first unless surrogate_key is set or de_duplicate is False.
            Returns the tuple (numInputRows, numOutputRows, numTargetRowsInserted, numTargetRowsUpdated).
            Raises an Exception when are_partition_cols_pure() rejects the partition columns.
        """
        table_name = destination_path.split('/')[-1]
        # Partition columns are only honoured when partitioning was requested.
        if not partitioning:
            partitioning_cols = []

        if not self.are_partition_cols_pure(df, partitioning_cols):
            raise Exception(f"[EDFIOEACHILD OVERWRITE] {table_name} Partition column with non-unique values is present - current implementation does not support this")

        destination_url = self.to_url(destination_path)
        df = self.fix_column_names(df)

        # NOTE: skey and pk_statement are derived here but not consumed further down in this method
        #       (the surrogate key creation that used them is currently disabled).
        if type(primary_key) is list:
            pk_statement = self.return_pk_statement(primary_key)
            skey = [pk_component[:-4] + 'SKey' for pk_component in primary_key]
        else:
            skey = primary_key[:-4] + 'SKey'
            if 'hkey' in primary_key.lower():
                logger.info('[EDFIOEACHILD OVERWRITE] HKey Exists: Entering if control block')
            pk_statement = self.return_pk_statement([primary_key])

        if surrogate_key:
            # Surrogate key generation is disabled; only the attempt is logged.
            logger.info('[EDFIOEACHILD OVERWRITE] Trying to create surrogate key')
        elif de_duplicate:
            # NOTE: Do not De-Duplicate when surrogate key is present
            df = self.get_df_deDuplicated_records(df, partitioning, primary_key, partitioning_cols)

        # Guard: an existing table is left untouched when there is nothing to write.
        if DeltaTable.isDeltaTable(spark, destination_url) and df.count() <= 0:
            logger.info(f"[EDFIOEACHILD OVERWRITE] {table_name} No Ingress Records")
            return 0, 0, 0, 0

        logger.debug(f'[EDFIOEACHILD OVERWRITE] {table_name} Overwriting existing delta table found. Creating delta table.')
        if not partitioning:
            logger.info(f'[EDFIOEACHILD OVERWRITE] {table_name} Writing unpartitioned delta lake')
            layout = None
        elif len(partitioning_cols) == 0:
            logger.info(f'[EDFIOEACHILD OVERWRITE] {table_name} Partitioning columns absent - defaulting to DistrictId and SchoolYear as partitioning columns')
            layout = ('DistrictId', 'SchoolYear')
        else:
            partitioning_str = ', '.join(partitioning_cols)
            logger.info(f'[EDFIOEACHILD OVERWRITE] {table_name} Writing partitioned delta lake - partitioned by - {partitioning_str}')
            layout = tuple(partitioning_cols)

        writer = df.write.format('delta').mode('overwrite')
        if layout is not None:
            writer = writer.partitionBy(*layout)
        writer.save(destination_url)

        numInputRows, numOutputRows, numTargetRowsInserted, numTargetRowsUpdated = self.return_upsert_metrics(
            numOutputRows = None,
            numTargetRowsInserted = None,
            numTargetRowsUpdated = None,
            df = df)
        return numInputRows, numOutputRows, numTargetRowsInserted, numTargetRowsUpdated

    
    def append(self, df, destination_path, primary_key='id', partitioning = False, partitioning_cols = []):
        """ Appends the given dataframe to the delta table in the specified destination.
            If there is no delta table found in the destination_path, one will be created.

            Rows are de-duplicated on the primary key before being written (plus the partitioning columns
            when partitioning is requested). primary_key may be a single column name or a list/tuple of
            column names (composite key), matching what the sibling overwrite method accepts.
            When a new table is created with partitioning requested but no partitioning_cols given,
            it is partitioned by DistrictId and SchoolYear.
        """
        destination_url = self.to_url(destination_path)
        df = self.fix_column_names(df)

        # Normalise the key to a flat list of column names; wrapping a list key in another list
        # would hand dropDuplicates a nested list, which it cannot resolve to columns.
        if isinstance(primary_key, (list, tuple)):
            dedup_cols = list(primary_key)
        else:
            dedup_cols = [primary_key]
        if partitioning:
            dedup_cols = dedup_cols + list(partitioning_cols)
        df = df.dropDuplicates(dedup_cols)

        if DeltaTable.isDeltaTable(spark, destination_url):
            df.write.format('delta').mode('append').save(destination_url)  # https://docs.delta.io/latest/delta-batch.html#append
        else:
            logger.debug('No existing delta table found. Creating delta table.')
            if not(partitioning):
                logger.info('[EDFIOEACHILD APPEND] Writing unpartitioned delta lake')
                df.write.format('delta').save(destination_url)
            elif len(partitioning_cols) == 0:
                logger.info('[EDFIOEACHILD APPEND] Partitioning columns absent - defaulting to DistrictId and SchoolYear as partitioning columns')
                df.write.format('delta').partitionBy('DistrictId', 'SchoolYear').save(destination_url)
            else:
                partitioning_str = ', '.join(partitioning_cols)
                logger.info(f'[EDFIOEACHILD APPEND] Writing partitioned delta lake - partitioned by - {partitioning_str}')
                df.write.format('delta').partitionBy(*partitioning_cols).save(destination_url)
    
    def ingest(self, entity_path, primary_key='id', hashing = False,natural_key = None,landingDateTimeFormat = "yyyyMMddHHmmss",ingestionHistoryMode = False,options={}):
        """ Ingests the data for the entity in the given path.
            CSV files are expected to have a header row by default, and JSON files are expected to have complete JSON docs on each row in the file.
            To specify options that are different from these defaults, use the options param.
            eg, ingest('contoso_sis/v0.1/students') # ingests all entities found in that path
            eg, ingest('contoso_sis/v0.1/students', options={'header':False}) # for CSV files that don't have a header

            The options dict passed in is never modified: every batch works on its own copy, so reader
            settings chosen for one batch (or one call) do not leak into the next.
            natural_key is ignored unless hashing is True.
            Returns the sum of the row counts reported by process() over all batches, or None when the
            raw source path does not exist.
            Raises ValueError when a batch has an unrecognised batch type.
        """
        self.ingestionHistoryMode = ingestionHistoryMode 
        if not(hashing):
            natural_key = None
        primary_key = self.fix_column_name(primary_key) # fix the column name, in case it has a space in it or some other invalid character
        ingested_path = f'stage2/Ingested/{entity_path}'
        raw_path = f'stage1/Transactional/{entity_path}'

        if not self.path_exists(raw_path):
            logger.error(f'[EDFIOEACHILD INGESTION] Failed to ingest data because the given source data was not found where expected: {raw_path}')
            return

        batches = self.get_batch_info(raw_path)
        number_of_inbound_changes = 0
        for batch in batches:
            batch_type = batch[0]
            source_data_format = batch[1]
            logger.info(f'[EDFIOEACHILD INGESTION] Ingesting from: {raw_path}, batch type of: {batch_type}, source data format of: {source_data_format}')
            source_url = self.to_url(f'{raw_path}/{batch_type}_batch_data')

            # NOTE(review): this uses the notebook-level `oea` object rather than self - confirm that is intended.
            if oea.get_folder_size(f'{source_url}/{self.get_latest_folder(source_url)}') > 0:
                # Snapshot and additive batches are read from the latest run folder only.
                if batch_type == 'snapshot' or batch_type == 'additive':
                    source_url = f'{source_url}/{self.get_latest_folder(source_url)}'

                logger.debug(f'Processing {batch_type} data from: {source_url} and writing out to: {ingested_path}')
                if batch_type == 'snapshot':
                    def batch_func(df, batch_id): self.overwrite(df, ingested_path, primary_key)
                elif batch_type == 'additive':
                    def batch_func(df, batch_id): self.append(df, ingested_path, primary_key)
                elif batch_type == 'delta' and not ingestionHistoryMode:
                    def batch_func(df, batch_id): self.upsert(df, ingested_path, primary_key)
                elif batch_type == 'delta' and ingestionHistoryMode:
                    # FIXME: 2024-02-09 De-Duplication Based on latest record testing (via strategy1 => ingested data is ingested_history data)
                    #        This approach requires using append function
                    def batch_func(df, batch_id): self.append(df, ingested_path, primary_key)
                elif batch_type == "delete":
                    # FIXME: 2024-02-09 Reverted to using delete_rows (instead of soft_delete_rows)
                    #        Side Effects of the revert under review
                    def batch_func(df, batch_id): self.delete_rows(df, ingested_path, primary_key, batch_id)
                else:
                    raise ValueError("No valid batch folder was found at that path (expected to find a single folder with one of the following names: snapshot_batch_data, additive_batch_data, or delta_batch_data). Are you sure you have the right path?")                      

                # Copy per batch: mutating `options` directly would write into the shared default dict
                # (or the caller's dict) and carry 'format'/'header'/'multiline' over to later batches and calls.
                batch_options = dict(options) if options else {}
                batch_options['format'] = source_data_format # eg, 'csv', 'json'
                if source_data_format == 'csv' and batch_options.get('header') is None:
                    batch_options['header'] = True  # default to expecting a header in csv files
                if source_data_format == 'json' and batch_options.get('multiline') is None:
                    batch_options['multiline'] = True # default to expecting multiline formatted json data

                number_of_new_inbound_rows = self.process(source_path = source_url, 
                                                          foreach_batch_function = batch_func, 
                                                          batch_type = batch_type, 
                                                          natural_key = natural_key,
                                                          landingDateTimeFormat = landingDateTimeFormat, 
                                                          options = batch_options)
                if number_of_new_inbound_rows > 0:    
                    self.add_to_lake_db(ingested_path, overwrite = True)
                number_of_inbound_changes += number_of_new_inbound_rows
        return number_of_inbound_changes
    
    def get_sink_general_sensitive_paths(self, source_path):
        """ Derives the two Refined sink paths for the entity found at source_path.
            The parent path reported by parse_path() has 'Ingested' replaced with 'Refined'; the general
            sink is '<parent>/general/<entity>' and the sensitive sink is '<parent>/sensitive/<entity>_lookup'.
            Returns the tuple (sink_general_path, sink_sensitive_path).
        """
        parsed = self.parse_path(source_path)
        refined_parent = parsed['entity_parent_path'].replace('Ingested', 'Refined')
        entity_name = parsed['entity']

        general_sink = refined_parent + '/general/' + entity_name
        sensitive_sink = refined_parent + '/sensitive/' + entity_name + '_lookup'
        return general_sink, sensitive_sink

    def refine(self, entity_path, metadata=None, primary_key='id'):
        """ Refines the latest changes of an ingested entity into the stage2/Refined general and sensitive sinks.
            The changes since the last refinement are fetched with get_latest_changes(), cast to the schema
            described by the metadata, pseudonymized, and upserted: the pseudonymized frame into the general
            sink (keyed on '<primary_key>_pseudonym') and the lookup frame into the sensitive sink (keyed on
            primary_key). Both sinks are then registered in the lake db.
            When metadata is not supplied it is looked up from the entity's parent path.
            Returns the number of changed rows that were found.
        """
        source_path = f'stage2/Ingested/{entity_path}'
        primary_key = self.fix_column_name(primary_key) # fix the column name, in case it has a space in it or some other invalid character
        # This is a method of the class, so it has to be reached through self.
        sink_general_path, sink_sensitive_path = self.get_sink_general_sensitive_paths(source_path)

        if not metadata:
            # path_dict has to be parsed here; it is not available from the sink path helper.
            path_dict = self.parse_path(source_path)
            all_metadata = self.get_metadata_from_path(path_dict['entity_parent_path'])
            metadata = all_metadata[path_dict['entity']]
        
        df_changes = self.get_latest_changes(source_path, sink_general_path)
        spark_schema = self.to_spark_schema(metadata)
        df_changes = self.modify_schema(df_changes, spark_schema)
        # Count once and reuse the value for the branch, the log message and the return value.
        changed_row_count = df_changes.count()
        if changed_row_count > 0:
            df_pseudo, df_lookup = self.pseudonymize(df_changes, metadata)
            self.upsert(df_pseudo, sink_general_path, f'{primary_key}_pseudonym') # todo: remove this assumption that the primary key will always be hashed during pseduonymization
            self.upsert(df_lookup, sink_sensitive_path, primary_key)    
            self.add_to_lake_db(sink_general_path)
            self.add_to_lake_db(sink_sensitive_path)
            logger.info(f'[EDFIOEACHILD REFINEMENT] Processed {changed_row_count} updated rows from {source_path} into stage2/Refined')
        else:
            logger.info(f'[EDFIOEACHILD REFINEMENT] No updated rows in {source_path} to process.')
        
        return changed_row_count

    def pseudonymize(self, df, metadata, transform_mode = False, debugging = True):
        """ Splits the given dataframe into a pseudonymized dataframe and a lookup dataframe.
            eg, df_pseudo, df_lookup = oea.pseudonymize(df, metadata)

            metadata is iterated as rows of (column name, dtype, operation) and is only consulted when
            debugging is False. Supported operations:
              "hash-no-lookup" / "hnl" : column is hashed and renamed to <col>_pseudonym in df_pseudo, dropped from df_lookup
              "hash" / "h"             : as above, and both <col> and <col>_pseudonym are kept in df_lookup
              "mask" / "m"             : column value is replaced by '*' in df_pseudo
              "partition-by"           : column is left unchanged
              "no-op" / "x"            : column is dropped from df_lookup
            When debugging is True (the default) the metadata is ignored: 'id' is renamed to 'id_pseudonym'
            in df_pseudo WITHOUT hashing, and df_lookup gets 'id_pseudonym' as a plain copy of 'id'.
            With transform_mode, DistrictId and SchoolYear are additionally selected into df_lookup.
            The lookup table should be written to a "sensitive" folder in the data lake.
            [More info on this approach here: https://learn.microsoft.com/en-us/azure/databricks/security/privacy/gdpr-delta#pseudonymize-data]
            Returns the tuple (df_pseudo, df_lookup); df_lookup contains only the collected lookup columns.
        """
        salt = self._get_salt()
        df_pseudo, df_lookup = df, df
        lookup_cols = ['DistrictId', 'SchoolYear'] if transform_mode else []

        if debugging:
            col_name = 'id'
            pseudonym_col = col_name + "_pseudonym"
            # Debug path: no salted hash is applied, the id is carried over verbatim.
            df_pseudo = df_pseudo.withColumnRenamed(col_name, pseudonym_col)
            df_lookup = df_lookup.withColumn(pseudonym_col, F.col(col_name))
            lookup_cols.extend([col_name, pseudonym_col])
        else:
            for row in metadata:
                col_name, dtype, op = row[0], row[1], row[2]
                pseudonym_col = col_name + "_pseudonym"
                if op in ("hash-no-lookup", "hnl"):
                    # This means that the lookup can be performed against a different table so no lookup is needed.
                    hashed = F.sha2(F.concat(F.col(col_name), F.lit(salt)), 256)
                    df_pseudo = df_pseudo.withColumn(col_name, hashed).withColumnRenamed(col_name, pseudonym_col)
                    df_lookup = df_lookup.drop(col_name)
                elif op in ("hash", "h"):
                    hashed = F.sha2(F.concat(F.col(col_name), F.lit(salt)), 256)
                    df_pseudo = df_pseudo.withColumn(col_name, hashed).withColumnRenamed(col_name, pseudonym_col)
                    df_lookup = df_lookup.withColumn(pseudonym_col, hashed)
                    lookup_cols.extend([col_name, pseudonym_col])
                elif op in ("mask", "m"):
                    df_pseudo = df_pseudo.withColumn(col_name, F.lit('*'))
                elif op == "partition-by":
                    # make no changes for this column so that it will be in both dataframes and can be used for partitioning
                    continue
                elif op in ("no-op", "x"):
                    df_lookup = df_lookup.drop(col_name)

        df_lookup = df_lookup.select(*lookup_cols)
        return (df_pseudo, df_lookup)
    
    # def add_to_lake_db(self, source_entity_path, overwrite = False, extension = None):
    #     """ Adds the given entity as a table (if the table doesn't already exist) to the proper lake db based on the path.
    #         This method will also create the lake db if it doesn't already exist.
    #         eg: add_to_lake_db('stage2/Ingested/contoso_sis/v0.1/students')

    #         Note that a spark db that points to source data in the delta format can't be queried via SQL serverless pool. More info here: https://docs.microsoft.com/en-us/azure/synapse-analytics/sql/resources-self-help-sql-on-demand#delta-lake
    #     """
    #     source_dict = self.parse_path(source_entity_path)
        
    #     db_name = source_dict['ldb_name']
    #     if extension is not None:
    #         if not(extension.startswith('_')):
    #             extension = '_' + extension
    #         source_dict['entity'] = source_dict['entity'] + str(extension)

    #     spark.sql(f'CREATE DATABASE IF NOT EXISTS {db_name}')
    #     if overwrite:
    #         spark.sql(f"drop table if exists {db_name}.{source_dict['entity']}")

    #     spark.sql(f"create table if not exists {db_name}.{source_dict['entity']} using DELTA location '{self.to_url(source_dict['entity_path'])}'")
    
    def add_to_lake_db(self, source_entity_path, overwrite = False, extension = None):
        """ Registers the entity at the given path as a DELTA table in the lake db derived from that path.
            The lake db is created if it doesn't already exist, and the table is created only if it doesn't already exist
            (with overwrite=True any existing table definition is dropped first).
            An optional extension is appended to the table name, prefixed with '_' when it does not already start with one.
            Paths containing '/emptySchemas/' are skipped entirely.
            eg: add_to_lake_db('stage2/Ingested/contoso_sis/v0.1/students')

            Note that a spark db that points to source data in the delta format can't be queried via SQL serverless pool. More info here: https://docs.microsoft.com/en-us/azure/synapse-analytics/sql/resources-self-help-sql-on-demand#delta-lake
        """
        source_dict = self.parse_path(source_entity_path)
        db_name = source_dict['ldb_name']

        # Nothing is registered for empty-schema placeholder paths.
        if '/emptySchemas/' in source_entity_path:
            return

        if extension is not None:
            suffix = extension if extension.startswith('_') else '_' + extension
            source_dict['entity'] = source_dict['entity'] + str(suffix)

        spark.sql(f'CREATE DATABASE IF NOT EXISTS {db_name}')
        if overwrite:
            spark.sql(f"drop table if exists {db_name}.{source_dict['entity']}")

        spark.sql(f"create table if not exists {db_name}.{source_dict['entity']} using DELTA location '{self.to_url(source_dict['entity_path'])}'")
        
    def add_to_bucketed_lake_db(self,
                                source_entity_path,
                                df,
                                primary_key,
                                partitioning_cols,
                                destination_url,
                                num_buckets = 5,
                                overwrite = False,
                                extension = None):
        """ Writes the given dataframe as a partitioned, bucketed table in the lake db derived from the path.
            The lake db is created if it doesn't already exist; with overwrite=True an existing table definition is dropped first.
            The dataframe is appended (delta format, mergeSchema enabled), partitioned by partitioning_cols and
            bucketed into num_buckets buckets on primary_key, stored at destination_url and saved as '<db>.<entity>'.
            An optional extension is appended to the table name as-is.

            Note that a spark db that points to source data in the delta format can't be queried via SQL serverless pool. More info here: https://docs.microsoft.com/en-us/azure/synapse-analytics/sql/resources-self-help-sql-on-demand#delta-lake
        """
        source_dict = self.parse_path(source_entity_path)
        db_name = source_dict['ldb_name']

        if extension is not None:
            source_dict['entity'] = source_dict['entity'] + str(extension)

        spark.sql(f'CREATE DATABASE IF NOT EXISTS {db_name}')
        if overwrite:
            spark.sql(f"drop table if exists {db_name}.{source_dict['entity']}")

        # Build the writer step by step: format/mode, then layout, then target.
        writer = df.write.format('delta').mode('append').option("mergeSchema", "true")
        writer = writer.partitionBy(*partitioning_cols).bucketBy(num_buckets, primary_key)
        writer = writer.option("path", destination_url)
        writer.saveAsTable(f"{db_name}.{source_dict['entity']}")

    def soft_delete_rows(self, df, destination_path, primary_key='id', batch_id=''):
        """ Soft deletes the entities of the given dataframe in the delta table at destination_path.
            Rows of the sink whose primary_key matches a row of df get rowIsActive set to false and
            LastModifiedDate set to the current run timestamp (via a delta MERGE). Nothing is merged when
            the destination is not a delta table. The dataframe is also registered as a global temp view
            named '<entity><batch_id>', and the lake db table is refreshed at the end.
        """
        df = df.withColumn("rowIsActive", F.lit(False))

        ref = f'{destination_path.split("/")[-1]}{batch_id}'
        print(ref)
        df.createOrReplaceGlobalTempView(ref)

        parsed = self.parse_path(destination_path)
        db_name, entity_name = parsed['ldb_name'], parsed['entity']
        destination_url = self.to_url(destination_path)

        # NOTE(review): the original comment asks for UTC, but datetime.now() is naive local time;
        #               this only matches UTC when the cluster timezone is UTC - confirm.
        rundate = datetime.now().replace(microsecond=0)

        # Use MERGE INTO to update the Delta table based on the condition
        if DeltaTable.isDeltaTable(spark, destination_url):
            match_condition = f'sink.{primary_key} = source.{primary_key}'
            deactivate = {"rowIsActive": "false", "LastModifiedDate": f"'{rundate}'"}
            sink_table = DeltaTable.forPath(spark, destination_url)
            merge_builder = sink_table.alias('sink').merge(df.alias('source'), match_condition)
            merge_builder.whenMatchedUpdate(set=deactivate).execute()

        # Refresh the table to make the changes visible
        spark.sql(f"REFRESH TABLE {db_name}.{entity_name}")

### Ed-Fi Client - API Client

In [ ]:
class EdFiClient:
    #The constructor
    def __init__(self, workspace, kvName, moduleName, authUrl, dataManagementUrl, changeQueriesUrl, dependenciesUrl, apiVersion, batchLimit, minChangeVer="", maxChangeVer="", landingDateTimeFormat = "yyyyMMddHHmmss", schoolYear=None, districtId=None, kvSecret_clientId = None, kvSecret_clientSecret = None, retry_strategy = None, threadMode = False, devMode = False, error_logger = None):
        """ Sets up the API client: HTTP retry adapter, logging, credentials, endpoint urls and landing folder.
            - retry_strategy: used as given; when None a Retry with 3 attempts, backoff factor 1 and
              retries on 429/500/502/503/504 is created.
            - threadMode: when set, a threading.local() is created to hold per-thread token/session state.
            - kvName: falls back to oea.keyvault when None.
            - Credentials: with devMode the kvSecret_* arguments are used directly as client id/secret,
              otherwise they are resolved through oea._get_secret(). In the 'dev' workspace a missing key
              vault or a failed lookup falls back to blank credentials (test data); elsewhere the failure is re-raised.
            - runDate: the current UTC time formatted with '%Y%m%d%H%M%S' when landingDateTimeFormat is
              'yyyyMMddHHmmss', otherwise landingDateTimeFormat is passed to strftime unchanged.
            Note that the formatter of every root logger handler and the root log level are changed as a side effect.
        """
        # Function-scope import, as in the original, so `datetime` below is always the class.
        from datetime import datetime

        def use_test_credentials():
            # Blank credentials make authenticate() use the public test data endpoint.
            logger.info("defaulting to test data")
            self.clientId = ""
            self.clientSecret = ""

        self.workspace = workspace
        self.retry_strategy = retry_strategy if retry_strategy is not None else Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "DELETE"])
        self.adapter = HTTPAdapter(max_retries=self.retry_strategy)
        self.threadMode = threadMode
        if self.threadMode:
            self.thread_local = threading.local()
        self.keyvault_linked_service = 'LS_KeyVault'
        if kvName is None:
            kvName = oea.keyvault

        # Apply one format to all root handlers and customize log level for all loggers
        root_logger = logging.getLogger()
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        for handler in root_logger.handlers:
            handler.setFormatter(formatter)
        root_logger.setLevel(logging.INFO)
        logger.info(f"[LANDING]: minChangeVersion={minChangeVer} and maxChangeVersion={maxChangeVer}")

        if not kvName and workspace == "dev":
            use_test_credentials()
        else:
            try:
                # try to get the credentials from keyvault (or take them verbatim in dev mode)
                if devMode:
                    self.clientId = kvSecret_clientId
                    self.clientSecret = kvSecret_clientSecret
                else:
                    self.clientId = oea._get_secret(kvSecret_clientId)
                    self.clientSecret = oea._get_secret(kvSecret_clientSecret)
            except Exception as e:
                # if this is the dev instance proceed with test data, otherwise raise the Exception
                logger.info(f"failed to retrieve clientId and clientSecret from keyvault with exception: {str(e)}")
                if workspace != "dev":
                    raise
                use_test_credentials()

        self.authUrl = authUrl
        self.dataManagementUrl = dataManagementUrl
        self.changeQueriesUrl = changeQueriesUrl
        self.dependenciesUrl = dependenciesUrl

        run_date_pattern = '%Y%m%d%H%M%S' if landingDateTimeFormat == 'yyyyMMddHHmmss' else landingDateTimeFormat
        self.runDate = datetime.utcnow().strftime(run_date_pattern)

        # Token state for the non-threaded mode; filled in by authenticate().
        self.authTime = None
        self.expiresIn = None
        self.accessToken = None

        districtPath = "All" if districtId is None else districtId
        schoolYearPath = "All" if schoolYear is None else schoolYear
        self.transactionalFolder = f"Transactional/{moduleName}/{apiVersion}/DistrictId={districtPath}/SchoolYear={schoolYearPath}"
        self.batchLimit = batchLimit
        self.minChangeVer = minChangeVer
        self.maxChangeVer = maxChangeVer
        self.error_logger = error_logger
        self.failedBatchUrls = dict()
        self.lock = threading.Lock()
    
    def init_thread_local_vars(self):
        """ Makes sure the calling thread has its token fields and HTTP session when threadMode is on.
            Missing authTime / expiresIn / accessToken are set to None on self.thread_local, and a missing
            session is created with the retry adapter mounted for https and http.
            Does nothing when threadMode is off.
        """
        if not self.threadMode:
            return

        local_state = self.thread_local
        for field in ("authTime", "expiresIn", "accessToken"):
            if not hasattr(local_state, field):
                setattr(local_state, field, None)

        if not hasattr(local_state, "session"):
            local_state.session = requests.Session()
            for scheme in ("https://", "http://"):
                local_state.session.mount(scheme, self.adapter)

    def getSession(self):
        """ Returns the requests session to use, creating it on first use.
            In threadMode the session lives on self.thread_local, otherwise on the client itself.
            A newly created session gets the retry adapter mounted for https and http.
        """
        holder = self.thread_local if self.threadMode else self
        if not hasattr(holder, "session"):
            holder.session = requests.Session()
            for scheme in ("https://", "http://"):
                holder.session.mount(scheme, self.adapter)
        return holder.session

    #Method to get the access token for the test data set
    def authenticateWithAuthorization(self):
        """ Requests a client-credentials token from the hard-coded Ed-Fi v5.2 endpoint using a fixed
            Basic authorization header, and returns the raw response.
        """
        #TODO: need to update this if we want it to work with other edfi provided test data set versions
        token_url = "https://api.ed-fi.org/v5.2/api/oauth/token"
        form_data = {"grant_type":"client_credentials"}
        auth_headers = {"Authorization":"Basic UnZjb2hLejl6SEk0OkUxaUVGdXNhTmY4MXh6Q3h3SGZib2xrQw=="}
        return requests.post(token_url, form_data, headers=auth_headers)

    #Method to get the access token for a production system with basic auth
    def authenticateWithBasic(self):
        """ Requests a client-credentials token from self.authUrl using HTTP basic auth built from
            clientId / clientSecret, and returns the raw response.
        """
        credentials = HTTPBasicAuth(self.clientId, self.clientSecret)
        form_data = {"grant_type":"client_credentials"}
        return requests.post(self.authUrl, form_data, auth=credentials)

    #This method orchestrates the authentication
    def authenticate(self):
        """ Fetches a new access token and records it together with its lifetime and the request time.
            The state is stored on self.thread_local in threadMode, otherwise on the client itself.
            When either clientId or clientSecret is blank the test data endpoint is used (and the
            response is logged); otherwise basic auth against authUrl is used.
        """
        token_state = self.thread_local if self.threadMode else self
        # The timestamp is taken before the request is sent.
        token_state.authTime = datetime.now()

        if self.clientId and self.clientSecret:
            result = self.authenticateWithBasic().json()
        else:
            result = self.authenticateWithAuthorization().json()
            logger.info(result)

        token_state.expiresIn = result["expires_in"]
        token_state.accessToken = result["access_token"]
    
    #This method manages the access token, refreshing it when required
    def getAccessToken(self):
        """ Returns a usable access token, authenticating first when needed.
            A new token is requested when none is held yet, or when the time since the last
            authentication exceeds the token lifetime minus 300 seconds.
            Token state is read from self.thread_local in threadMode, otherwise from the client itself.
        """
        now = datetime.now()
        token_state = self.thread_local if self.threadMode else self

        needs_refresh = (
            token_state.accessToken is None
            or (now - token_state.authTime).total_seconds() > token_state.expiresIn - 300
        )
        if needs_refresh:
            self.authenticate()
        return token_state.accessToken

    def getChangeQueryVersion(self):
        """ Queries '<changeQueriesUrl>/availableChangeVersions' with the current bearer token.
            Returns the tuple (parsed JSON body, HTTP status code).
        """
        access_token = self.getAccessToken()
        requests_session = self.getSession()
        # Use the url this client was configured with, not a notebook-level variable of the same name.
        response = requests_session.get(self.changeQueriesUrl + "/availableChangeVersions", headers={"Authorization":"Bearer " + access_token})
        return response.json(), response.status_code

    def getEntities(self):
        """ Fetches self.dependenciesUrl through the client's session and returns the parsed JSON body. """
        response = self.getSession().get(self.dependenciesUrl)
        return response.json()

    def getDeletes(self,resource, minChangeVersion, maxChangeVersion):
        """ Requests the deletes of the given resource from '<dataManagementUrl><resource>/deletes'.
            MinChangeVersion / MaxChangeVersion are added to the query string only for the bounds that
            are not None, so a single missing bound is omitted rather than sent as the text "None".
            Returns the raw response object.
        """
        url = f"{self.dataManagementUrl}{resource}/deletes"
        query_parts = []
        if minChangeVersion is not None:
            query_parts.append(f"MinChangeVersion={minChangeVersion}")
        if maxChangeVersion is not None:
            query_parts.append(f"MaxChangeVersion={maxChangeVersion}")
        if query_parts:
            url = url + "?" + "&".join(query_parts)

        requests_session = self.getSession()
        result = requests_session.get(url,headers = {"Authorization": f"Bearer {self.getAccessToken()}"})
        return result

    def writeToDeletesFile(self, resource, deletes):
        """Write the text of a deletes response to a new, uniquely named JSON file.

        The file is placed in the resource's ``delete_batch_data`` folder for the
        current ``self.runDate`` under ``stage1/{self.transactionalFolder}``.
        """
        folder = f"stage1/{self.transactionalFolder}{resource}/delete_batch_data/rundate={self.runDate}"
        fileName = f"data{uuid.uuid4()}.json"
        targetUrl = oea.to_url(f"{folder}/{fileName}")
        mssparkutils.fs.put(targetUrl,deletes.text)
    
    def landEntityWithChangeVersion(self, input_tuple):
        """Land one entity's records and its deletes for a change-version window.

        Args:
            input_tuple: ``(entity, minChangeVersion, maxChangeVersion, debugMode,
                fetchHistory, requestedUrls)``. The arguments are packed into one
                tuple because ``landEntities`` submits this method to a thread pool
                with a single positional argument.

        The lower bound used for the resource is:
            * ``"0"`` when ``fetchHistory`` is truthy;
            * otherwise the value stored in the resource's change file (falling back
              to ``minChangeVersion``) when ``self.minChangeVer`` is ``None``;
            * otherwise ``minChangeVersion`` as given.
        """
        self.init_thread_local_vars()
        entity, minChangeVersion, maxChangeVersion, debugMode, fetchHistory, requestedUrls = input_tuple
        resource = entity['resource']

        if fetchHistory:
            # A history fetch always starts from the beginning, so the stored
            # change file does not need to be read at all.
            resourceMinChangeVersion = "0"
        elif self.minChangeVer is None:
            resourceMinChangeVersion = self.getChangeVersion(resource, minChangeVersion)
        else:
            resourceMinChangeVersion = minChangeVersion

        self.landEntity(resource, resourceMinChangeVersion, maxChangeVersion, debugMode, requestedUrls)
        deletes = self.getDeletes(resource, resourceMinChangeVersion, maxChangeVersion)

        # An error response usually still carries a (non-empty) JSON body; writing it
        # would store an error message as if it were delete records.
        if deletes.status_code >= 400:
            logger.error(f"[LANDING WITH CHANGE VERSION] DELETES: Request failed with status {deletes.status_code} for the resource: {resource}")
            return

        if len(deletes.json()):
            logger.info(f"[LANDING WITH CHANGE VERSION] DELETES: Writing deletes for the resource: {entity}")
            self.writeToDeletesFile(resource, deletes)

    def landEntities(self, entities = 'All', debugMode = False, fetchHistory = False, thread_max_workers = None, requestedUrls = dict()):
        """Land a set of entities, optionally in parallel.

        Args:
            entities: ``'All'`` to land every entity returned by ``getEntities()``,
                otherwise a collection of resource paths passed to
                ``getSpecifiedEntities``.
            debugMode: forwarded to ``landEntity``.
            fetchHistory: when truthy each resource is landed from change version "0".
            thread_max_workers: ``max_workers`` for the thread pool (thread mode only).
            requestedUrls: mapping of resource path -> list of urls. When non-empty
                (and ``entities`` is not ``'All'``) its keys select the entities, and
                each entity receives its url list.

        The change-version window comes from ``getChangeQueryVersion()`` unless
        overridden by ``self.minChangeVer`` / ``self.maxChangeVer``; both bounds are
        ``None`` when that request returns a status >= 400.
        """
        # Treat None like "no requested urls"; the shared default dict is never mutated.
        requestedUrls = requestedUrls or dict()

        if entities == 'All':
            entities = self.getEntities()
        elif requestedUrls:
            entities = self.getSpecifiedEntities(list(requestedUrls.keys()))
        else:
            entities = self.getSpecifiedEntities(entities)

        self.init_thread_local_vars()
        changeVersion, changeVersionResponseStatus = self.getChangeQueryVersion()
        minChangeVersion = None
        maxChangeVersion = None
        if changeVersionResponseStatus < 400:
            # FIXME - Temporary fix for oldestChangeVersion casing
            # The API payload is tried with PascalCase keys first and camelCase keys
            # as a fallback; only a missing key triggers the fallback.
            try:
                minChangeVersion = changeVersion['OldestChangeVersion'] if self.minChangeVer is None else int(self.minChangeVer)
                maxChangeVersion = changeVersion['NewestChangeVersion'] if self.maxChangeVer is None else int(self.maxChangeVer)
            except KeyError:
                minChangeVersion = changeVersion['oldestChangeVersion'] if self.minChangeVer is None else int(self.minChangeVer)
                maxChangeVersion = changeVersion['newestChangeVersion'] if self.maxChangeVer is None else int(self.maxChangeVer)

        if self.threadMode:
            submitted = dict()
            with concurrent.futures.ThreadPoolExecutor(max_workers=thread_max_workers) as tpe:
                logger.info("[LANDING ENTITIES WITH THREADS] Entered Thread Pool")
                self.init_thread_local_vars()
                for entity in entities:
                    work_item = (entity, minChangeVersion, maxChangeVersion, debugMode, fetchHistory, requestedUrls.get(entity['resource'], []))
                    submitted[tpe.submit(self.landEntityWithChangeVersion, work_item)] = entity['resource']
            # Leaving the `with` block waits for every task. Exceptions raised inside a
            # task are stored on its future and would otherwise vanish silently.
            for future, resource in submitted.items():
                error = future.exception()
                if error is not None:
                    logger.error(f"[LANDING ENTITIES WITH THREADS] Landing failed for the resource {resource}: {error}")
        else:
            for entity in entities:
                self.landEntityWithChangeVersion((entity, minChangeVersion, maxChangeVersion, debugMode, fetchHistory, requestedUrls.get(entity['resource'], [])))
    
    def getChangeVersion(self, resource, default):
        """Return the ``changeVersion`` stored in the resource's ``changeFile.json``.

        ``default`` is returned when the change file does not exist.
        """
        changeFilePath = f"stage1/{self.transactionalFolder}{resource}/changeFile.json"
        if not mssparkutils.fs.exists(oea.to_url(changeFilePath)):
            return default
        changeFileText = mssparkutils.fs.head(oea.to_url(changeFilePath))
        return json.loads(changeFileText)['changeVersion']

    def landEntity(self,resource,minChangeVersion,maxChangeVersion, debugMode = False, requestedUrls = list()):
        """Fetch one resource from the data-management API and land it in stage1.

        Flow:
            1. A ``totalCount=true`` request is issued for the resource (with the
               change-version window unless both bounds are ``None``).
            2. If ``requestedUrls`` holds more than one url, exactly those urls are
               fetched and written; urls answering with status >= 400 are recorded
               as failed.
            3. Otherwise the range ``0..maxChangeVersion`` is split into
               ``ceil(total_count / self.batchLimit)`` change-version partitions and
               every partition is paged with ``limit`` / ``offset``.
            4. On ``ZeroDivisionError`` a single un-paged request is landed instead.
               On any other error only ``/ed-fi/schoolYearTypes`` is retried through
               ``returnEntityData``; every other resource is skipped (early return).
            5. Failed urls are stored in ``self.failedBatchUrls[resource]``, the
               change file is overwritten with ``maxChangeVersion`` and a log record
               is consolidated through ``self.error_logger``.

        Each batch is written as newline-delimited JSON to a uniquely named file in
        ``delta_batch_data/rundate={self.runDate}`` below the resource folder.
        """
        # Imported locally so this method does not rely on another notebook cell
        # having imported math.
        import math

        def _auth_headers():
            # Built per request: getAccessToken() refreshes the token when needed.
            return {"Authorization":f"Bearer {self.getAccessToken()}"}

        def _write_batch(records):
            # One file per batch, one JSON document per line.
            filepath = f"{path}/delta_batch_data/rundate={self.runDate}/data{uuid.uuid4()}.json"
            output_string = "".join(json.dumps(line) + "\n" for line in records)
            mssparkutils.fs.put(oea.to_url(filepath),output_string)

        logger.info(f"[LANDING ENTITY] initiating {resource}")
        if minChangeVersion is None and maxChangeVersion is None:
            url = f"{self.dataManagementUrl}{resource}?totalCount=true"
        else:
            url = f"{self.dataManagementUrl}{resource}?MinChangeVersion={minChangeVersion}&MaxChangeVersion={maxChangeVersion}&totalCount=true"

        path = f"stage1/{self.transactionalFolder}{resource}"
        requests_session = self.getSession()
        total_count_response = requests_session.get(url, headers=_auth_headers())
        failedBatchUrls = list()
        total_data_size = 0
        # Pre-set so the log record at the end can always be built, even when every
        # requested url fails.
        total_count = 0
        start_time = datetime.now()

        # FIXME: 2024-02-15 Failed Urls Fetches
        # NOTE(review): the condition is `> 1`, so a single requested url falls
        # through to the full landing below - confirm whether `> 0` was intended.
        if requestedUrls is not None and len(requestedUrls) > 1:
            logger.info(f"[LANDING ENTITY]: Landing data for the urls - {len(requestedUrls)}")
            for requestedUrl in requestedUrls:
                data = requests_session.get(requestedUrl, headers=_auth_headers())
                if(data.status_code < 400):
                    output = json.loads(data.text)
                    total_data_size = total_data_size + len(output)
                    total_count = total_data_size
                    _write_batch(output)
                    # A successfully landed url must not be reported as failed.
                else:
                    logger.error(f"[LANDING ENTITY] There was an error retrieving batch data for {resource}")
                    failedBatchUrls.append(requestedUrl)
        else:
            try:
                total_count = int(total_count_response.headers["Total-Count"])
                logger.info(f'[LANDING ENTITY] {resource}: TOTAL RECORD COUNT - {total_count}')
            except Exception:
                logger.error(F"[LANDING ENTITY] {resource}: Total-Count not present")
            try:
                #Keyset pagination implementation: https://techdocs.ed-fi.org/display/ODSAPIS3V61/Improve+Paging+Performance+on+Large+API+Resources

                #split into the total number of partitions, and the range size
                total_count = int(total_count_response.headers["Total-Count"])
                if debugMode:
                    logger.info(f"[LANDING ENTITY] {resource}: --- Total Count         : {total_count}")
                    logger.info(f"[LANDING ENTITY] {resource}: --- Batch Size         : {self.batchLimit}")

                partitions = math.ceil(total_count / self.batchLimit)
                if(total_count == 0 and partitions == 0):
                    logger.info(f'[LANDING ENTITY] {resource}: No new / updated items b/w the following versions {minChangeVersion} and {maxChangeVersion}')
                else:
                    range_size = math.ceil(maxChangeVersion / partitions)
                    for i in range(partitions + 1):
                        #calculate the min and max change version for the partition
                        partitionMinChangeVersion = i*range_size
                        partitionMaxChangeVersion = min(maxChangeVersion, (i+1)*range_size)

                        #Calculate the number of batches per partition
                        partitionUrl=f"{self.dataManagementUrl}{resource}?MinChangeVersion={partitionMinChangeVersion}&MaxChangeVersion={partitionMaxChangeVersion}&totalCount=true"
                        partition_count_response = requests_session.get(partitionUrl, headers=_auth_headers())
                        partition_count = int(partition_count_response.headers["Total-Count"])
                        batches = partition_count // self.batchLimit

                        if debugMode:
                            logger.info(f"[LANDING ENTITY] {resource}: --- Partition Number         : {i}")
                            logger.info(f"[LANDING ENTITY] {resource}: --- Partition MinChangeVer   : {partitionMinChangeVersion}")
                            logger.info(f"[LANDING ENTITY] {resource}: --- Partition MaxChangeVer   : {partitionMaxChangeVersion}")
                            logger.info(f"[LANDING ENTITY] {resource}: --- Number of batches        : {batches}")
                            logger.info(f"[LANDING ENTITY] {resource}: --- Number of partitions     : {partition_count}")

                        for j in range(batches + 1):
                            batchUrl=f"{partitionUrl}&limit={self.batchLimit}&offset={(j)*self.batchLimit}"
                            data = requests_session.get(batchUrl, headers=_auth_headers())
                            if(data.status_code < 400):
                                output = json.loads(data.text)
                                total_data_size = total_data_size + len(output)
                                _write_batch(output)
                            else:
                                logger.error(f"[LANDING ENTITY] There was an error retrieving batch data for {resource}")
                                failedBatchUrls.append(batchUrl)
            except ZeroDivisionError as zero_error:
                logger.error(f'[LANDING ENTITY] Divide by Zero Error - {zero_error}; Landing Data of offset = 0')
                requests_session = self.getSession()
                data = requests_session.get(url, headers=_auth_headers())
                if(data.status_code < 400):
                    output = json.loads(data.text)
                    total_data_size = total_data_size + len(output)
                    if(len(output) == 0):
                        logger.info(f'[LANDING ENTITY] {resource}: No new / updated items b/w the following versions {minChangeVersion} and {maxChangeVersion}')
                    else:
                        _write_batch(output)
                else:
                    logger.info(f"[LANDING DIVIDE BY ZERO] There was an error retrieving data for {resource}")
                    failedBatchUrls.append(url)
            except Exception as error:
                if resource == '/ed-fi/schoolYearTypes':
                    output, total_count, returned_failedBatchUrls = self.returnEntityData(resource = resource,
                                                minChangeVersion=None,
                                                maxChangeVersion=None)
                    if output is None:
                        # returnEntityData signals failure with None; skip the resource
                        # the same way other resources are skipped on error.
                        logger.error(f'[LANDING ENTITY] An Error Occured - {error}; Landing of the resource - {resource} skipped')
                        return
                    total_data_size = len(output)
                    if(len(output) == 0):
                        logger.info(f'[LANDING ENTITY] {resource}: No new / updated items b/w the following versions {minChangeVersion} and {maxChangeVersion}')
                    else:
                        _write_batch(output)
                    failedBatchUrls = failedBatchUrls + (returned_failedBatchUrls or [])
                else:
                    logger.info(f'[LANDING ENTITY] An Error Occured - {error}; Landing of the resource - {resource} skipped')
                    return
        with self.lock:
            self.failedBatchUrls[resource] = failedBatchUrls

        changeFilepath = f"{path}/changeFile.json"
        changeData = {"changeVersion":maxChangeVersion}
        mssparkutils.fs.put(oea.to_url(changeFilepath),json.dumps(changeData),True)
        logger.info(f"[LANDING ENTITY] Completed {resource}")

        end_time = datetime.now()
        # FIXME 2024-02-20: Under Review - logging, numInputRows, numTargetRowsInserted
        # NOTE(review): `pipelineExecutionId` and `spark` are resolved as notebook-level
        # names here, not as instance attributes.
        log_data = self.error_logger.create_log_dict(uniqueId = self.error_logger.generate_random_alphanumeric(10), # Generate a random 10-character alphanumeric value
                                                pipelineExecutionId = pipelineExecutionId,#'TEST_1234',#executionId,
                                                sparkSessionId = spark.sparkContext.applicationId,
                                                stageName = "ed-fi: Landing",
                                                schemaFormat = 'ed-fi: nested',
                                                entityType =  resource.split('/')[1],
                                                entityName = resource.split('/')[-1],
                                                numInputRows = total_count,
                                                totalNumOutputRows = total_data_size,
                                                numTargetRowsInserted = total_data_size,
                                                numTargetRowsUpdated = 0,
                                                numRecordsSkipped = 0,
                                                numRecordsDeleted = 0,
                                                start_time = start_time,
                                                end_time = end_time,
                                                insertionType = 'NA/Invalid')
        with self.lock:
            self.error_logger.consolidate_logs(log_data,'entity')
    
    def parse_text_to_dataframe(self, text_content, delimiter=','):
        """Parse delimited text held in a string into a pandas DataFrame.

        Args:
            text_content: the full delimited text, header row included.
            delimiter: field separator handed to ``pandas.read_csv``.
        """
        return pd.read_csv(StringIO(text_content), delimiter=delimiter)

    def extract_entities_for_etl(self, df):
        """Derive API resource paths and unique entity names from an entity table.

        Every row contributes ``/ed-fi/{entity_name}``; rows whose ``entity_type``
        is not ``'ed-fi'`` additionally contribute ``/{entity_type}/{entity_name}``
        (placed before the ed-fi path).

        Returns:
            tuple: ``(resource paths in row order, de-duplicated entity names)``.
            The names come from a ``set`` and therefore have no guaranteed order.
        """
        resource_paths = []
        seen_names = []

        for _, record in df.iterrows():
            kind, name = record['entity_type'], record['entity_name']
            if kind != 'ed-fi':
                resource_paths.append(f'/{kind}/{name}')
            resource_paths.append(f'/ed-fi/{name}')
            seen_names.append(name)

        unique_names = list(set(seen_names))
        return resource_paths, unique_names


    def getSpecifiedEntities(self, entities_list):
        """Return the entries of ``getEntities()`` whose ``'resource'`` is in ``entities_list``.

        The order of the returned entries follows ``getEntities()``.
        """
        selected = []
        for candidate in self.getEntities():
            if candidate['resource'] in entities_list:
                selected.append(candidate)
        return selected

    def listSpecifiedEntities(self, path): 
        """Read ``{path}/entities-to-extract.csv`` and derive the entities to process.

        Returns:
            tuple: ``(api resource paths, entity names)`` as produced by
            ``extract_entities_for_etl``; two empty lists when the file is absent.
        """
        csv_location = path + '/entities-to-extract.csv'
        if not oea.path_exists(csv_location):
            return list(), list()

        csv_text = oea.get_text_from_path(csv_location)
        entity_table = self.parse_text_to_dataframe(csv_text, delimiter=',')
        api_entities, entities = self.extract_entities_for_etl(entity_table)
        return api_entities, entities

    def returnEntityData(self,
                         resource,
                         minChangeVersion=None,
                         maxChangeVersion=None,
                         increment = 50):
        """Page through a resource with limit/offset and return all of its records.

        Args:
            resource: resource path appended to ``self.dataManagementUrl``.
            minChangeVersion: accepted for interface compatibility; not used.
            maxChangeVersion: accepted for interface compatibility; not used.
            increment: amount added to ``offset`` after every page.

        Returns:
            tuple: ``(records, total_count, failedBatchUrls)``.
            ``(None, None, None)`` is returned when any page answers with a status
            >= 400. When an exception interrupts paging, whatever was collected so
            far is returned (``records`` is ``None`` if nothing was collected).
        """
        self.init_thread_local_vars()
        offset = 0
        total_count = None
        # Bound up front so the final return cannot fail when an exception occurs
        # before the first page has been stored.
        output = None
        first_page = True
        logger.info(f"[LANDING W/O CHANGE VERSION] initiating {resource}")
        failedBatchUrls = list()
        try:
            while True:
                # NOTE(review): the page size is self.batchLimit while the offset
                # advances by `increment` - confirm the two are meant to be equal.
                url = f"{self.dataManagementUrl}{resource}?limit={self.batchLimit}&offset={offset}&totalCount=true"
                temp_access_token = self.getAccessToken()
                requests_session = self.getSession()
                data = requests_session.get(url, headers={"Authorization": f"Bearer {temp_access_token}"})
                if data.status_code == 404:
                    logger.info("[LANDING W/O CHANGE VERSION] RESOURCE NOT FOUND")
                    return None, None, None
                if data.status_code >= 400:
                    logger.error(f'[LANDING W/O CHANGE VERSION] ERROR - {data.status_code}')
                    failedBatchUrls.append(url)
                    return None, None, None

                page = json.loads(data.text)
                if first_page:
                    first_page = False
                    output = page
                    if total_count is None:
                        total_count = int(data.headers["Total-Count"])
                else:
                    output = output + page

                # An empty page marks the end of the resource.
                if page == []:
                    break
                offset += increment
        except Exception as e:
            logger.error(f"[LANDING W/O CHANGE VERSION] ERROR Occurred - {e}")
        return output, total_count, failedBatchUrls

    def fetch_descriptors(self, 
                          descriptor_col, 
                          entities_info, 
                          minChangeVersion = None, 
                          maxChangeVersion = None):
        """Fetch all records of a descriptor resource as a Spark DataFrame.

        ``descriptor_col`` is resolved to a resource path by looking for
        ``/ed-fi/{descriptor_col}`` or ``/TX/{descriptor_col}`` among the
        ``'resource'`` values of ``entities_info``.

        Returns:
            A DataFrame built from the fetched records, or ``None`` when the
            descriptor cannot be resolved or ``returnEntityData`` yields no output.
        """
        for entity_info in entities_info:
            if entity_info['resource'] == f'/ed-fi/{descriptor_col}':
                descriptor_col = f'/ed-fi/{descriptor_col}'
                
            elif entity_info['resource'] == f'/TX/{descriptor_col}':
                descriptor_col = f'/TX/{descriptor_col}'
            
        if not descriptor_col.startswith('/'):
            logger.info("[FETCH DESCRIPTOR] No Such Entity")
            return None
        
        # returnEntityData returns (records, total_count, failedBatchUrls); only the
        # records are needed here.
        output, _total_count, _failed_urls = self.returnEntityData(resource = descriptor_col,
                                       minChangeVersion = minChangeVersion, 
                                       maxChangeVersion = maxChangeVersion)
        if output is None:
            return None
        # NOTE(review): `spark` is resolved as a notebook-level name here.
        return spark.createDataFrame(output)

### Ed-Fi Refinement

In [ ]:
class EdFiRefine:
    #The constructor
    def __init__(self, 
                 workspace, 
                 oea, 
                 spark,
                 schema_gen,
                 moduleName, 
                 authUrl,
                 swaggerUrl, 
                 dataManagementUrl, 
                 changeQueriesUrl, 
                 dependenciesUrl, 
                 apiVersion, 
                 schoolYear, 
                 districtId,
                 pipelineExecutionId,
                 error_logger,
                 test_mode):
        self.workspace = workspace
        self.oea = oea
        self.spark = spark
        self.schema_gen = schema_gen

        self.moduleName = moduleName
        self.authUrl = authUrl
        self.swaggerUrl = swaggerUrl
        self.dataManagementUrl = dataManagementUrl
        self.dependenciesUrl = dependenciesUrl
        self.apiVersion = apiVersion
        
        self.schoolYear = schoolYear
        self.districtId = districtId
        self.districtId_col_name = 'DistrictId'
        self.schoolYear_col_name = 'SchoolYear'

        self.schemas = self.schema_gen.create_spark_schemas()
        self.primitive_datatypes = ['timestamp', 'date', 'decimal', 'boolean', 'integer', 'string', 'long']
        self.test_mode = test_mode
        self.thread_local = threading.local()
        self.pipelineExecutionId = pipelineExecutionId
        self.error_logger = error_logger
    
    def store_start_time(self, table_name):
        """Record the current time as the start time of ``table_name``.

        The times are kept in a ``start_times`` dict on ``self.thread_local``; the
        dict is created on first use.
        """
        try:
            start_times = self.thread_local.start_times
        except AttributeError:
            start_times = dict()
            self.thread_local.start_times = start_times
        start_times[table_name] = datetime.now()
        
    def get_descriptor_schema(self, descriptor):
        """Build the Spark ``StructType`` used for a descriptor table.

        The id column is named after ``descriptor`` with its last character
        removed and ``Id`` appended. All fields are nullable.
        """
        id_field_name = f"{descriptor[:-1]}Id"
        return StructType([
            StructField('_etag',LongType(), True),
            StructField(id_field_name, IntegerType(), True),
            StructField('codeValue',StringType(), True),
            StructField('description',StringType(), True),
            StructField('id',StringType(), True),
            StructField('namespace',StringType(), True),
            StructField('shortDescription',StringType(), True),
        ])

    def get_descriptor_metadata(self, descriptor):
        """Return ``[column name, type name, operation]`` triples for a descriptor table.

        The id column is named after ``descriptor`` with its last character
        removed and ``Id`` appended; it is the only column marked ``'hash'``.
        """
        id_column = f"{descriptor[:-1]}Id"
        column_specs = (
            ('_etag', 'long', 'no-op'),
            (id_column, 'integer', 'hash'),
            ('codeValue','string', 'no-op'),
            ('description','string', 'no-op'),
            ('id','string', 'no-op'),
            ('namespace','string', 'no-op'),
            ('shortDescription','string', 'no-op'),
        )
        return [list(spec) for spec in column_specs]

    def has_column(self, df, col):
        """Return True when ``df[col]`` can be resolved, False when it raises ``AnalysisException``."""
        try:
            df[col]
        except AnalysisException:
            return False
        else:
            return True

    def modify_descriptor_value(self, df, col_name, districtId_col_name = 'DistrictId', schoolYear_col_name = 'SchoolYear'):
        """Replace a descriptor column with a ``{col_name}LakeId`` column.

        When ``col_name`` exists, the new column joins the district id, the school
        year and the descriptor value (with ``#`` replaced by ``_``) using ``_``,
        and the original column is dropped. When it does not exist, a null string
        column named ``{col_name}LakeId`` is added instead.
        """
        lake_id_name = f"{col_name}LakeId"
        if col_name not in df.columns:
            return df.withColumn(lake_id_name, f.lit(None).cast("String"))

        # TODO: @Abhinav, I do not see where you made the changes to use the descriptorId instead of Namespace/CodeValue
        lake_id_value = f.concat_ws('_', f.col(districtId_col_name), f.col(schoolYear_col_name), f.regexp_replace(col_name, '#', '_'))
        return df.withColumn(lake_id_name, lake_id_value).drop(col_name)

    def flatten_reference_col(self, df, target_col, districtId_col_name = 'DistrictId', schoolYear_col_name = 'SchoolYear'):
        """Replace a ``...Reference`` struct column with a ``{prefix}LakeId`` column.

        The new value joins the district id, the school year and item 3 of the
        reference's ``link.href`` split on ``/``; it is only populated where the
        reference column is not null. The reference column is dropped afterwards.
        """
        reference_name = target_col.name
        lake_id_name = f"{reference_name.replace('Reference', '')}LakeId"
        href_segment = f.split(f.col(f'{reference_name}.link.href'), '/').getItem(3)
        lake_id_value = f.concat_ws('_', f.col(districtId_col_name), f.col(schoolYear_col_name), href_segment)
        flattened = df.withColumn(lake_id_name, f.when(f.col(reference_name).isNotNull(), lake_id_value))
        return flattened.drop(reference_name)

    def modify_references_and_descriptors(self, df, target_col, districtId_col_name = 'DistrictId', schoolYear_col_name = 'SchoolYear'):
        """Flatten every ``*Reference`` column, then convert every ``*Descriptor`` column.

        Reference fields are looked up by name in ``target_col.dataType.elementType``.
        The descriptor columns are collected after the references have been
        flattened.
        """
        reference_names = [name for name in df.columns if re.search('Reference$', name) is not None]
        for reference_name in reference_names:
            reference_field = target_col.dataType.elementType[reference_name]
            df = self.flatten_reference_col(df, reference_field, districtId_col_name, schoolYear_col_name)

        descriptor_names = [name for name in df.columns if re.search('Descriptor$', name) is not None]
        for descriptor_name in descriptor_names:
            df = self.modify_descriptor_value(df, descriptor_name, districtId_col_name, schoolYear_col_name)
        return df
    
    def upsert_with_logging(self, 
                            df, 
                            destination_path, 
                            primary_key, 
                            table_name,
                            ext_entity,
                            parent = True):
        """Write ``df`` to ``destination_path`` and record the row counts in the log.

        ``self.oea.upsert`` is used when ``parent`` is truthy and
        ``self.oea.delete_then_insert`` otherwise; both are called with the same
        arguments, partitioned by the district-id and school-year columns.
        The start time is the one stored for ``table_name`` on ``self.thread_local``
        (the current time when none was stored).
        """
        # TODO: 2024-02-12: Edit the logging capabilities
        # NOTE(review): reads self.thread_local.start_times directly - presumably
        # store_start_time() has already run on this thread; verify against callers.
        start_time = self.thread_local.start_times.get(table_name, datetime.now())
        logger.info(f"[REFINEMENT UPSERT WITH LOGGING] {table_name}: {destination_path}")

        write = self.oea.upsert if parent else self.oea.delete_then_insert
        write_stats = write(df = df, 
                            destination_path = destination_path,
                            primary_key = primary_key,#['RECORD', 'DistrictId', 'SchoolYear'],
                            partitioning = True,
                            partitioning_cols = [self.districtId_col_name, self.schoolYear_col_name],
                            surrogate_key = False)
        numInputRows, numOutputRows, numTargetRowsInserted, numTargetRowsUpdated = write_stats

        end_time = datetime.now()
        log_fields = dict(uniqueId = self.error_logger.generate_random_alphanumeric(10), # Generate a random 10-character alphanumeric value
                          pipelineExecutionId = self.pipelineExecutionId,#'TEST_1234',#executionId,
                          sparkSessionId = self.spark.sparkContext.applicationId,
                          stageName = "ed-fi: Refinement",
                          schemaFormat = 'ed-fi: exploded',
                          entityType =  ext_entity.lower(),
                          entityName = table_name,
                          numInputRows = numInputRows,
                          totalNumOutputRows = numOutputRows,
                          numTargetRowsInserted = numTargetRowsInserted,
                          numTargetRowsUpdated = numTargetRowsUpdated,
                          numRecordsSkipped = 0,
                          numRecordsDeleted = 0,
                          start_time = start_time,
                          end_time = end_time,
                          insertionType = 'upsert')
        log_data = self.error_logger.create_log_dict(**log_fields)
        self.error_logger.consolidate_logs(log_data,'entity')

    def explode_arrays(self, 
                       df, 
                       sink_general_path, 
                       target_col, 
                       schema_name, 
                       table_name, 
                       extension = None,
                       ext_entity = None, 
                       districtId_col_name = 'DistrictId',
                       schoolYear_col_name = 'SchoolYear',
                       parent_cols = ['lakeId', 'DistrictId', 'SchoolYear', 'LastModifiedDate', 'rowIsActive'],
                       nonNull_count = 1):
        """Explode an array column of ``df`` into child (and grand-child) tables.

        The array ``target_col`` is exploded into one row per element, carrying
        ``parent_cols`` along, and written to ``{sink_general_path}_{target_col.name}``.
        Every array field nested inside the element type is exploded once more and
        written to ``{sink_general_path}_{target_col.name}_{field name}``. Reference
        and descriptor columns are converted before each write, and each written
        table is registered through ``self.oea.add_to_lake_db``.

        Nothing is written for the child when ``nonNull_count <= 0`` and the child
        path already is a Delta table.

        Args:
            schema_name: accepted for interface compatibility; not used here.
            parent_cols: columns copied from the parent onto every child row. The
                default list is only read, never modified.

        Returns:
            ``df`` with ``target_col`` dropped.
        """
        cols = parent_cols#['lakeId', 'DistrictId', 'SchoolYear', 'LastModifiedDate']
        child_url = self.oea.to_url(f"{sink_general_path}_{target_col.name}")
        child_name = f"{table_name}_{target_col.name}"
        self.store_start_time(child_name)
        if nonNull_count <=0 and DeltaTable.isDeltaTable(self.spark, child_url):
            logger.info('[REFINEMENT EXPLOSION]: Child Table - No Ingress Records (Empty Schema Present)')
        else:
            child_df = df.select(cols + [target_col.name])
            child_df = child_df.withColumn("exploded", f.explode(target_col.name)).drop(target_col.name).select(cols + ['exploded.*'])
            child_df_cached = child_df.cache()
            child_df = child_df_cached

            # TODO: It looks like te {target_col.name}LakeId column is not addedd to the child entities
            #       We should use LakeId suffix when using the "id" column from the parent and HKey suffix when creating a Hash Key based on composite key columns
            # NOTE(review): list.sort() returns None, so identity_cols is always None and
            # the LakeId column below is never added (this explains the TODO above).
            # Deliberately left unchanged: switching to sorted(...) would add a new column
            # to every child table and may fail for identity fields that are structs -
            # validate on real data before enabling.
            identity_cols = [x.name for x in target_col.dataType.elementType.fields if 'x-Ed-Fi-isIdentity' in x.metadata].sort()
            if(identity_cols is not None and len(identity_cols) > 0):
                child_df = child_df.withColumn(f"{target_col.name}LakeId", f.concat(f.col(districtId_col_name), f.lit('_'), f.col(schoolYear_col_name), f.lit('_'), *[f.concat(f.col(x), f.lit('_')) for x in identity_cols]))
            
            # IMPORTANT: We must modify Reference and Descriptor columns for child columns "first". 
            # This must be done "after" the composite key from identity_cols has been created otherwise the columns are renamed and will not be found by identity_cols.
            # This must be done "before" the grand_child is exploded below
            child_df = self.modify_references_and_descriptors(child_df, target_col, districtId_col_name, schoolYear_col_name)

            for array_sub_col in [x for x in target_col.dataType.elementType.fields if x.dataType.typeName() == 'array' ]:
                grand_child_path = f"{sink_general_path}_{target_col.name}_{array_sub_col.name}"
                grand_child_url = self.oea.to_url(grand_child_path)
                grand_child_name = f"{table_name}_{target_col.name}_{array_sub_col.name}"
                self.store_start_time(grand_child_name)
                
                # TODO: Experiment with Non Nulls and Optimization (Future)
                # child_df_size = child_df.withColumn("size", F.size(F.col(array_sub_col.name)))
                # nonNull_count = child_df_size.filter(F.col("size") >= 1).count()
                nonNull_count = 1
                
                if nonNull_count <=0 and DeltaTable.isDeltaTable(self.spark, grand_child_url):
                    logger.info('[REFINEMENT EXPLOSION] Grand Child Table - No Ingress Records (Empty Schema Present)')
                else:
                    grand_child_df = child_df.withColumn('exploded', f.explode(array_sub_col.name)).select(child_df.columns + ['exploded.*']).drop(array_sub_col.name)
                    grand_child_df_cached = grand_child_df.cache()
                    grand_child_df = grand_child_df_cached
                    
                    # Modifying Reference and Descriptor columns for the grand_child array
                    grand_child_df = self.modify_references_and_descriptors(grand_child_df, array_sub_col, districtId_col_name, schoolYear_col_name)

                    logger.info(f"[REFINEMENT EXPLOSION] Writing Grand Child Table - {table_name}_{target_col.name}_{array_sub_col.name}")
                    # TODO: Review impact of changing from UPSERT to delete_then_insert
                    self.upsert_with_logging(df = grand_child_df, 
                                    destination_path = grand_child_path, 
                                    primary_key = 'lakeId', 
                                    table_name = grand_child_name,
                                    ext_entity = ext_entity,
                                    parent = False)
                    self.oea.add_to_lake_db(source_entity_path = grand_child_path, 
                                    overwrite = True,
                                    extension = extension)
                    # Release this grand child's cache right away; otherwise only the
                    # last one created in the loop would ever be unpersisted.
                    grand_child_df_cached.unpersist()

            logger.info(f"[REFINEMENT EXPLOSION] Writing Child Table - {table_name}_{target_col.name}")
            # TODO: Review impact of changing from UPSERT to delete_then_insert
            # FIXME: assessments_periods Temporary Fix
            child_destination_path = f"{sink_general_path}_{target_col.name}"
            if ('/assessments_period' in child_destination_path) and ('/assessments_periods' not in child_destination_path):
                child_destination_path = child_destination_path.replace('/assessments_period', '/assessments_periods')
            self.upsert_with_logging(df = child_df, 
                                    destination_path = child_destination_path, 
                                    primary_key = 'lakeId', 
                                    table_name = child_name,
                                    ext_entity = ext_entity,
                                    parent = False)
            self.oea.add_to_lake_db(source_entity_path = child_destination_path,#f"{sink_general_path}_{target_col.name}",
                            overwrite = True,
                            extension = extension)
            child_df_cached.unpersist()

        # Drop array column from parent entity
        df = df.drop(target_col.name)
        return df

    def transform(self,
                df, 
                schema_name, 
                table_name, 
                primary_key,
                ext_entity,
                sink_general_path,
                districtId_col_name = 'DistrictId', 
                schoolYear_col_name = 'SchoolYear'):
        """Refine one entity dataframe and write it (plus any `_ext` extension data) to the sink.

        Steps, in order:
          1. Record the start time for `table_name` via `store_start_time`.
          2. Pick the target schema (a deep copy of `self.schemas[table_name]`, or
             `get_descriptor_schema(table_name)` when the table name ends in 'Descriptors')
             and add a `lakeId` column built with `concat_ws('_', ...)` from the district,
             school year and key column(s). When the key column(s) are missing, `lakeId`
             is a null string.
          3. Append the bookkeeping columns to the target schema and conform `df` through
             `transform_sub_module`.
          4. Unless `self.test_mode` is truthy, upsert the result to `sink_general_path`
             keyed on `lakeId` and call `self.oea.add_to_lake_db`.
          5. If the conformed dataframe has an `_ext` column, flatten `_ext[ext_entity]`,
             conform it and upsert it under `sink_general_path` with '/ed-fi/' replaced by
             '/<ext_entity lower-cased>/'.

        Args:
            df: Source dataframe for the entity.
            schema_name: Passed through unchanged to `transform_sub_module`.
            table_name: Key into `self.schemas`; also drives the descriptor /
                'schoolYearTypes' / 'exts' special cases below.
            primary_key: Name of the column used as the last component of `lakeId`
                for non-descriptor tables.
            ext_entity: Extension key (used as the inner key of `_ext` and, lower-cased,
                in the extension sink path and table suffix).
            sink_general_path: Destination path of the main table.
            districtId_col_name: Name of the district id column.
            schoolYear_col_name: Name of the school year column.

        Returns:
            The conformed dataframe when `self.test_mode` is truthy (returned before the
            main-table upsert); otherwise None.
        """
        self.store_start_time(table_name)            
        if re.search('Descriptors$', table_name) is None:
            # Use Deep Copy otherwise the schemas object also gets modified every time target_schema is modified
            target_schema = copy.deepcopy(self.schemas[table_name])
            # Add primary key
            if self.has_column(df, primary_key):
                df = df.withColumn('lakeId', f.concat_ws('_', f.col(districtId_col_name), f.col(schoolYear_col_name), f.col(primary_key)).cast("String"))
            else:
                df = df.withColumn('lakeId', f.lit(None).cast("String"))
        else:
            # Descriptor tables: schema comes from a dedicated helper and the key is namespace + codeValue
            target_schema = self.get_descriptor_schema(table_name)
            # Add primary key
            if self.has_column(df, 'codeValue') and self.has_column(df, 'namespace'):
                # TODO: @Abhinav, I do not see where you made the changes to use the descriptorId instead of Namespace/CodeValue
                df = df.withColumn('lakeId', f.concat_ws('_', f.col(districtId_col_name), f.col(schoolYear_col_name), f.col('namespace'), f.col('codeValue')).cast("String"))
            else:
                df = df.withColumn('lakeId', f.lit(None).cast("String"))

        # FIXME: Temporary Fix
        # Only tables whose lower-cased name ends in 'exts' keep the caller's ext_entity
        # for the main-table write; every other table is written as 'ed-fi' (see below).
        if table_name.lower().endswith('exts'):
            ext_entity_flag = ext_entity
        else:
            ext_entity_flag = None

        # FIXME schoolYearTypes TEMPORARY FIX
        # For 'schoolYearTypes' the school-year bookkeeping column is NOT appended to the
        # target schema; the column is renamed after transform_sub_module instead.
        if table_name == 'schoolYearTypes':
            target_schema = target_schema.add(StructField(districtId_col_name, StringType()))\
                                    .add(StructField('LastModifiedDate', TimestampType())) \
                                    .add(StructField('rowIsActive', BooleanType()))
        else:
            target_schema = target_schema.add(StructField(districtId_col_name, StringType()))\
                                        .add(StructField(schoolYear_col_name, StringType()))\
                                        .add(StructField('LastModifiedDate', TimestampType())) \
                                        .add(StructField('rowIsActive', BooleanType()))

        # Conform every column of the main table to the target schema
        df = self.transform_sub_module(df, 
                                       target_schema, 
                                       sink_general_path, 
                                       schema_name, 
                                       table_name,
                                       extension = None, 
                                       ext_entity = 'ed-fi' if ext_entity_flag is None else ext_entity_flag,
                                       districtId_col_name = districtId_col_name, 
                                       schoolYear_col_name = schoolYear_col_name)

        # FIXME schoolYearTypes TEMPORARY FIX
        if table_name == 'schoolYearTypes':
            df = df.withColumnRenamed("schoolYear", schoolYear_col_name)
        
        # In test mode nothing below this point runs: no upsert, no lake-db call, no _ext handling
        if self.test_mode:
            return df

        logger.info(f"[REFINEMENT TRANSFORM] Writing Main Table - {table_name}")
        self.upsert_with_logging(df = df, 
                                    destination_path = f"{sink_general_path}", 
                                    primary_key = 'lakeId', 
                                    table_name = table_name,
                                    ext_entity = 'ed-fi' if ext_entity_flag is None else ext_entity_flag,
                                    parent = True)

        self.oea.add_to_lake_db(source_entity_path = sink_general_path, 
                        overwrite = True,
                        extension = None)

        # Extension handling: only when the conformed main dataframe carries an '_ext' column
        if '_ext' in df.columns:
            # From here on target_schema is the schema of _ext[ext_entity], not of the main table
            target_schema = self.get_ext_entities_schemas(table_name = table_name,
                                                    ext_column_name = '_ext',
                                                    default_value = ext_entity)
            df = self.flatten_ext_column(df = df, 
                                    table_name = table_name, 
                                    ext_col = '_ext', 
                                    inner_key = ext_entity,
                                    ext_inner_cols = target_schema.fieldNames(),
                                    base_cols = ['lakeId', districtId_col_name, 'LastModifiedDate',schoolYear_col_name, 'rowIsActive','id_pseudonym'])
            # Redirect the sink from the '/ed-fi/' folder to the extension's own folder
            sink_general_path = sink_general_path.replace('/ed-fi/', f'/{ext_entity.lower()}/')
            df = self.transform_sub_module(df, 
                                    target_schema, 
                                    sink_general_path, 
                                    schema_name,
                                    table_name,
                                    extension = f"_{ext_entity.lower()}",
                                    ext_entity = ext_entity,
                                    districtId_col_name = districtId_col_name,
                                    schoolYear_col_name = schoolYear_col_name)

            logger.info(f"[REFINEMENT TRANSFORM] Writing EXT Table - {table_name}")
            self.upsert_with_logging(df = df, 
                                    destination_path = f"{sink_general_path}", 
                                    primary_key = 'lakeId', 
                                    table_name = table_name,
                                    ext_entity = ext_entity,
                                    parent = True)

            self.oea.add_to_lake_db(sink_general_path, 
                            overwrite = True,
                            extension = f"_{ext_entity.lower()}")
            return None
            
    def transform_sub_module(self, 
                             df, 
                             target_schema, 
                             sink_general_path, 
                             schema_name, 
                             table_name,
                             extension = None,
                             ext_entity = None,
                             districtId_col_name = 'DistrictId',
                             schoolYear_col_name = 'SchoolYear'):
        """Conform `df` column by column to `target_schema`.

        For every field of `target_schema`:
          * Primitive type (type name listed in `self.primitive_datatypes`):
              - name ends in 'Descriptor' -> delegated to `modify_descriptor_value`;
              - otherwise the column is cast to the target type, or added as a typed
                null column when it is absent from `df`.
          * Any other (complex) type:
              - absent column -> added as a typed null column;
              - present column -> round-tripped through JSON (`to_json` then `from_json`
                with the target type) so it takes the target type;
              - name ends in 'Reference' -> delegated to `flatten_reference_col`;
              - array type -> delegated to `explode_arrays`, whose return value replaces `df`.

        Args:
            df: Dataframe to conform.
            target_schema: Schema whose fields drive the loop. NOTE: the
                assessments/period special case below mutates the field's `dataType`
                in place on this object.
            sink_general_path: Passed to `explode_arrays`.
            schema_name: Passed to `explode_arrays`.
            table_name: Passed to `explode_arrays`; also used for the
                assessments/period special case.
            extension: Passed to `explode_arrays`.
            ext_entity: Passed to `explode_arrays`.
            districtId_col_name: Name of the district id column.
            schoolYear_col_name: Name of the school year column.

        Returns:
            The conformed dataframe. NOTE(review): when `self.test_mode` is truthy the
            method returns as soon as the first complex-typed field has been handled,
            so later fields are not processed - confirm this is intended.
        """
        # print(districtId_col_name)
        for col_name in target_schema.fieldNames():
            target_col = target_schema[col_name]
            # If Primitive datatype, i.e. String, Bool, Integer, etc.
            # Note: Descriptor is a String therefore is a Primitive datatype
            if target_col.dataType.typeName() in self.primitive_datatypes:
                # If it is a Descriptor
                if re.search('Descriptor$', col_name) is not None:
                    df = self.modify_descriptor_value(df, col_name, districtId_col_name, schoolYear_col_name)
                else:
                    if col_name in df.columns:
                        # Casting columns to primitive data types
                        df = df.withColumn(col_name, f.col(col_name).cast(target_col.dataType))
                    else:
                        # If Column not present in dataframe, add column with None values.
                        df = df.withColumn(col_name, f.lit(None).cast(target_col.dataType))
            # If Complex datatype, i.e. Object, Array
            else:
                if col_name not in df.columns:
                    df = df.withColumn(col_name, f.lit(None).cast(target_col.dataType))
                else:
                    # Generate JSON column as a Complex Type
                    if (table_name.lower() == 'assessments' and col_name.lower() == 'period') and (target_col.dataType.typeName() != 'array'):
                        # FIXME: Temporary Fix to deal with assessments_periods
                        # Wrap the single 'period' object in an array and change the target
                        # type to match, so it goes through the array branch further down.
                        df = df.withColumn(col_name, f.array(f.col(col_name)))
                        target_col.dataType = f.ArrayType(target_col.dataType)
                    
                    # Serialize to JSON and parse back with the target type to conform the column
                    df = df.withColumn(f"{col_name}_json", f.to_json(f.col(col_name))) \
                        .withColumn(col_name, f.from_json(f.col(f"{col_name}_json"), target_col.dataType)) \
                        .drop(f"{col_name}_json")
                
                # Modify the links with surrogate keys
                if re.search('Reference$', col_name) is not None:
                    df = self.flatten_reference_col(df, target_col, districtId_col_name = districtId_col_name, schoolYear_col_name = schoolYear_col_name)
                
                # Early exit in test mode: reached only inside the complex-type branch,
                # before any array is handed to explode_arrays.
                if self.test_mode:
                    return df
        
                if target_col.dataType.typeName() == 'array':
                    # TODO: Experiment with Non Nulls and Optimization (Future)
                    # df_size = df.withColumn("size", F.size(F.col(target_col.name))).cache()
                    # nonNull_count = df_size.filter(F.col("size") >= 1).count()
                    
                    # Placeholder value while the non-null counting above stays disabled
                    nonNull_count = 1
                    df = self.explode_arrays(df, 
                                             sink_general_path,
                                             target_col, 
                                             schema_name, 
                                             table_name, 
                                             extension = extension, 
                                             ext_entity = ext_entity,
                                             districtId_col_name = districtId_col_name,
                                             schoolYear_col_name = schoolYear_col_name,
                                             parent_cols = ['lakeId', districtId_col_name, schoolYear_col_name, 'LastModifiedDate', 'rowIsActive'],
                                             nonNull_count = nonNull_count)
                    #df_size.unpersist()
        return df
    def get_ext_entities_schemas(self,
                                table_name = 'staffs',
                                ext_column_name = '_ext',
                                default_value = 'TPDM'):
        """Return the schema of one extension entry nested inside a table's extension column.

        The table schema is looked up in `self.schemas` and deep-copied, so callers may
        mutate the returned type without touching the shared schema definitions.

        Previously only the first field of the extension struct was inspected, so an
        extension that was not listed first was never found. Every field is now searched.

        Args:
            table_name: Key into `self.schemas`.
            ext_column_name: Name of the struct column that holds the extensions.
            default_value: Name of the extension field to look up inside that struct.

        Returns:
            The data type of the matching extension field, or None when the table has
            no `ext_column_name` column or the extension is not present in it.
        """
        target_schema = copy.deepcopy(self.schemas[table_name])
        if ext_column_name not in target_schema.fieldNames():
            return None

        ext_struct = target_schema[ext_column_name].dataType
        # Search every extension entry, not just the one at position 0.
        for ext_field in ext_struct.fields:
            if ext_field.name == default_value:
                return ext_field.dataType
        return None
                    
    def flatten_ext_column(self, 
                           df, 
                           table_name, 
                           ext_col, 
                           inner_key,
                           ext_inner_cols,
                           base_cols = ['lakeId', 'DistrictId', 'LastModifiedDate', 'rowIsActive','SchoolYear', 'id_pseudonym']):
        """Project the fields of `df[ext_col][inner_key]` up to top-level columns.

        The dtype text is now read from `ext_col`; it was previously hard-coded to
        '_ext', which ignored the parameter.

        Args:
            df: Dataframe containing the extension column.
            table_name: Unused here; kept for interface compatibility.
            ext_col: Name of the extension column.
            inner_key: Key of the extension entry inside `ext_col`.
            ext_inner_cols: Candidate field names to pull out of the extension entry.
            base_cols: Columns of `df` carried over unchanged. The default list is
                never mutated.

        Returns:
            A dataframe with the selected extension fields followed by `base_cols`.
        """
        ext_entry = F.col(ext_col)[inner_key]
        # Textual dtype of the extension column, used to skip candidate fields the
        # incoming data does not contain.
        # NOTE(review): this is a substring match on the dtype string, so a candidate
        # that is a substring of another field name also matches - confirm acceptable.
        ext_dtype_text = str(df.select(ext_col).dtypes[0][1])

        exprs = [ext_entry.getItem(key).alias(key)
                 for key in ext_inner_cols
                 if str(key) in ext_dtype_text]
        return df.select(exprs + list(base_cols))

    def sink_path_cleanup(self, destination_path):
        """Return `destination_path` with every `DistrictId=<...>/` and `SchoolYear=<...>/` segment removed."""
        partition_segment = re.compile(r'DistrictId=.*?/|SchoolYear=.*?/')
        return partition_segment.sub('', destination_path)

    def non_common_elements(self, list1, list2):
        """Return the items found in exactly one of the two lists.

        Items only in `list1` come first, followed by items only in `list2`.
        Duplicates are collapsed; order within each group follows set iteration order.
        """
        first, second = set(list1), set(list2)
        return [*(first - second), *(second - first)]
    
    def non_empty_elements(self, emptyList, nonEmptyList):
        """Return the distinct items of `emptyList` that do not occur in `nonEmptyList`."""
        return [*(set(emptyList) - set(nonEmptyList))]
   
    def return_non_ext_tables(self):
        """Return the keys of `self.schemas` whose name does not contain 'extension' (case-insensitive)."""
        return [name for name in self.schemas.keys() if 'extension' not in name.lower()]

### Error Logging

In [ ]:
class ErrorLogging:
    """Collects ETL log records in memory and persists them as Delta tables.

    Records are grouped by log type ('entity', 'stage' or 'pipeline'). Every method
    that takes a `log_type` raises ValueError('Invalid Log Type') for any other value.
    """

    # Upsert key columns per log type.
    _PRIMARY_KEYS = {
        'entity': ['entityName', 'entityType', 'stageName', 'pipelineExecutionId'],
        'stage': ['stageName', 'pipelineExecutionId'],
        'pipeline': ['pipelineExecutionId'],
    }

    def __init__(self, spark, oea, logger):
        """Store the collaborators and start with empty log buffers.

        Args:
            spark: Spark session; `spark.sparkContext.applicationId` is recorded.
            oea: Object providing `workspace`, `storage_account` and `upsert`.
            logger: Logger used for debug output.
        """
        self.spark = spark
        self.oea = oea
        self.logger = logger
        self.pipeline_id = None
        self.spark_session_id = spark.sparkContext.applicationId
        self.test_mode = True
        self.entity_logs = list()
        self.stage_logs = list()
        self.pipeline_logs = list()
        # Base URL of the logs location; resolved lazily by set_logs_prefix().
        self.etl_logs = None

    def _logs_for(self, log_type):
        """Return the in-memory buffer for `log_type`, or raise ValueError if unknown."""
        buffers = {
            'entity': self.entity_logs,
            'stage': self.stage_logs,
            'pipeline': self.pipeline_logs,
        }
        if log_type not in buffers:
            raise ValueError('Invalid Log Type')
        return buffers[log_type]

    # Helper function to generate a random alphanumeric string of specified length
    def generate_random_alphanumeric(self, length):
        """Return the first `length` hex characters of a fresh UUID4 (at most 32)."""
        return uuid.uuid4().hex[:length]

    def set_logs_prefix(self):
        """Set `self.etl_logs` (and `self.storage_account`) from the OEA workspace."""
        workspace_name = self.oea.workspace
        self.storage_account = self.oea.storage_account

        if workspace_name in ('prod', 'production'):
            self.etl_logs = f'abfss://stage1@{self.storage_account}.dfs.core.windows.net/Transactional/SAP/etl-logs'
            # self.etl_logs = 'abfss://etl-logs@' + self.storage_account + '.dfs.core.windows.net'
        elif workspace_name in ('dev', 'development'):
            self.etl_logs = f'abfss://oea@{self.storage_account}.dfs.core.windows.net/dev/etl-logs'
        else:
            self.etl_logs = f'abfss://oea@{self.storage_account}.dfs.core.windows.net/sandboxes/{workspace_name}/etl-logs'

    def to_logs_url(self, path):
        """Translate a path starting with 'etl-logs' into a full abfss URL.

        Args:
            path: Either an 'abfss://' URL (returned unchanged) or a relative path
                whose first segment is 'etl-logs'.

        Returns:
            The absolute URL under `self.etl_logs`.

        Raises:
            ValueError: If `path` is empty or does not begin with 'etl-logs'.
        """
        if self.etl_logs is None:
            self.set_logs_prefix()
        if not path:
            raise ValueError('Specified path cannot be empty.')
        if path.startswith('abfss://'):
            # if a url is given, just return that same url (allows to_url to be invoked just in case translation may be needed)
            return path
        stage, _, remainder = path.partition('/')
        if stage != 'etl-logs':
            raise ValueError("Logs Path must begin with 'etl-logs'")
        url = f"{self.etl_logs}/{remainder}"
        # Use the injected logger rather than a notebook-level global.
        self.logger.debug(f'to_url: {url}')
        return url

    def create_log_dict(self, **kwargs):
        """Return the keyword arguments as a dict (one log record)."""
        return kwargs

    def consolidate_logs(self, log_data, log_type):
        """Tag `log_data` with its `log_type` and append it to the matching buffer.

        Raises:
            ValueError: If `log_type` is unknown (`log_data` is left untouched).
        """
        bucket = self._logs_for(log_type)
        log_data['log_type'] = log_type
        bucket.append(log_data)

    def create_spark_df(self, log_type):
        """Build a Spark dataframe from the buffered records of `log_type`.

        Raises:
            ValueError: If `log_type` is unknown.
        """
        return self.spark.createDataFrame(self._logs_for(log_type))

    def write_logs_to_delta_lake(self, df, log_type,destination_url, partitioning_cols = ['pipelineExecutionId']):
        """Upsert `df` into the Delta table at `destination_url`.

        Args:
            df: Dataframe of log records.
            log_type: 'entity', 'stage' or 'pipeline'; selects the upsert key columns.
            destination_url: Destination handed to `self.oea.upsert`.
            partitioning_cols: Accepted for backward compatibility only; the upsert is
                always issued with partitioning disabled.

        Raises:
            ValueError: If `log_type` is unknown. Previously this surfaced as an
                UnboundLocalError on `primary_key`.
        """
        #TODO: Pending Edits
        if log_type not in self._PRIMARY_KEYS:
            raise ValueError('Invalid Log Type')
        # Copy so the shared class-level key lists can never be mutated downstream.
        primary_key = list(self._PRIMARY_KEYS[log_type])

        # logger.info('Dynamically over-write the partition')
        self.spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
        self.oea.upsert(df = df, 
                        destination_path = destination_url,
                        primary_key = primary_key,#['RECORD', 'DistrictId', 'SchoolYear'],
                        partitioning = False,
                        partitioning_cols = [],
                        surrogate_key = False)

    def add_etl_logs_to_lake_db(self, 
                                db_name, 
                                logs_base_path, 
                                log_type,
                                overwrite = False):
        """Register the logs Delta location as table `<db_name>.ETL<log_type>Logs`.

        Args:
            db_name: Database to create if missing.
            logs_base_path: Logs path (must satisfy `to_logs_url`).
            log_type: Used in both the folder name (`log_type=<log_type>`) and the table name.
            overwrite: When true, drop the table first so it is re-created.
        """
        logs_full_url = self.to_logs_url(f"{logs_base_path}/log_type={log_type}")
        table_name = f'{db_name}.ETL{log_type}Logs'
        # Run through the injected session rather than a notebook-level `spark` global.
        self.spark.sql(f'CREATE DATABASE IF NOT EXISTS {db_name}')
        if overwrite:
            self.spark.sql(f'DROP TABLE IF EXISTS {table_name}')
        self.spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} using DELTA location '{logs_full_url}'")

### Entity Frequency Processor

In [ ]:
class EntityFrequencyProcessor:
    """Decides which entities are due for ETL based on a CSV lookup of last run times.

    The lookup CSV is expected to provide the columns this class reads or writes:
    `resource_frequency_code`, `lastrundatetime`, `lastrundate`, `resource_full_name`,
    `resource_domain` and `resource_sub_name`.
    """

    def __init__(self, oea, filepath, highFrequentDelta = 1, moderateFrequentDelta = 5, lowFrequentDelta = 10, descriptorsDelta = 360):
        """Store the collaborators and the minimum age (in days) per frequency code.

        Args:
            oea: Object providing `get_text_from_path` and `to_url`.
            filepath: Path of the lookup CSV.
            highFrequentDelta: Minimum days since last run for 'high' entities.
            moderateFrequentDelta: Minimum days since last run for 'moderate' entities.
            lowFrequentDelta: Minimum days since last run for 'low' entities.
            descriptorsDelta: Minimum days since last run for 'descriptor' entities.
        """
        self.oea = oea
        self.filepath = filepath
        self.highFrequentDelta = timedelta(days = highFrequentDelta)
        self.moderateFrequentDelta = timedelta(days = moderateFrequentDelta)
        self.lowFrequentDelta = timedelta(days = lowFrequentDelta)
        self.descriptorsDelta = timedelta(days = descriptorsDelta)

    def load_lookup_df(self):
        """Read the lookup CSV at `self.filepath` into `self.entity_freq_lookup_df`."""
        text_data = self.oea.get_text_from_path(self.filepath)
        self.entity_freq_lookup_df = pd.read_csv(StringIO(text_data))

    def set_freq_fiter_maps(self):
        """Build one boolean mask per frequency code: rows of that code that are old enough.

        Requires the temporary `temp_timedelta` column set by `return_entities_to_etl`.
        """
        codes = self.entity_freq_lookup_df['resource_frequency_code']
        age = self.entity_freq_lookup_df['temp_timedelta']
        self.highFrequentMap = (codes == 'high') & (age >= self.highFrequentDelta)
        self.moderateFrequentMap = (codes == 'moderate') & (age >= self.moderateFrequentDelta)
        self.lowFrequentMap = (codes == 'low') & (age >= self.lowFrequentDelta)
        self.descriptorsMap = (codes == 'descriptor') & (age >= self.descriptorsDelta)

    def _combined_map(self):
        """Return the union of the four per-frequency masks (rows due for ETL)."""
        return self.highFrequentMap | self.lowFrequentMap | self.moderateFrequentMap | self.descriptorsMap

    def return_entities_to_etl(self):
        """Select the entities whose last run is older than their frequency threshold.

        Returns:
            A tuple `(names, by_domain)`: the list of `resource_full_name` values that
            are due, and a dict mapping each `resource_domain` to the list of due
            `resource_sub_name` values.
        """
        lookup = self.entity_freq_lookup_df
        # Age of each entity's last run; only needed while the masks are built.
        lookup['temp_timedelta'] = datetime.today() - pd.to_datetime(lookup['lastrundatetime'])
        self.set_freq_fiter_maps()

        due = self._combined_map()
        entities_to_etl = list(lookup.loc[due, 'resource_full_name'].values)
        lookup.drop(['temp_timedelta'], axis = 1, inplace = True)

        grouped = lookup.loc[due, ['resource_domain', 'resource_sub_name']].groupby('resource_domain').aggregate(list)
        entities_to_etl_dict = grouped.to_dict()['resource_sub_name']
        return entities_to_etl, entities_to_etl_dict

    def update_lookup_df(self):
        """Stamp the current time on every row selected by the last `return_entities_to_etl`.

        The clock is read once so `lastrundatetime` and `lastrundate` always agree;
        two separate reads could previously straddle midnight.
        """
        # TODO: WIP
        now = datetime.today()
        due = self._combined_map()
        self.entity_freq_lookup_df.loc[due, 'lastrundatetime'] = now
        self.entity_freq_lookup_df.loc[due, 'lastrundate'] = now.date()

    def write_lookup_df(self, destination_path):
        """Write the lookup dataframe as CSV to `destination_path`, overwriting any existing file."""
        data_str = self.entity_freq_lookup_df.to_csv(index=False)  # You can customize options based on your needs
        destination_url = self.oea.to_url(destination_path)
        mssparkutils.fs.put(destination_url, data_str, True)
        

In [ ]:
class EntityFrequencyProcessor:
    """Decides which entities are due for ETL based on a CSV lookup of last run times.

    The lookup CSV is expected to provide the columns this class reads or writes:
    `resource_frequency_code`, `lastrundatetime`, `lastrundate`, `resource_full_name`,
    `resource_domain` and `resource_sub_name`.
    """

    def __init__(self, oea, filepath, highFrequentDelta = 1, moderateFrequentDelta = 5, lowFrequentDelta = 10, descriptorsDelta = 360):
        """Store the collaborators and the minimum age (in days) per frequency code.

        Args:
            oea: Object providing `get_text_from_path` and `to_url`.
            filepath: Path of the lookup CSV.
            highFrequentDelta: Minimum days since last run for 'high' entities.
            moderateFrequentDelta: Minimum days since last run for 'moderate' entities.
            lowFrequentDelta: Minimum days since last run for 'low' entities.
            descriptorsDelta: Minimum days since last run for 'descriptor' entities.
        """
        self.oea = oea
        self.filepath = filepath
        self.highFrequentDelta = timedelta(days = highFrequentDelta)
        self.moderateFrequentDelta = timedelta(days = moderateFrequentDelta)
        self.lowFrequentDelta = timedelta(days = lowFrequentDelta)
        self.descriptorsDelta = timedelta(days = descriptorsDelta)

    def load_lookup_df(self):
        """Read the lookup CSV at `self.filepath` into `self.entity_freq_lookup_df`."""
        text_data = self.oea.get_text_from_path(self.filepath)
        self.entity_freq_lookup_df = pd.read_csv(StringIO(text_data))

    def _due_mask(self, frequency_code, min_age):
        """Return a boolean mask of rows with `frequency_code` whose last run is at least `min_age` old."""
        lookup = self.entity_freq_lookup_df
        return (lookup['resource_frequency_code'] == frequency_code) & (lookup['temp_timedelta'] >= min_age)

    def set_freq_fiter_maps(self):
        """Build one boolean mask per frequency code: rows of that code that are old enough.

        Requires the temporary `temp_timedelta` column set by the `*_return_entities_to_etl` methods.
        """
        self.highFrequentMap = self._due_mask('high', self.highFrequentDelta)
        self.moderateFrequentMap = self._due_mask('moderate', self.moderateFrequentDelta)
        self.lowFrequentMap = self._due_mask('low', self.lowFrequentDelta)
        self.descriptorsMap = self._due_mask('descriptor', self.descriptorsDelta)

    def _combined_map(self):
        """Return the union of the four per-frequency masks (rows due for ETL)."""
        return self.highFrequentMap | self.lowFrequentMap | self.moderateFrequentMap | self.descriptorsMap

    def _select_due_entities(self):
        """Refresh the frequency masks and return `(due_mask, due_full_names)`.

        Shared by `return_entities_to_etl` and `edgraph_return_entities_to_etl`. The
        temporary `temp_timedelta` column is removed again before returning.
        """
        lookup = self.entity_freq_lookup_df
        lookup['temp_timedelta'] = datetime.today() - pd.to_datetime(lookup['lastrundatetime'])
        self.set_freq_fiter_maps()

        due = self._combined_map()
        names = list(lookup[due]['resource_full_name'].values)
        lookup.drop(['temp_timedelta'], axis = 1, inplace = True)
        return due, names

    def return_entities_to_etl(self):
        """Select the entities whose last run is older than their frequency threshold.

        Returns:
            A tuple `(names, by_domain)`: the list of `resource_full_name` values that
            are due, and a dict mapping each `resource_domain` to the list of due
            `resource_sub_name` values.
        """
        due, entities_to_etl = self._select_due_entities()
        per_domain = self.entity_freq_lookup_df.loc[due, ['resource_domain', 'resource_sub_name']]
        entities_to_etl_dict = per_domain.groupby('resource_domain').aggregate(list).to_dict()['resource_sub_name']
        return entities_to_etl, entities_to_etl_dict

    def edgraph_return_entities_to_etl(self):
        """Same selection as `return_entities_to_etl`, without the per-domain grouping.

        Returns:
            A tuple `(names, None)`.
        """
        _, entities_to_etl = self._select_due_entities()
        return entities_to_etl, None

    def update_lookup_df(self):
        """Stamp the current time on every row selected by the last `*_return_entities_to_etl` call.

        The clock is read once so `lastrundatetime` and `lastrundate` always agree;
        two separate reads could previously straddle midnight.
        """
        # TODO: WIP
        now = datetime.today()
        due = self._combined_map()
        self.entity_freq_lookup_df.loc[due, 'lastrundatetime'] = now
        self.entity_freq_lookup_df.loc[due, 'lastrundate'] = now.date()

    def write_lookup_df(self, destination_path):
        """Write the lookup dataframe as CSV to `destination_path`, overwriting any existing file."""
        data_str = self.entity_freq_lookup_df.to_csv(index=False)  # You can customize options based on your needs
        destination_url = self.oea.to_url(destination_path)
        mssparkutils.fs.put(destination_url, data_str, True)
        