## Data ETL Notebook

**Layer**: Bronze

**Domain**: Risk-free

**Action**: Ingest NZDM New Zealand Government Securities Currently on Issue

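The purpose of this notebook is to ingest every `govtbonds-onissue-YYYY-mm-dd.xlsx` file found in the `raw_data` volume, apply SCD Type 2 versioning, and merge the nominal and index-linked bond tables into two bronze Delta tables that keep full history. Processed files are then moved to `raw_data/archive/`.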

In [0]:
# Install project requirements
# openpyxl is the engine pandas.read_excel uses to read the .xlsx source workbooks.
# TODO(review): pin a version (e.g. openpyxl==X.Y.Z) so reruns are reproducible.
%pip install openpyxl

In [0]:
# Import libraries
import pandas as pd
import datetime
from datetime import datetime
from pyspark.sql.functions import lit
from delta.tables import DeltaTable
import os
import re

In [0]:
# Define path for input data file
source_directory = '/Volumes/workspace/riskfree_bronze/raw_data/'

# Candidate inputs are the regular files directly inside the raw-data folder;
# sub-directories (such as archive/) are not files and are therefore skipped.
source_file_names = list(
    filter(
        lambda name: os.path.isfile(os.path.join(source_directory, name)),
        os.listdir(source_directory),
    )
)
source_file_names

In [0]:
# Keep only NZDM "on issue" workbooks (govtbonds-onissue-YYYY-mm-dd.xlsx).
# Sorting by name puts them in date order (ISO dates sort lexicographically), so
# SCD Type 2 versions are applied oldest-first; os.listdir order is arbitrary.
source_file_names = sorted(
    file
    for file in source_file_names
    if file.startswith('govtbonds-onissue-') and file.endswith('.xlsx')
)

# Stop the notebook cleanly when there is nothing to ingest.
if not source_file_names:
    dbutils.notebook.exit("No valid source files found.")
else:
    print(source_file_names)

In [0]:
# `datetime` here is the class bound by `from datetime import datetime`, which
# shadows the module imported just before it, so `datetime.datetime.now()` would
# raise AttributeError. Call `now()` on the class directly.
# NOTE(review): this is naive local time; confirm the cluster time zone is intended.
ingestion_timestamp = datetime.now()
print('Ingestion timestamp:', ingestion_timestamp)

Ingest nominal and index-linked bond data into the bronze tables (SCD Type 2 merge)

In [0]:
def clean_column_name(col):
    """Normalise a spreadsheet header into a snake_case column name.

    Drops the characters * ( ) + $, trims surrounding whitespace, turns spaces
    and hyphens into underscores, lower-cases the result and collapses runs of
    underscores, e.g. 'Total Amt Outstanding ($m)' -> 'total_amt_outstanding_m'.
    """
    stripped = re.sub(r'[*()+$]', '', col).strip()
    underscored = stripped.translate(str.maketrans({' ': '_', '-': '_'}))
    return re.sub(r'_+', '_', underscored.lower())

In [0]:
# Parse every source workbook and SCD Type 2-merge its two tables into bronze:
#   * nominal bonds      -> NOMINALS_TABLE, keyed by loan_prefix
#   * index-linked bonds -> LINKED_TABLE,   keyed by maturity

NOMINALS_TABLE = 'workspace.riskfree_bronze.nzdm_govtbonds_onissue'
LINKED_TABLE = 'workspace.riskfree_bronze.nzdm_govtbonds_onissue_linked'
SHEET_NAME = 'Sheet1'
PUBLISHED_PREFIX = 'Published: '
AS_OF_PREFIX = 'New Zealand Government Securities Currently on Issue as at: '

# Value columns compared (NULL-safely) to decide whether a bond has a new version.
NOMINALS_TRACKED_COLUMNS = [
    'total_amt_outstanding_incl_rb_eqc_sresl_m',
    'rbnz_m',
    'eqc_m',
    'sresl_m',
    'market_bonds_m',
    'coupon',
]
LINKED_TRACKED_COLUMNS = [
    'kt_1',
    'p',
    'kt',
    'face_value',
    'indexation_value',
    'current_principal',
]


def _parse_sheet_date(label, prefix):
    """Remove `prefix` from `label`, parse the remaining '%d %B %Y' date and return it as 'YYYY-mm-dd'."""
    return datetime.strptime(label.replace(prefix, ''), '%d %B %Y').strftime('%Y-%m-%d')


def _extract_table(sheet, header_label):
    """Slice one table out of the raw sheet.

    The table starts at the first row whose column-0 value equals `header_label`.
    That row supplies the column names, normalised with clean_column_name. The
    table ends before the next blank column-0 cell, or at the end of the sheet.

    Raises:
        ValueError: if no row carries `header_label` in column 0.
    """
    header_rows = sheet[sheet.iloc[:, 0] == header_label].index
    if len(header_rows) == 0:
        raise ValueError(f"Table header {header_label!r} not found in column 0")
    start_row = header_rows[0]
    is_blank = sheet.iloc[start_row:, 0].isnull()
    # idxmax() of an all-False mask returns the first label, which would silently
    # produce an empty table; fall back to the end of the sheet instead.
    end_row = is_blank.idxmax() if is_blank.any() else len(sheet)
    table = sheet.iloc[start_row:end_row].reset_index(drop=True)
    table.columns = [clean_column_name(name) for name in table.iloc[0]]
    return table.iloc[1:].reset_index(drop=True)


def _to_spark_with_scd2(table, publish_date, as_of_date, source_file, ingested_at):
    """Add lineage columns to a pandas table and return it as a Spark DataFrame.

    The result carries SCD2 fields describing a new current version that is
    effective from `ingested_at` and has no end date yet.
    """
    with_lineage = table.assign(
        publish_date=publish_date,
        as_of_date=as_of_date,
        source_file_name=source_file,
        ingestion_timestamp=ingested_at,
    )
    return (
        spark.createDataFrame(with_lineage)
        .withColumn("effective_start", lit(ingested_at))
        .withColumn("effective_end", lit(None).cast("timestamp"))
        .withColumn("is_current", lit(True))
    )


def _scd2_merge(updates, table_name, key_column, tracked_columns):
    """SCD Type 2 merge of the Spark DataFrame `updates` into Delta table `table_name`.

    For each `key_column` value in `updates`:
      * no current row in the table -> the update row is inserted as current;
      * a current row whose tracked columns differ (NULL-safe) -> that row is
        closed (effective_end = update's effective_start, is_current = false)
        and the update row is inserted as the new current version;
      * a current row with identical tracked columns -> nothing changes.
    Only source columns that exist in the target table are inserted.
    """
    delta_table = DeltaTable.forName(spark, table_name)
    changed = " OR ".join(f"NOT (t.`{c}` <=> s.`{c}`)" for c in tracked_columns)

    # Update rows that replace an existing, different current version. They are
    # re-emitted with a NULL merge key so they never match and are inserted, while
    # the keyed copy of every update row (below) closes the old version.
    current_rows = delta_table.toDF().where("is_current = true")
    new_versions = (
        updates.alias("s")
        .join(current_rows.alias("t"), updates[key_column] == current_rows[key_column])
        .where(changed)
        .select("s.*")
    )
    key_type = updates.schema[key_column].dataType.simpleString()
    staged = new_versions.selectExpr(f"CAST(NULL AS {key_type}) AS _merge_key", "*").unionByName(
        updates.selectExpr(f"`{key_column}` AS _merge_key", "*")
    )

    # Insert explicit columns so the helper _merge_key is never written.
    target_columns = set(delta_table.toDF().columns)
    insert_values = {c: f"s.`{c}`" for c in updates.columns if c in target_columns}

    (
        delta_table.alias("t")
        .merge(staged.alias("s"), f"t.`{key_column}` = s._merge_key AND t.is_current = true")
        .whenMatchedUpdate(
            condition=changed,
            set={"effective_end": "s.effective_start", "is_current": "false"},
        )
        .whenNotMatchedInsert(values=insert_values)
        .execute()
    )


# Create the Delta tables once; CREATE TABLE IF NOT EXISTS is loop-invariant.
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {NOMINALS_TABLE} (
    date_first_issued STRING,
    loan_prefix STRING,
    coupon STRING,
    maturity DATE,
    total_amt_outstanding_incl_rb_eqc_sresl_m DOUBLE,
    rbnz_m DOUBLE,
    eqc_m DOUBLE,
    sresl_m DOUBLE,
    market_bonds_m DOUBLE,
    publish_date STRING,
    as_of_date DATE,
    source_file_name STRING,
    ingestion_timestamp TIMESTAMP,
    effective_start TIMESTAMP,
    effective_end TIMESTAMP,
    is_current BOOLEAN
) USING DELTA
""")

spark.sql(f"""
CREATE TABLE IF NOT EXISTS {LINKED_TABLE} (
    maturity DATE,
    last_coupon DATE,
    next_coupon DATE,
    kt_1 DOUBLE,
    p DOUBLE,
    kt DOUBLE,
    face_value DOUBLE,
    indexation_value DOUBLE,
    current_principal DOUBLE,
    publish_date STRING,
    as_of_date DATE,
    source_file_name STRING,
    ingestion_timestamp TIMESTAMP,
    effective_start TIMESTAMP,
    effective_end TIMESTAMP,
    is_current BOOLEAN
) USING DELTA
""")

# Files are merged oldest-first (ISO date in the name), since SCD2 versions
# must be applied in time order.
# NOTE(review): ingestion_timestamp is shared by every file in a run, so when
# several files are processed together the intermediate versions get zero-length
# validity windows (effective_start == effective_end).
# TODO(review): bonds that drop off the list keep is_current = true; a
# whenNotMatchedBySource clause could close them if that is desired.
for file in sorted(source_file_names):
    df = pd.read_excel(f"{source_directory}{file}", sheet_name=SHEET_NAME)

    # The sheet title is read as the first column header and carries the as-of
    # date; the first data cell carries the publish date.
    publish_date = _parse_sheet_date(df.iloc[0, 0], PUBLISHED_PREFIX)
    as_of_date = _parse_sheet_date(df.columns[0], AS_OF_PREFIX)

    ### Nominal bonds ###
    df_nominals = _extract_table(df, 'Date first Issued')
    # coupon is declared STRING in the target table.
    df_nominals["coupon"] = df_nominals["coupon"].astype(str)
    spark_df_nominals = _to_spark_with_scd2(
        df_nominals, publish_date, as_of_date, file, ingestion_timestamp
    )
    _scd2_merge(spark_df_nominals, NOMINALS_TABLE, "loan_prefix", NOMINALS_TRACKED_COLUMNS)
    print(f"{NOMINALS_TABLE} <- {file}")

    ### Index-linked bonds ###
    df_linked = _extract_table(df, 'Maturity')
    spark_df_linked = _to_spark_with_scd2(
        df_linked, publish_date, as_of_date, file, ingestion_timestamp
    )
    _scd2_merge(spark_df_linked, LINKED_TABLE, "maturity", LINKED_TRACKED_COLUMNS)
    print(f"{LINKED_TABLE} <- {file}")

Move the processed .xlsx files into the `archive` folder (file names, which already contain the date, are kept unchanged)

In [0]:
# Move every processed workbook into raw_data/archive/, keeping its file name
# (which already carries a date). Deriving the archive folder from
# source_directory keeps the two locations in sync if the input path changes.
archive_directory = os.path.join(source_directory, 'archive')

for file in source_file_names:
    dbutils.fs.mv(
        os.path.join(source_directory, file),
        os.path.join(archive_directory, file),
    )