In [0]:
def get_bronze_table_config(table_name):
    """
    Return the control-table row that configures one bronze table.

    The lookup uses the DataFrame API so that table_name is compared as a
    value and never becomes part of SQL text; a name containing a quote can
    neither break nor alter the query.

    Parameters:
    table_name (str): Value matched against bronze_table_name in
        bronze_control_table.

    Returns:
    Row: The first matching row, carrying source_system, bronze_table_name,
        load_type and file_location; None when no row matches.
    """
    control = spark.table("bronze_control_table")
    matches = (
        control
        .where(control["bronze_table_name"] == table_name)
        .select("source_system", "bronze_table_name", "load_type", "file_location")
        .limit(1)  # only the first match is returned, so fetch at most one row
        .collect()
    )
    # A single collect() both tests for emptiness and fetches the row, so the
    # query is evaluated once.
    return matches[0] if matches else None

In [0]:
def log_to_table(table_name, layer, log_message, error_flag=False):
    """
    Append one entry to logging_table, stamped with the current timestamp.

    Parameters:
    table_name (str): Name recorded in the table_name column.
    layer (str): Name recorded in the layer column (callers pass "Bronze").
    log_message (str): Free-text message; it may contain quotes, as
        exception text often does.
    error_flag (bool): True when the entry reports an error.
    """
    # The values travel as DataFrame data rather than as SQL text, so a quote
    # inside log_message cannot break (or alter) the INSERT statement.
    entry = spark.createDataFrame(
        [(str(table_name), str(layer), str(log_message), bool(error_flag))],
        "table_name string, layer string, log_message string, error_flag boolean",
    )
    entry.createOrReplaceTempView("_log_to_table_entry")
    try:
        # current_timestamp() is evaluated inside the statement, which avoids
        # a separate query and a string round trip through the driver.
        spark.sql("""
            INSERT INTO logging_table (table_name, layer, log_message, log_timestamp, error_flag)
            SELECT table_name, layer, log_message, current_timestamp(), error_flag
            FROM _log_to_table_entry
        """)
    finally:
        # Do not leave the helper view behind in the session catalog.
        spark.catalog.dropTempView("_log_to_table_entry")

In [0]:
def extract_data(file_location):
    """
    Build a DataFrame over the Parquet data stored at file_location.

    Parameters:
    file_location (str): Path handed to the Spark reader.

    Returns:
    DataFrame: The reader's result for that path.
    """
    return spark.read.load(file_location, format="parquet")

In [0]:
def load_to_bronze_table(df, bronze_table_name):
    """
    Save df as the Delta table bronze_table_name using overwrite mode.

    Parameters:
    df (DataFrame): Data to persist.
    bronze_table_name (str): Name of the target table.
    """
    writer = df.write
    writer.saveAsTable(bronze_table_name, format="delta", mode="overwrite")

In [0]:
def get_bronze_table_names():
    """
    List the table names registered for the Bronze layer in tracking_table.

    Returns:
    list: The table_name value of every tracking_table row whose layer is
        'Bronze', in the order the query returns them.
    """
    query = """
        SELECT table_name 
        FROM tracking_table 
        WHERE layer = 'Bronze'
    """
    names = []
    for record in spark.sql(query).collect():
        names.append(record["table_name"])
    return names

In [0]:
def update_tracking_table(table_name, layer):
    """
    Upsert the last_loaded timestamp for (table_name, layer) in tracking_table.

    An existing row with the same table_name and layer has its last_loaded
    value replaced; otherwise a new row is inserted.

    Parameters:
    table_name (str): Table whose load is being recorded.
    layer (str): Layer the table belongs to (callers pass "Bronze").
    """
    # The key values are supplied as DataFrame data rather than spliced into
    # the statement text, so quotes in a name cannot break or alter the MERGE.
    source = spark.createDataFrame(
        [(str(table_name), str(layer))],
        "table_name string, layer string",
    )
    source.createOrReplaceTempView("_tracking_table_update")
    try:
        # current_timestamp() is evaluated inside the statement, which avoids
        # a separate query and a string round trip through the driver.
        spark.sql("""
            MERGE INTO tracking_table AS t
            USING (
                SELECT table_name, layer, current_timestamp() AS last_loaded
                FROM _tracking_table_update
            ) AS s
            ON t.table_name = s.table_name AND t.layer = s.layer
            WHEN MATCHED THEN
                UPDATE SET last_loaded = s.last_loaded
            WHEN NOT MATCHED THEN
                INSERT (table_name, layer, last_loaded) VALUES (s.table_name, s.layer, s.last_loaded)
        """)
    finally:
        # Do not leave the helper view behind in the session catalog.
        spark.catalog.dropTempView("_tracking_table_update")

In [0]:
# Run the Transform_Bronze_Layer notebook; the second argument (600) is the
# timeout passed to dbutils.notebook.run.
# NOTE(review): dbutils.notebook.run executes the target notebook as a separate
# run and returns its exit value; per the Databricks documentation it does not
# bring that notebook's definitions into this notebook's scope. If
# transform_data (used in execute_bronze_tasks) is meant to come from that
# notebook, `%run` in a cell of its own is the documented way to include it --
# confirm where transform_data is actually defined.
dbutils.notebook.run("/Workspace/Metadata_Driven_ETL_Framework/Transformation_Logic/Transform_Bronze_Layer", 600)

def execute_bronze_tasks(table_names):
    """
    Execute ETL tasks for bronze tables.

    For each name the configuration row is looked up, then the data is
    extracted, transformed, loaded and the tracking table updated, with a log
    entry after every step. A table without configuration is logged as an
    error and skipped. An exception raised while processing one table is
    logged and printed, and processing continues with the next table.

    Parameters:
    table_names (list): A list of bronze table names to process.
    """
    for table_name in table_names:
        config = get_bronze_table_config(table_name)

        # Guard clause: nothing can be done without a configuration row.
        if not config:
            log_to_table(table_name, "Bronze", f"No configuration found for table: {table_name}", error_flag=True)
            print(f"No configuration found for table: {table_name}")
            continue

        # NOTE(review): the configuration row also carries source_system and
        # load_type, but neither influences processing here; every table is
        # loaded through the same load_to_bronze_table call.
        bronze_table_name = config.bronze_table_name
        file_location = config.file_location

        try:
            # Extract data
            df = extract_data(file_location)
            log_to_table(bronze_table_name, "Bronze", f"Successfully extracted data from {file_location}")

            # Transform data
            # NOTE(review): transform_data is not defined in the visible part
            # of this file; it must be provided elsewhere.
            transformed_df = transform_data(df)
            log_to_table(bronze_table_name, "Bronze", "Successfully transformed data")

            # Load data into bronze table
            load_to_bronze_table(transformed_df, bronze_table_name)
            log_to_table(bronze_table_name, "Bronze", "Successfully loaded data into bronze table")

            # Update tracking table
            update_tracking_table(bronze_table_name, "Bronze")
            log_to_table(bronze_table_name, "Bronze", "Successfully updated tracking table")

        except Exception as e:
            # Per-table boundary: record the failure and move on to the next table.
            log_to_table(bronze_table_name, "Bronze", f"Error processing {table_name}: {e}", error_flag=True)
            print(f"Error processing {table_name}: {e}")

In [0]:
# Driver cell: process every bronze table listed in the tracking table.
try:
    # Names come from the tracking table; each one is then run through the
    # bronze ETL steps.
    bronze_table_names = get_bronze_table_names()
    execute_bronze_tasks(bronze_table_names)
except Exception as e:
    # Top-level boundary: record the failure in the log table and echo it to
    # the notebook output.
    failure_message = f"Error during bronze table operations: {str(e)}"
    log_to_table("Bronze Execution", "Bronze", failure_message, error_flag=True)
    print(failure_message)