In [None]:
import os

# Portfolio-project shortcut: authenticate to ADLS Gen2 with the storage-account
# access key. In production, link the storage account to Databricks with a
# service principal instead of an account key.
# The key is read from the ADLS_ACCOUNT_KEY environment variable, so a real secret
# never has to be pasted into the notebook. The literal "ACCESS_KEY" placeholder is
# kept as the fallback, so the cell behaves exactly as before when the variable is unset.
spark.sparkContext._jsc.hadoopConfiguration().set(
    "fs.azure.account.key.stenergyplatformadls.dfs.core.windows.net",
    os.environ.get("ADLS_ACCOUNT_KEY", "ACCESS_KEY"),
)

### Import libraries and functions

In [None]:
from datetime import datetime

# NOTE: importing `round` shadows Python's builtin round() with
# pyspark.sql.functions.round for every later cell in this notebook.
from pyspark.sql.functions import col, when, coalesce, last, monotonically_increasing_id, lit, to_date, year, month, regexp_extract, regexp_replace, expr, round
from pyspark.sql.types import FloatType, DoubleType
from pyspark.sql.window import Window

### Define functions

In [None]:
def forward_fill_tablename(df_bronze):
    """
    Tag every data row with the title of the table it belongs to.

    Rows whose ``generator_type`` value matches "table" (case-insensitive) are
    treated as table-title rows. Their text is copied into a new ``table_name``
    column and carried forward onto the following rows. The title rows
    themselves are then dropped.

    Logic:
    1. Assign a ``row_id`` with monotonically_increasing_id() before any
       reshuffling, so ordering by it reproduces the order the rows were read in.
    2. Forward-fill ``generator_type`` and ``table_name`` with
       last(..., ignorenulls=True) over a window ordered by ``row_id``. Values
       that are still null (nothing earlier to carry forward) become the string '0'.
    3. Filter out the table-title rows and drop the helper ``row_id`` column.

    Args:
        df_bronze: DataFrame that already has a ``generator_type`` column
            (as produced by column_mapping).

    Returns:
        DataFrame with an added ``table_name`` column and the title rows removed.
    """
    is_table_title = col("generator_type").rlike("(?i)table")

    # Number the rows in their original read order. monotonically_increasing_id()
    # grows with the partition index and the position inside the partition, so no
    # shuffle is needed. A repartition(1) here would be a full shuffle, which does not
    # guarantee that rows keep their source order, and the forward-fill below
    # depends entirely on that order.
    df = (
        df_bronze
        .withColumn("table_name", when(is_table_title, col("generator_type")))
        .withColumn("row_id", monotonically_increasing_id())
    )

    # Running window from the first row up to the current one (row_id is unique).
    ordered = Window.orderBy("row_id")
    df = df.withColumn(
        "generator_type",
        coalesce(col("generator_type"), last("generator_type", True).over(ordered), lit("0")),
    )
    df_filled = df.withColumn(
        "table_name",
        coalesce(col("table_name"), last("table_name", True).over(ordered), lit("0")),
    )

    # `is_table_title` is resolved by name, so here it tests the forward-filled generator_type.
    return df_filled.filter(~is_table_title).drop("row_id")

In [None]:
def transform_col_name(original_name, header_value):
    """
    Turn a raw Excel header cell into a clean, underscore-separated column name.

    Args:
        original_name: Positional Spark column name (e.g. ``_c3``). Accepted for
            interface compatibility; it does not influence the result.
        header_value: The header cell's value. It is converted with ``str()`` first,
            so ``None`` becomes ``"none"`` and numbers keep their ``str()`` form.

    Returns:
        ``"fuel"`` if the lower-cased header mentions "fuel" anywhere. Otherwise, the
        lower-cased header with spaces and newlines turned into underscores,
        parentheses and square brackets removed, and every ``"_provisional"`` dropped.
    """
    # One-pass character mapping: spaces/newlines -> "_", brackets deleted.
    char_map = str.maketrans({" ": "_", "\n": "_", "(": None, ")": None, "[": None, "]": None})
    cleaned = str(header_value).lower().translate(char_map)

    # Every fuel-related header collapses onto a single column name.
    return "fuel" if "fuel" in cleaned else cleaned.replace("_provisional", "")

In [None]:
def column_mapping(df_bronze):
    """
    Rename the positional columns of a headerless sheet using its header row.

    The sheet is read with header=false, so columns arrive as ``_c0``, ``_c1``, ...
    The header is the first row whose ``_c0`` value is "Generator type". Each of its
    cells is passed through transform_col_name to produce the new column name.

    Args:
        df_bronze: DataFrame read from the Excel sheet with positional column names.

    Returns:
        A DataFrame with the same rows and every column aliased to its cleaned
        header name. The header row itself is still present in the data.

    Raises:
        ValueError: if no row has "Generator type" in ``_c0`` (for example because
            the sheet layout or the dataAddress start cell changed).
    """
    # LIKE with no wildcard characters behaves as an exact string comparison.
    header_row = df_bronze.filter('_c0 like "Generator type"').first()
    if header_row is None:
        raise ValueError(
            "column_mapping: no header row with _c0 = 'Generator type' found; "
            "check the sheet layout / dataAddress"
        )

    # Row values are positional, so pair them with the column names in order.
    renamed_columns = [
        col(old_name).alias(transform_col_name(old_name, header_value))
        for old_name, header_value in zip(df_bronze.columns, header_row)
    ]
    return df_bronze.select(*renamed_columns)

In [None]:
def clean_et51_tables(bronze_path, table):
    """
    Read one sheet of the ET 5.1 workbook from bronze and return a cleaned DataFrame.

    Steps:
    1. Read the sheet, starting at the cell configured for ``table``, without a
       header. Rename columns from the "Generator type" header row (column_mapping)
       and tag rows with their table title (forward_fill_tablename).
    2. Drop the header row and rows with no ``fuel`` value.
    3. Derive ``unit`` from the table title, overridden by a unit mentioned in the
       ``fuel`` label, and stamp an ``ingestion_date`` timestamp (driver clock).
    4. Strip "(M tonnes)"/"(TWh)" from ``fuel`` and "[note N]" markers from every
       string column.
    5. Cast all remaining columns to double and replace nulls with 0 (this includes
       strings that fail the cast when ANSI mode is off). Then round to 2 decimals.

    Args:
        bronze_path: Path/URI of the ET 5.1 Excel workbook.
        table: Which sheet to read: 'main', 'annual' or 'quarter'.

    Returns:
        DataFrame with the columns generator_type, fuel, table_name, unit and
        ingestion_date, followed by the numeric columns.

    Raises:
        ValueError: if ``table`` is not one of the supported sheet keys.
    """
    # Sheet name and top-left cell of the data block, passed as the reader's dataAddress.
    sheet_addresses = {
        'main': "'Main Table'!A6",
        'annual': "'Annual'!A5",
        'quarter': "'Quarter'!A5",
    }
    if table not in sheet_addresses:
        raise ValueError(
            f"Unknown ET 5.1 table {table!r}; expected one of {sorted(sheet_addresses)}"
        )

    # Read the sheet from the bronze container; the header row is located afterwards.
    df_bronze = (
        spark.read
        .format("com.crealytics.spark.excel")
        .option("header", "false")
        .option("inferSchema", "true")
        .option("dataAddress", sheet_addresses[table])
        .load(bronze_path)
    )
    df_correct_columns = column_mapping(df_bronze)
    df_with_tablename = forward_fill_tablename(df_correct_columns)

    # Remove the header row (its values are now the schema) and rows without a fuel label.
    df_filtered = (
        df_with_tablename
        .filter('generator_type not like "Generator type"')
        .filter(col('fuel').isNotNull())
    )

    # 'unit' comes from the table title first, then is overridden when the fuel label names a unit.
    df_with_newcols = (
        df_filtered
        .withColumn(
            "unit",
            when(col("table_name").contains("Mtoe"), lit("mtoe"))
            .when(col("table_name").contains("TWh"), lit("TWh"))
            .when(col("table_name").contains("%"), lit("percentage"))
            .otherwise(None),
        )
        .withColumn(
            "unit",
            when(col("fuel").contains("M tonnes"), "mtoe")
            .when(col("fuel").contains("TWh"), "TWh")
            .otherwise(col("unit")),
        )
        .withColumn("ingestion_date", lit(datetime.now()))
    )

    # \s* also swallows any whitespace before the parenthesis / bracket.
    note_pattern = r"\s*\[note \d+\]"
    unit_patterns = r"\s*\(M tonnes\)|\s*\(TWh\)"

    # Remove "(M tonnes)" / "(TWh)" from the fuel label.
    df_clean = df_with_newcols.withColumn("fuel", regexp_replace(col("fuel"), unit_patterns, ""))

    # Remove "[note N]" from EVERY string column of this DataFrame, fuel included.
    # This must inspect df_clean itself, not a global `df`. A global is undefined on a
    # fresh kernel and otherwise holds the previously processed table.
    for col_name, dtype in df_clean.dtypes:
        if dtype == 'string':
            df_clean = df_clean.withColumn(col_name, regexp_replace(col(col_name), note_pattern, ""))

    # Everything except the descriptive columns holds numeric data.
    descriptive_cols = ["generator_type", "fuel", "table_name", "unit", "ingestion_date"]
    cols_to_fix = [c for c in df_clean.columns if c not in descriptive_cols]

    # Cast to double, fill nulls with 0, round to 2 decimals.
    # `round` is pyspark.sql.functions.round (imported at the top), not the builtin.
    df_final = df_clean.select(
        *descriptive_cols,
        *[round(coalesce(col(c).cast("double"), lit(0)), 2).alias(c) for c in cols_to_fix],
    )

    return df_final

### Implementation

In [None]:
bronze_path = "abfss://bronze@stenergyplatformadls.dfs.core.windows.net/energy_trends/ET_5.1_SEP_25.xlsx"

# Each ET 5.1 sheet is written to its own silver Delta table and fully replaced on every run.
for table in ('main', 'annual', 'quarter'):
    silver_path = f"abfss://silver@stenergyplatformadls.dfs.core.windows.net/energy_trends_generation/{table}/"
    df = clean_et51_tables(bronze_path, table)
    # Column names are derived from the workbook's header row, so they may change
    # between workbook releases. A plain overwrite keeps the existing Delta schema, and
    # schema enforcement rejects new columns. overwriteSchema replaces the schema too.
    (
        df.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .save(silver_path)
    )

## Notebook exit

In [None]:

# Return "Success" to whoever ran this notebook (e.g. a job or a parent dbutils.notebook.run call).
dbutils.notebook.exit("Success")