### Import statements

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

### Function returning config.tbl_details

In [0]:
def config_df():
    """
    Read every row of workspace.config.tbl_details through spark.sql.

    Returns:
        The object returned by spark.sql for the query, or None when
        running the query raises (the error is printed, not re-raised).
    """
    try:
        tbl_details = spark.sql('select * from workspace.config.tbl_details')
    except Exception as e:
        print(f"failed due to Exception {e}")
        return None
    else:
        return tbl_details

### Looping through the DataFrame

In [0]:


def get_src_tgt_names(df):
    """
    Build fully qualified source/target table names from the config rows.

    Every row returned by df.collect() is turned into a tuple
    (source_name, target_name, incremental_column_name, tgt_inc_column), where
    source_name is 'src_catalog_name.src_database_name.src_table_name' and
    target_name is 'tgt_catalog_name.tgt_schema_name.tgt_tbl_name'.

    Returns:
        A list with one tuple per row, or None when anything raises
        (the error is printed, not re-raised).
    """
    def _dotted(catalog, schema, table):
        # Plain string concatenation: a non-string part raises and is handled below.
        return catalog + '.' + schema + '.' + table

    try:
        pairs = []
        for row in df.collect():
            src_parts = (row['src_catalog_name'], row['src_database_name'], row['src_table_name'])
            src_watermark_col = row['incremental_column_name']
            source_name = _dotted(*src_parts)

            tgt_parts = (row['tgt_catalog_name'], row['tgt_schema_name'], row['tgt_tbl_name'])
            tgt_watermark_col = row['tgt_inc_column']
            target_name = _dotted(*tgt_parts)

            pairs.append((source_name, target_name, src_watermark_col, tgt_watermark_col))
    except Exception as e:
        print(f"failed due to Exception {e}")
        return None
    return pairs


### Getting the last refresh date from config.control_tbl

In [0]:
def get_last_refresh_date(up_col_name, src_file_name):
    """
    Look up the latest value of a column in workspace.config.control_tbl.

    Args:
        up_col_name: Column whose MAX is selected.
        src_file_name: Value matched against the tbl_name column.

    Returns:
        The selected value formatted with strftime('%d-%m-%Y'). The query
        substitutes TO_DATE('1900-01-01') when MAX yields NULL. Returns None
        when anything raises (the error is printed, not re-raised).
    """
    try:
        watermark_sql = f"""
            SELECT NVL(MAX({up_col_name}), TO_DATE('1900-01-01')) AS update_date
            FROM workspace.config.control_tbl
            WHERE tbl_name = '{src_file_name}'
            
        """
        first_row = spark.sql(watermark_sql).collect()[0]
        # strftime is called on the value, so it is presumably a date/datetime -- TODO confirm column type
        return first_row['update_date'].strftime('%d-%m-%Y')
    except Exception as e:
        print(f"Failed due to Exception: {e}")
        return None


### Creating the source table DataFrame

In [0]:
def creating_src_df(src_tbl_t_name,incremental_column_name,fallback):
    """
    Select the source rows whose incremental column is later than `fallback`.

    Args:
        src_tbl_t_name: Table name interpolated into the FROM clause.
        incremental_column_name: Column parsed with to_date(..., "dd-MM-yyyy").
        fallback: Lower bound (exclusive), parsed with TO_DATE(..., 'dd-MM-yyyy').

    Returns:
        A tuple (df, row_count) where df is the query result with an extra
        "ingestion_timestamp" column set to current_date(), and row_count is
        df.count(). Returns a bare None (not a tuple) when anything raises;
        the error is printed, not re-raised.
    """
    try:
        incremental_sql = f"""
                    select * 
                    from {src_tbl_t_name}
                    where 
                    to_date ({incremental_column_name},"dd-MM-yyyy" )> TO_DATE('{fallback}','dd-MM-yyyy')
                    """
        new_rows = spark.sql(incremental_sql)
        stamped = new_rows.withColumn("ingestion_timestamp", current_date())
        return stamped, stamped.count()
    except Exception as e:
        print(f"failed due to Exception {e}")
        return None

### Writing the data to target tables

In [0]:
def write_data_to_bronze(df,tgt_tbl_t_name,row_count):
    """
    Append `df` to the Delta table `tgt_tbl_t_name` and report a load status.

    Args:
        df: Object exposing the DataFrame writer chain
            (df.write.format(...).mode(...).saveAsTable(...)).
        tgt_tbl_t_name: Name of the table to append to.
        row_count: Number of rows in `df` as computed by the caller.

    Returns:
        "completed" when the write succeeds and row_count is a non-negative
        number; "failed" otherwise. Every failure path returns "failed" rather
        than None, so a caller that stores the status never ends up persisting
        the literal text 'None'. Exceptions are printed, not re-raised.
    """
    try:
        df.write.format('delta').mode('append').saveAsTable(tgt_tbl_t_name)
        # A missing count must not raise after the data has already been written.
        if row_count is not None and row_count >= 0:
            return "completed"
        return "failed"
    except Exception as e:
        print(f"failed due to Exception {e}")
        return "failed"




### Updating values in config.control_tbl

In [0]:
def get_and_update_value(tgt_inc_column,tgt_tbl_t_name,status,src_tbl_t_name):
    """
    Record the latest loaded value and the load status in the control table.

    Reads max(tgt_inc_column) from tgt_tbl_t_name and updates the row of
    workspace.config.control_tbl whose tbl_name equals src_tbl_t_name.

    Args:
        tgt_inc_column: Column of the target table whose maximum is read.
        tgt_tbl_t_name: Target table to read the maximum from.
        status: Status text written to the control table.
        src_tbl_t_name: Value matched against control_tbl.tbl_name.

    Returns:
        True when the update statement was issued, None when anything raises
        (the error is printed, not re-raised).
    """
    try:
        New_date=spark.sql(f"""
                    select max({tgt_inc_column}) as max_date
                    from
                    {tgt_tbl_t_name}

                    """)
        print(f"New_date is {New_date}")
        # Collect once: calling count() and then collect() would run the query twice.
        rows=New_date.collect()
        Next_date=rows[0]['max_date'] if rows else None
        print(f"Next data is {Next_date}")

        assignments=[f"status='{status}'"]
        if Next_date is not None:
            assignments.insert(0,f"update_date='{Next_date}'")
        # When there is no max value the stored update_date is left untouched
        # instead of being overwritten with the literal text 'None'.
        set_clause=",\n            ".join(assignments)

        spark.sql(f"""
            Update workspace.config.control_tbl
            set {set_clause}
            where 
            tbl_name='{src_tbl_t_name}'
            """)
        return True
    except Exception as e:
        print(f"failed due to Exception {e}")
        return None
            
            

## Main

In [0]:
if __name__=="__main__":
    # Load the ingestion config; config_df() returns None when its query fails.
    df=config_df()
    if df is not None:
        display(df)

    # get_src_tgt_names() returns None on failure; fall back to an empty list
    # so the loop is skipped instead of raising TypeError on iteration.
    table_details=get_src_tgt_names(df) if df is not None else None

    for src_tbl_t_name,tgt_tbl_t_name,inc_col,tgt_col in table_details or []:
        print(src_tbl_t_name)
        print(tgt_tbl_t_name)
        print(inc_col)

        # Last loaded date for this source table, used as the exclusive lower bound.
        fall_back_date=get_last_refresh_date(up_col_name='update_date',src_file_name=src_tbl_t_name)
        print(fall_back_date)
        if fall_back_date is None:
            # Without a lower bound the source query would filter on the text 'None'.
            print(f"skipping {src_tbl_t_name}: last refresh date could not be read")
            continue

        # creating_src_df() returns a bare None on failure, which cannot be unpacked.
        src_result=creating_src_df(src_tbl_t_name=src_tbl_t_name,incremental_column_name=inc_col,fallback=fall_back_date)
        if src_result is None:
            print(f"skipping {src_tbl_t_name}: source rows could not be read")
            continue
        # A separate name keeps the config DataFrame in `df` intact.
        src_df,count=src_result
        print(src_df)
        print(count)

        status=write_data_to_bronze(df=src_df,tgt_tbl_t_name=tgt_tbl_t_name,row_count=count)
        print(status)

        result=get_and_update_value(tgt_inc_column=tgt_col,tgt_tbl_t_name=tgt_tbl_t_name,status=status,src_tbl_t_name=src_tbl_t_name)
        print(result)




        
        
        
        


In [0]:
# Ad-hoc check: latest update_date among 'completed' rows for
# workspace.default.order_details, with 01-01-1900 substituted when there is none.
# NOTE(review): this reads workspace.default.control_tbl, while the functions in
# this notebook use workspace.config.control_tbl -- confirm which one is intended.
df = spark.sql(
    """select nvl(max(update_date), to_date('01-01-1900','dd-MM-yyyy')) as update_date 
              from workspace.default.control_tbl 
              where tbl_name = 'workspace.default.order_details' 
              and status = 'completed'"""
)
display(df.collect()[0]['update_date'])