# Main ETL for Loan Level Data

## Initial setup

In [1]:
import glob
import sys
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import DateType, StringType, DoubleType, BooleanType, TimestampType
from functools import reduce
import pandas as pd

# Spark session shared by every cell below; "local[*]" runs Spark locally.
SPARK = SparkSession.builder.master("local[*]").getOrCreate()
# Directory the ASSET GOLD job reads from (the silver layer output).
GOLD_SOURCE_DIR = "../data/output/silver"
# Path to a macro features pickle (judging by its name); it is not referenced
# in the visible part of this notebook.
QNDL_PKL = "../data/MacrodataFiles11Oct2022/macro_features.pkl"

### ASSET GOLD

In [2]:
def set_job_params():
    """
    Build the configuration used by the ASSET GOLD job.

    Only the source directory is configured: no DATE_COLUMNS or
    ASSET_COLUMNS typing maps are part of the returned dictionary.

    :return config: dictionary with the single key "SOURCE_DIR".
    """
    return {"SOURCE_DIR": GOLD_SOURCE_DIR}


# Build the job configuration, then load every asset table from the
# "assets" folder below the configured source directory.
run_props = set_job_params()
date_table = SPARK.read.parquet(f'{run_props["SOURCE_DIR"]}/assets/date_table.parquet')
financial_info_table = SPARK.read.parquet(f'{run_props["SOURCE_DIR"]}/assets/financial_info_table.parquet')
interest_rate_table = SPARK.read.parquet(f'{run_props["SOURCE_DIR"]}/assets/interest_rate_table.parquet')
loan_info_table = SPARK.read.parquet(f'{run_props["SOURCE_DIR"]}/assets/loan_info_table.parquet')
obligor_info_table = SPARK.read.parquet(f'{run_props["SOURCE_DIR"]}/assets/obligor_info_table.parquet')
performance_info_table = SPARK.read.parquet(f'{run_props["SOURCE_DIR"]}/assets/performance_info_table.parquet')


In [4]:
# Print up to 100 rows of the date dimension for a visual check.
date_table.show(n=100)

+----------+----------+----+-----+-------+---+---+
|  date_col| unix_date|year|month|quarter|WoY|day|
+----------+----------+----+-----+-------+---+---+
|2016-03-01|1456790400|2016|    3|      1|  9|  1|
|2013-01-22|1358812800|2013|    1|      1|  4| 22|
|2007-04-20|1177027200|2007|    4|      2| 16| 20|
|1999-02-01| 917827200|1999|    2|      1|  5|  1|
|2013-09-09|1378684800|2013|    9|      3| 37|  9|
|2019-11-01|1572566400|2019|   11|      4| 44|  1|
|2030-10-01|1917043200|2030|   10|      4| 40|  1|
|2013-09-19|1379548800|2013|    9|      3| 38| 19|
|2018-09-01|1535760000|2018|    9|      3| 35|  1|
|2008-02-01|1201824000|2008|    2|      1|  5|  1|
|2014-08-01|1406851200|2014|    8|      3| 31|  1|
|2006-12-01|1164931200|2006|   12|      4| 48|  1|
|2013-02-02|1359763200|2013|    2|      1|  5|  2|
|2010-12-01|1291161600|2010|   12|      4| 48|  1|
|2018-11-01|1541030400|2018|   11|      4| 44|  1|
|2033-10-01|2011737600|2033|   10|      4| 39|  1|
|2013-09-12|1378944000|2013|   

In [5]:
# Print up to 100 rows of the obligor table for a visual check.
obligor_info_table.show(n=100)

+--------------------+----------+---+---------------+----+----+--------------+----------+----+----+----+-----+----+----+----+----+----+-----+----+----+----+----+----+-----+----+----+----+----+----+----+----+----+----+----+----+----+-----+----+----+----+----+-----+
|             ed_code|       AS1|AS2|            AS3| AS4| AS5|           AS6|       AS7| AS8|AS15|AS16| AS17|AS18|AS19|AS20|AS21|AS22| AS23|AS24|AS25|AS26|AS27|AS28| AS29|AS30|AS31|AS32|AS33|AS34|AS35|AS36|AS37|AS38|AS39|AS40|AS41| AS42|AS43|AS44|AS45|year|month|
+--------------------+----------+---+---------------+----+----+--------------+----------+----+----+----+-----+----+----+----+----+----+-----+----+----+----+----+----+-----+----+----+----+----+----+----+----+----+----+----+----+----+-----+----+----+----+----+-----+
|SMESES00017610032...|1356912000|412|000807331101532|0081|0081|BANC SABADELL |002160545 |null|  ES| 30 |ES620|   2|null|null|null|null|false|null|   1|   2|null|null|false|null|null|null|null|null|null|nul

### COLLATERAL SILVER

In [3]:
def set_job_params():
    """
    Build the configuration used by the COLLATERAL SILVER job.

    NOTE(review): SILVER_SOURCE_DIR is not defined in the visible part of
    this notebook - confirm it is set before this cell runs.

    :return config: dictionary with the source directory, the names of the
        date columns and the expected data type of every collateral column.
    """
    date_columns = ["CS11", "CS12", "CS22"]
    collateral_columns = {
        "CS1": StringType(),
        "CS2": StringType(),
        "CS3": StringType(),
        "CS4": DoubleType(),
        "CS5": DoubleType(),
        "CS6": StringType(),
        "CS7": BooleanType(),
        "CS8": BooleanType(),
        "CS9": BooleanType(),
        "CS10": DoubleType(),
        "CS11": DateType(),
        "CS12": DateType(),
        "CS13": StringType(),
        "CS14": StringType(),
        "CS15": DoubleType(),
        "CS16": StringType(),
        "CS17": StringType(),
        "CS18": DoubleType(),
        "CS19": DoubleType(),
        "CS20": StringType(),
        "CS21": DoubleType(),
        "CS22": DateType(),
        "CS23": StringType(),
        "CS24": StringType(),
        "CS25": StringType(),
        "CS26": StringType(),
        "CS27": StringType(),
        "CS28": DoubleType(),
    }
    return {
        "SOURCE_DIR": SILVER_SOURCE_DIR,
        "DATE_COLUMNS": date_columns,
        "COLLATERAL_COLUMNS": collateral_columns,
    }

def replace_no_data(df):
    """
    Replace ND values inside the dataframe with nulls.

    Every value that starts with "ND" becomes null; all other values are
    kept unchanged. All columns are rewritten in a single ``select`` rather
    than one ``withColumn`` call per column, so the query plan holds one
    projection instead of one per column.

    TODO: ND are associated with labels that explain why the value is missing.
          Should handle this information better in future releases.
    :param df: Spark dataframe to clean.
    :return df: Spark dataframe without ND values (same columns, same order).
    """
    cleaned_columns = [
        F.when(F.col(col_name).startswith("ND"), None)
        .otherwise(F.col(col_name))
        .alias(col_name)
        for col_name in df.columns
    ]
    return df.select(*cleaned_columns)


def replace_bool_data(df):
    """
    Replace Y/N with "True"/"False" string flags in the dataframe.

    The replacement is applied to every column: a value equal to "Y"
    becomes "True", a value equal to "N" becomes "False", anything else is
    kept. All columns are rewritten in a single ``select`` rather than one
    ``withColumn`` call per column, so the query plan holds one projection
    instead of one per column.

    :param df: Spark dataframe to clean.
    :return df: Spark dataframe without Y/N values (same columns, same order).
    """
    flag_columns = [
        F.when(F.col(col_name) == "Y", "True")
        .when(F.col(col_name) == "N", "False")
        .otherwise(F.col(col_name))
        .alias(col_name)
        for col_name in df.columns
    ]
    return df.select(*flag_columns)


def cast_to_datatype(df, columns):
    """
    Cast data to the respective datatype.

    Boolean columns become true when the value contains "True", date
    columns go through ``F.to_date`` and double columns are cast to double
    and rounded to 2 decimals. Columns with any other type, or not listed
    in ``columns``, are passed through untouched.

    Configured columns that are missing from the dataframe are ignored
    instead of raising, matching the ``in df.columns`` checks done by the
    other helpers of this job. The casts are applied in one ``select`` so
    the query plan holds a single projection.

    :param df: Spark dataframe with the cleaned string data.
    :param columns: collection of column names and respective data types.
    :return df: Spark dataframe with correct values (same columns, same order).
    """

    def _converted(col_name):
        # Unlisted columns yield None here and fall through to the passthrough.
        data_type = columns.get(col_name)
        source = F.col(col_name)
        if data_type == BooleanType():
            return source.contains("True").alias(col_name)
        if data_type == DateType():
            return F.to_date(source).alias(col_name)
        if data_type == DoubleType():
            return F.round(source.cast(DoubleType()), 2).alias(col_name)
        return source

    return df.select(*[_converted(col_name) for col_name in df.columns])

def process_dates(df, date_cols_list):
    """
    Extract dates dimension from bronze Spark dataframe.

    The values of all available date columns are stacked into a single
    "date_col" column and de-duplicated, then the calendar attributes are
    derived from it. Null dates are dropped first: without the filter a
    single null in any source column would leave an all-null row in the
    dimension.

    :param df: Spark bronze dataframe.
    :param date_cols_list: list of date columns; names missing from the
        dataframe are ignored.
    :return new_df: silver type Spark dataframe with the columns date_col,
        unix_date, year, month, quarter, WoY and day.
    """
    date_cols = [c for c in date_cols_list if c in df.columns]
    date_col = F.col("date_col")

    new_df = (
        df.select(F.explode(F.array(date_cols)).alias("date_col"))
        .filter(date_col.isNotNull())
        .dropDuplicates()
        .select(
            date_col,
            F.unix_timestamp(date_col).alias("unix_date"),
            F.year(date_col).alias("year"),
            F.month(date_col).alias("month"),
            F.quarter(date_col).alias("quarter"),
            F.weekofyear(date_col).alias("WoY"),
            F.dayofmonth(date_col).alias("day"),
        )
    )
    return new_df


def process_collateral_info(df):
    """
    Build the collateral info table from the cleaned dataframe.

    Duplicate rows are removed and the CS11, CS12 and CS22 columns are
    replaced by their unix timestamp.

    :param df: Spark bronze dataframe.
    :return new_df: silver type Spark dataframe.
    """
    new_df = df.dropDuplicates()
    for date_col in ("CS11", "CS12", "CS22"):
        as_timestamp = F.to_timestamp(F.col(date_col), "yyyy-MM")
        new_df = new_df.withColumn(date_col, F.unix_timestamp(as_timestamp))
    return new_df


# COLLATERAL SILVER job: read the collateral data, clean and type it, then
# write the info table and the date dimension.
print("Start COLLATERAL SILVER job.")
run_props = set_job_params()
# Keep only the rows flagged iscurrent == 1 and drop the valid_from, valid_to,
# checksum and iscurrent columns (presumably SCD2 bookkeeping - confirm).
bronze_df = SPARK.read.parquet(
    f'{run_props["SOURCE_DIR"]}/collaterals.parquet'
).filter("iscurrent == 1").drop("valid_from", "valid_to", "checksum", "iscurrent")
print("Remove ND values.")
tmp_df1 = replace_no_data(bronze_df)
print("Replace Y/N with boolean flags.")
tmp_df2 = replace_bool_data(tmp_df1)
print("Cast data to correct types.")
cleaned_df = cast_to_datatype(tmp_df2, run_props["COLLATERAL_COLUMNS"])
print("Generate collateral info dataframe")
info_df = process_collateral_info(cleaned_df)
print("Generate time dataframe")
date_df = process_dates(cleaned_df, run_props["DATE_COLUMNS"])

print("Write dataframe")

# The info table is partitioned by year and month, so both columns must be
# present in the source data; existing output is overwritten.
(
    info_df.write
    .partitionBy("year", "month")
    .mode("overwrite")
    .parquet("../data/output/silver/collaterals/info_table.parquet")
)
# The date dimension is written unpartitioned.
(
    date_df.write
    .mode("overwrite")
    .parquet("../data/output/silver/collaterals/date_table.parquet")
)

Start COLLATERAL SILVER job.
Remove ND values.
Replace Y/N with boolean flags.
Cast data to correct types.
Generate collateral info dataframe
Generate time dataframe
Write dataframe


### BOND INFO SILVER

In [4]:
def set_job_params():
    """
    Build the configuration used by the BOND INFO SILVER job.

    NOTE(review): SILVER_SOURCE_DIR is not defined in the visible part of
    this notebook - confirm it is set before this cell runs.

    :return config: dictionary with the source directory, the names of the
        date columns and the expected data type of every bond column.
    """
    date_columns = ["BS1", "BS27", "BS28", "BS38", "BS39"]
    bond_columns = {
        "BS1": DateType(),
        "BS2": StringType(),
        "BS3": DoubleType(),
        "BS4": DoubleType(),
        "BS5": BooleanType(),
        "BS6": StringType(),
        "BS11": DoubleType(),
        "BS12": BooleanType(),
        "BS13": DoubleType(),
        "BS19": StringType(),
        "BS20": StringType(),
        "BS25": StringType(),
        "BS26": StringType(),
        "BS27": DateType(),
        "BS28": DateType(),
        "BS29": StringType(),
        "BS30": DoubleType(),
        "BS31": DoubleType(),
        "BS32": StringType(),
        "BS33": DoubleType(),
        "BS34": DoubleType(),
        "BS35": DoubleType(),
        "BS36": DoubleType(),
        "BS37": DoubleType(),
        "BS38": DateType(),
        "BS39": DateType(),
    }
    return {
        "SOURCE_DIR": SILVER_SOURCE_DIR,
        "DATE_COLUMNS": date_columns,
        "BOND_COLUMNS": bond_columns,
    }

def replace_no_data(df):
    """
    Replace ND values inside the dataframe with nulls.

    Every value that starts with "ND" becomes null; all other values are
    kept unchanged. All columns are rewritten in a single ``select`` rather
    than one ``withColumn`` call per column, so the query plan holds one
    projection instead of one per column.

    TODO: ND are associated with labels that explain why the value is missing.
          Should handle this information better in future releases.
    :param df: Spark dataframe to clean.
    :return df: Spark dataframe without ND values (same columns, same order).
    """
    cleaned_columns = [
        F.when(F.col(col_name).startswith("ND"), None)
        .otherwise(F.col(col_name))
        .alias(col_name)
        for col_name in df.columns
    ]
    return df.select(*cleaned_columns)


def replace_bool_data(df):
    """
    Replace Y/N with "True"/"False" string flags in the dataframe.

    The replacement is applied to every column: a value equal to "Y"
    becomes "True", a value equal to "N" becomes "False", anything else is
    kept. All columns are rewritten in a single ``select`` rather than one
    ``withColumn`` call per column, so the query plan holds one projection
    instead of one per column.

    :param df: Spark dataframe to clean.
    :return df: Spark dataframe without Y/N values (same columns, same order).
    """
    flag_columns = [
        F.when(F.col(col_name) == "Y", "True")
        .when(F.col(col_name) == "N", "False")
        .otherwise(F.col(col_name))
        .alias(col_name)
        for col_name in df.columns
    ]
    return df.select(*flag_columns)


def cast_to_datatype(df, columns):
    """
    Cast data to the respective datatype.

    Boolean columns become true when the value contains "True", date
    columns go through ``F.to_date`` and double columns are cast to double
    and rounded to 2 decimals. Columns with any other type, or not listed
    in ``columns``, are passed through untouched.

    Configured columns that are missing from the dataframe are ignored
    instead of raising, matching the ``in df.columns`` checks done by the
    other helpers of this job. The casts are applied in one ``select`` so
    the query plan holds a single projection.

    :param df: Spark dataframe with the cleaned string data.
    :param columns: collection of column names and respective data types.
    :return df: Spark dataframe with correct values (same columns, same order).
    """

    def _converted(col_name):
        # Unlisted columns yield None here and fall through to the passthrough.
        data_type = columns.get(col_name)
        source = F.col(col_name)
        if data_type == BooleanType():
            return source.contains("True").alias(col_name)
        if data_type == DateType():
            return F.to_date(source).alias(col_name)
        if data_type == DoubleType():
            return F.round(source.cast(DoubleType()), 2).alias(col_name)
        return source

    return df.select(*[_converted(col_name) for col_name in df.columns])

def get_columns_collection(df):
    """
    Group the dataframe columns by topic.

    Every topic starts with the ed_code, year and month columns, followed
    by the BS fields of its number range that exist in the dataframe.

    :param df: Spark dataframe with bond info data.
    :return cols_dict: collection of columns labelled by topic.
    """
    key_columns = ["ed_code", "year", "month"]
    available = df.columns

    def _bs_fields(first, stop):
        # BS fields numbered first..stop-1 that are present in the dataframe.
        candidates = (f"BS{number}" for number in range(first, stop))
        return [name for name in candidates if name in available]

    return {
        "bond_info": key_columns + _bs_fields(1, 11),
        "collateral_info": key_columns + _bs_fields(11, 19),
        "contact_info": key_columns + _bs_fields(19, 25),
        "tranche_info": key_columns + _bs_fields(25, 40),
    }


def process_dates(df, date_cols_list):
    """
    Extract dates dimension from bronze Spark dataframe.

    The values of all available date columns are stacked into a single
    "date_col" column and de-duplicated, then the calendar attributes are
    derived from it. Null dates are dropped first: without the filter a
    single null in any source column would leave an all-null row in the
    dimension.

    :param df: Spark bronze dataframe.
    :param date_cols_list: list of date columns; names missing from the
        dataframe are ignored.
    :return new_df: silver type Spark dataframe with the columns date_col,
        unix_date, year, month, quarter, WoY and day.
    """
    date_cols = [c for c in date_cols_list if c in df.columns]
    date_col = F.col("date_col")

    new_df = (
        df.select(F.explode(F.array(date_cols)).alias("date_col"))
        .filter(date_col.isNotNull())
        .dropDuplicates()
        .select(
            date_col,
            F.unix_timestamp(date_col).alias("unix_date"),
            F.year(date_col).alias("year"),
            F.month(date_col).alias("month"),
            F.quarter(date_col).alias("quarter"),
            F.weekofyear(date_col).alias("WoY"),
            F.dayofmonth(date_col).alias("day"),
        )
    )
    return new_df


def process_bond_info(df, cols_dict):
    """
    Build the bond info table from the cleaned dataframe.

    Selects the "bond_info" columns, removes duplicate rows and replaces
    BS1 by its unix timestamp.

    :param df: Spark bronze dataframe.
    :param cols_dict: collection of columns labelled by their topic.
    :return new_df: silver type Spark dataframe.
    """
    unique_rows = df.select(cols_dict["bond_info"]).dropDuplicates()
    bs1_epoch = F.unix_timestamp(F.to_timestamp(F.col("BS1"), "yyyy-MM-dd"))
    return unique_rows.withColumn("BS1", bs1_epoch)


def process_collateral_info(df, cols_dict):
    """
    Build the collateral info table from the cleaned dataframe.

    Selects BS1, BS2 and the "collateral_info" columns, removes duplicate
    rows and replaces BS1 by its unix timestamp.

    :param df: Spark bronze dataframe.
    :param cols_dict: collection of columns labelled by their topic.
    :return new_df: silver type Spark dataframe.
    """
    wanted_columns = ["BS1", "BS2"] + cols_dict["collateral_info"]
    unique_rows = df.select(wanted_columns).dropDuplicates()
    bs1_epoch = F.unix_timestamp(F.to_timestamp(F.col("BS1"), "yyyy-MM-dd"))
    return unique_rows.withColumn("BS1", bs1_epoch)


def process_contact_info(df, cols_dict):
    """
    Build the contact info table from the cleaned dataframe.

    Selects BS1, BS2 and the "contact_info" columns, removes duplicate
    rows and replaces BS1 by its unix timestamp.

    :param df: Spark bronze dataframe.
    :param cols_dict: collection of columns labelled by their topic.
    :return new_df: silver type Spark dataframe.
    """
    wanted_columns = ["BS1", "BS2"] + cols_dict["contact_info"]
    unique_rows = df.select(wanted_columns).dropDuplicates()
    bs1_epoch = F.unix_timestamp(F.to_timestamp(F.col("BS1"), "yyyy-MM-dd"))
    return unique_rows.withColumn("BS1", bs1_epoch)

def process_tranche_info(df, cols_dict):
    """
    Build the tranche info table from the cleaned dataframe.

    Selects BS1, BS2 and the "tranche_info" columns, removes duplicate rows
    and replaces the BS1, BS27, BS28, BS38 and BS39 columns by their unix
    timestamp.

    :param df: Spark bronze dataframe.
    :param cols_dict: collection of columns labelled by their topic.
    :return new_df: silver type Spark dataframe.
    """
    wanted_columns = ["BS1", "BS2"] + cols_dict["tranche_info"]
    new_df = df.select(wanted_columns).dropDuplicates()
    for date_col in ("BS1", "BS27", "BS28", "BS38", "BS39"):
        as_timestamp = F.to_timestamp(F.col(date_col), "yyyy-MM-dd")
        new_df = new_df.withColumn(date_col, F.unix_timestamp(as_timestamp))
    return new_df


# BOND INFO SILVER job: read the bond data, clean and type it, split it by
# topic and write one table per topic plus the date dimension.
print("Start BOND INFO SILVER job.")
run_props = set_job_params()
# Keep only the rows flagged iscurrent == 1 and drop the valid_from, valid_to,
# checksum and iscurrent columns (presumably SCD2 bookkeeping - confirm).
bronze_df = SPARK.read.parquet(
    f'{run_props["SOURCE_DIR"]}/bond_info.parquet'
).filter("iscurrent == 1").drop("valid_from", "valid_to", "checksum", "iscurrent")
print("Remove ND values.")
tmp_df1 = replace_no_data(bronze_df)
print("Replace Y/N with boolean flags.")
tmp_df2 = replace_bool_data(tmp_df1)
print("Cast data to correct types.")
cleaned_df = cast_to_datatype(tmp_df2, run_props["BOND_COLUMNS"])
# Topic -> column names, restricted to the columns present in cleaned_df.
bond_info_columns = get_columns_collection(cleaned_df)
print("Generate time dataframe")
date_df = process_dates(cleaned_df, run_props["DATE_COLUMNS"])
print("Generate bond info dataframe")
info_df = process_bond_info(cleaned_df, bond_info_columns)
print("Generate collateral info dataframe")
collateral_df = process_collateral_info(cleaned_df, bond_info_columns)
print("Generate contact info dataframe")
contact_df = process_contact_info(cleaned_df, bond_info_columns)
print("Generate tranche info dataframe")
tranche_df = process_tranche_info(cleaned_df, bond_info_columns)

print("Write dataframe")

# The date dimension is written unpartitioned; every topic table below is
# partitioned by year and month. Existing output is overwritten.
(
    date_df.write
    .mode("overwrite")
    .parquet("../data/output/silver/bond_info/date_table.parquet")
)
(
    info_df.write
    .partitionBy("year", "month")
    .mode("overwrite")
    .parquet("../data/output/silver/bond_info/info_table.parquet")
)
(
    collateral_df.write
    .partitionBy("year", "month")
    .mode("overwrite")
    .parquet("../data/output/silver/bond_info/collaterals_table.parquet")
)
(
    contact_df.write
    .partitionBy("year", "month")
    .mode("overwrite")
    .parquet("../data/output/silver/bond_info/contacts_table.parquet")
)
# NOTE(review): "trache" in the path below looks like a typo of "tranche";
# confirm no reader depends on this name before renaming it.
(
    tranche_df.write
    .partitionBy("year", "month")
    .mode("overwrite")
    .parquet("../data/output/silver/bond_info/trache_info_table.parquet")
)


Start BOND INFO SILVER job.
Remove ND values.
Replace Y/N with boolean flags.
Cast data to correct types.
Generate time dataframe
Generate bond info dataframe
Generate collateral info dataframe
Generate contact info dataframe
Generate tranche info dataframe
Write dataframe


### AMORTISATION SILVER

In [15]:
def set_job_params():
    """
    Build the configuration used by the AMORTISATION SILVER job.

    The column map holds AS3 as a string and the fields AS150 to AS1349
    alternating between double (even number) and date (odd number).

    NOTE(review): SILVER_SOURCE_DIR is not defined in the visible part of
    this notebook - confirm it is set before this cell runs.

    :return config: dictionary with the source directory and the expected
        data type of every amortisation column.
    """
    amortisation_columns = {
        "AS3": StringType(),
        "AS150": DoubleType(),
        "AS151": DateType(),
        "AS1348": DoubleType(),
        "AS1349": DateType(),
    }
    # Appended after the five explicit entries to keep the key order.
    amortisation_columns.update(
        (f"AS{i}", DoubleType() if i % 2 == 0 else DateType())
        for i in range(152, 1348)
    )
    return {
        "SOURCE_DIR": SILVER_SOURCE_DIR,
        "AMORTISATION_COLUMNS": amortisation_columns,
    }


def _melt(df, id_vars, value_vars, var_name="FEATURE_NAME", value_name="FEATURE_VALUE"):
    """
    Convert DataFrame from wide to long format.

    Each column in ``value_vars`` yields one output row per input row,
    holding the column name in ``var_name`` and its value in ``value_name``;
    both are cast to string. The ``id_vars`` columns are carried over.
    """
    # Ref:https://stackoverflow.com/a/41673644
    # One struct<name, value> per melted column, gathered in a single array.
    name_value_structs = [
        F.struct(F.lit(column).alias(var_name), F.col(column).alias(value_name))
        for column in value_vars
    ]
    # Exploding the array produces one row per melted column.
    exploded = df.withColumn(
        "_vars_and_vals", F.explode(F.array(*name_value_structs))
    )
    melted_fields = [
        F.col("_vars_and_vals")[field].cast("string").alias(field)
        for field in (var_name, value_name)
    ]
    return exploded.select(*(id_vars + melted_fields))


def _paired_period_columns(double_columns, date_columns):
    """Pair each amount column "<prefix><n>" with the date column "<prefix><n+1>"."""
    available_dates = set(date_columns)
    pairs = []
    for double_col in double_columns:
        prefix = double_col.rstrip("0123456789")
        digits = double_col[len(prefix):]
        if not digits:
            # No numeric suffix, so no neighbouring date column can be derived.
            continue
        date_col = f"{prefix}{int(digits) + 1}"
        if date_col in available_dates:
            pairs.append((double_col, date_col))
    return pairs


def unpivot_dataframe(df, columns):
    """
    Convert dataframe from wide to long table.

    Each (amount, date) column pair of a row becomes one output row holding
    the two column names and their values as strings. Rows whose date value
    is null are dropped.

    Amounts and dates used to be melted separately and joined back on AS3
    only, which paired every date of a loan with every amount of that loan.
    They are now exploded together, one pair at a time, which also removes
    the need for a temporary unique suffix on AS3 (the suffix removal
    truncated any AS3 value containing an underscore).

    NOTE(review): the pairing assumes that the double column AS<n> and the
    date column AS<n+1> describe the same period, as suggested by the
    alternating double/date configuration starting at AS150 - confirm
    against the data template.

    :param df: raw Spark dataframe with AS3, ed_code, year and month columns.
    :param columns: data columns with respective datatype.
    :return new_df: unpivot Spark dataframe with the columns AS3,
        DATE_COLUMNS, DATE_VALUE, DOUBLE_COLUMNS, DOUBLE_VALUE, ed_code,
        year and month.
    :raises ValueError: if the dataframe has no (amount, date) column pair.
    """
    date_columns = [
        k for k, v in columns.items() if v == DateType() and k in df.columns
    ]
    double_columns = [
        k for k, v in columns.items() if v == DoubleType() and k in df.columns
    ]
    pairs = _paired_period_columns(double_columns, date_columns)
    if not pairs:
        raise ValueError("No amount/date column pair found in the dataframe.")

    # Values are cast to string so that every struct shares one type.
    periods = F.array(
        *(
            F.struct(
                F.lit(date_col).alias("DATE_COLUMNS"),
                F.col(date_col).cast("string").alias("DATE_VALUE"),
                F.lit(double_col).alias("DOUBLE_COLUMNS"),
                F.col(double_col).cast("string").alias("DOUBLE_VALUE"),
            )
            for double_col, date_col in pairs
        )
    )
    new_df = (
        df.select(
            "AS3", "ed_code", "year", "month", F.explode(periods).alias("_period")
        )
        .filter(F.col("_period.DATE_VALUE").isNotNull())
        .select(
            "AS3",
            F.col("_period.DATE_COLUMNS").alias("DATE_COLUMNS"),
            F.col("_period.DATE_VALUE").alias("DATE_VALUE"),
            F.col("_period.DOUBLE_COLUMNS").alias("DOUBLE_COLUMNS"),
            F.col("_period.DOUBLE_VALUE").alias("DOUBLE_VALUE"),
            "ed_code",
            "year",
            "month",
        )
    )
    return new_df

def cast_to_datatype(df, columns):
    """
    Cast data to the respective datatype.

    Date columns go through ``F.to_date`` and double columns are cast to
    double and rounded to 2 decimals. Columns with any other type, or not
    listed in ``columns``, are passed through untouched.

    Configured columns that are missing from the dataframe are ignored
    instead of raising, matching the ``in df.columns`` check done when the
    dataframe is unpivoted. The casts are applied in one ``select`` so the
    query plan holds a single projection rather than one per configured
    column.

    :param df: Spark dataframe with loan asset data.
    :param columns: collection of column names and respective data types.
    :return df: Spark dataframe with correct values (same columns, same order).
    """

    def _converted(col_name):
        # Unlisted columns yield None here and fall through to the passthrough.
        data_type = columns.get(col_name)
        source = F.col(col_name)
        if data_type == DateType():
            return F.to_date(source).alias(col_name)
        if data_type == DoubleType():
            return F.round(source.cast(DoubleType()), 2).alias(col_name)
        return source

    return df.select(*[_converted(col_name) for col_name in df.columns])

def process_dates(df):
    """
    Extract dates dimension from bronze Spark dataframe.

    The distinct DATE_VALUE values are renamed to "date_col" and the
    calendar attributes are derived from them. Null dates are dropped
    first: without the filter a single null DATE_VALUE would leave an
    all-null row in the dimension.

    :param df: Spark bronze dataframe with a DATE_VALUE column.
    :return new_df: silver type Spark dataframe with the columns date_col,
        unix_date, year, month, quarter, WoY and day.
    """
    date_col = F.col("date_col")
    new_df = (
        df.select(F.col("DATE_VALUE").alias("date_col"))
        .filter(date_col.isNotNull())
        .dropDuplicates()
        .select(
            date_col,
            F.unix_timestamp(date_col).alias("unix_date"),
            F.year(date_col).alias("year"),
            F.month(date_col).alias("month"),
            F.quarter(date_col).alias("quarter"),
            F.weekofyear(date_col).alias("WoY"),
            F.dayofmonth(date_col).alias("day"),
        )
    )
    return new_df

def process_info(df):
    """
    Build the amortisation info table from the unpivoted dataframe.

    The DATE_VALUE column is replaced by its unix timestamp; every other
    column is left as it is.

    :param df: Spark bronze dataframe.
    :return new_df: silver type Spark dataframe.
    """
    date_as_epoch = F.unix_timestamp(F.col("DATE_VALUE"))
    return df.withColumn("DATE_VALUE", date_as_epoch)


# AMORTISATION SILVER job: read the amortisation data, turn the wide table
# into a long one, type the values and write the info table and the date
# dimension.
print("Start AMORTISATION SILVER job.")
run_props = set_job_params()
# Keep only the rows flagged iscurrent == 1 and drop the valid_from, valid_to,
# checksum and iscurrent columns (presumably SCD2 bookkeeping - confirm).
bronze_df = SPARK.read.parquet(
    f'{run_props["SOURCE_DIR"]}/amortisation.parquet'
).filter("iscurrent == 1").drop("valid_from", "valid_to", "checksum", "iscurrent")
print("Cast data to correct types.")
tmp_df1 = unpivot_dataframe(bronze_df, run_props["AMORTISATION_COLUMNS"])
# The casts are applied after the unpivot, on the two value columns only:
# dates via to_date, amounts as doubles rounded to 2 decimals.
cleaned_df = (
    tmp_df1.withColumn("DATE_VALUE", F.to_date(F.col("DATE_VALUE")))
    .withColumn(
        "DOUBLE_VALUE", F.round(F.col("DOUBLE_VALUE").cast(DoubleType()), 2)
    )
)
print("Generate time dataframe")
date_df = process_dates(cleaned_df)
print("Generate info dataframe")
info_df = process_info(cleaned_df)
print("Write dataframe")

# The date dimension is written unpartitioned; the info table is partitioned
# by year and month. Existing output is overwritten.
(
    date_df.write
    .mode("overwrite")
    .parquet("../data/output/silver/amortisation/date_table.parquet")
)
(
    info_df.write
    .partitionBy("year", "month")
    .mode("overwrite")
    .parquet("../data/output/silver/amortisation/info_table.parquet")
)


Start AMORTISATION SILVER job.
Cast data to correct types.
Generate time dataframe
Generate info dataframe
Write dataframe
