In [None]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Initialize Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Step 1: Read the configuration table and filter for WORKSHEET = 'MarketR'
data_id = spark.sql("SELECT * FROM default.data_id WHERE WORKSHEET = 'MarketR'")

# Step 2: Read the 'MarketR' sheet from the unstructured Excel file.
# header=None keeps every sheet row as data, so DataFrame positions map 1:1 onto
# Excel rows/columns (R1C1 -> iat[0, 0]), which the RC_CODE lookup relies on.
# The UDF below reads the module-level name df_excel, so it must be defined here.
# excel_file_path = "/Volumes/workspace/default/input/MarketR.xlsx"  # for test
excel_file_path = dbutils.widgets.get("input_path")  # Parameter set by workflow
df_excel = pd.read_excel(excel_file_path, sheet_name='MarketR', header=None)

# Step 3: Create a function to extract values based on RC_CODE
def get_cell_value(rc_code, sheet=None):
    """
    Return the content of one Excel cell addressed by an RC code.

    Parameters:
    rc_code (str): Cell address of the form "R<row>C<column>" with 1-based
        indices, e.g. "R12C3" for row 12, column 3. Empty/None yields None.
    sheet (pandas.DataFrame, optional): Sheet to read from. It must have been
        read with header=None so that positional indices match Excel rows and
        columns. Defaults to the module-level df_excel.

    Returns:
    str or None: The cell content as a plain Python str (matching the
        StringType UDF this function is registered as), or None when rc_code
        is empty, the address lies outside the sheet, or the cell is empty.

    Raises:
    ValueError, IndexError: If rc_code is non-empty but not "R<int>C<int>".
    """
    if not rc_code:
        return None
    if sheet is None:
        sheet = df_excel

    # "R12C3" -> ["R12", "3"]; strip the leading "R" from the row part.
    parts = rc_code.split('C')
    row_num = int(parts[0][1:])
    col_num = int(parts[1])

    n_rows, n_cols = sheet.shape
    if not (0 <= row_num - 1 < n_rows and 0 <= col_num - 1 < n_cols):
        return None

    value = sheet.iat[row_num - 1, col_num - 1]
    # Empty Excel cells are read as NaN; report them as missing, not as "nan".
    if pd.isna(value):
        return None
    # .iat yields numpy scalars (e.g. numpy.float64) for numeric columns, which
    # Spark cannot unpickle from a Python UDF; a str also matches StringType.
    return str(value)

# Wrap the cell lookup as a Spark UDF; it yields string values
get_cell_value_udf = udf(get_cell_value, StringType())

# Step 4: attach the looked-up cell content as column 'VALUE'
data_id = data_id.withColumn(
    "VALUE",
    get_cell_value_udf(data_id["RC_CODE"]),
)

# Step 5: persist the enriched configuration, replacing any previous version
(
    data_id.write
    .mode("overwrite")
    .saveAsTable("default.data_id_updated")
)

%md
Prompt with which the following code was drafted:
The following table aggregation_tree_market contains the input for a market SCR calculation (without currency and concentration risk) in Solvency II in the form of an aggregation tree, where column _NODE_ID
contains all nodes of the tree, column _PARENT_NODE_ID contains their respective parent node, and column _AGGREGATION_METHOD_CD contains either the value "external", which means that the value is provided externally via a variable (column DATA_ID) in table "data_id_updated", or one of the following aggregation methods:
-  "sum": The value for this node is calculated as a sum of all child nodes
-  "max": The value for this node is calculated as the maximum of all child nodes
-  "correlated": The value for this node is calculated with the help of a correlation matrix. The name of the matrix to be used in the aggregation is given in column _MATRIX_ID. The values of all correlation matrices are stored in table default.correlation_matrix, which consists of the following columns: 
-- CORRELATION_MATRIX_ID: ID of correlation matrix, which is referenced in aggregation_tree_market._MATRIX_ID
-- VAR1_NM: ID of variable 1
-- VAR2_NM: ID of variable 2
-- CORRELATION_VALUE_NO: Correlation value
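The resulting aggregation logic for "correlated" is: $\sqrt{\sum_{i,j} \mathrm{Corr}_{i,j} \cdot \mathrm{Node}_i \cdot \mathrm{Node}_j}$, where $i$ and $j$ each run over all child nodes of the respective node (including the diagonal terms $i = j$).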
- "dnav": All child nodes of a node with aggregation method "dnav" are either assets or liabilities. This information is specified in column "BS_TYPE" with "asset" for assets and "liab" for liabilities. Each asset or liability has a scenario that is specified in column "SCENARIO": "BC" is the base case and "SH" is the shocked scenario. With this information, the aggregation logic for "dnav" is: (sum of all base case assets - sum of all base case liabilities) - (sum of all shocked assets - sum of all shocked liabilities).
- "max_scen": Does the same as "max" for now.

Can you please write a method, that takes the input "aggregation_tree_id" and does the following steps: 
1. Read in the tables "aggregation_tree" and "data_id_updated" as data frames.
2. Add the column "VALUE" to the data frame "aggregation_tree".
3. For the specified "aggregation_tree_id", read in all values for all nodes with "_AGGREGATION_METHOD_CD" = "external" from data frame "data_id_updated", where _NODE_ID = DATA_ID in "data_id_updated".
4. Aggregate all other values using their aggregation method, which is defined in column "_AGGREGATION_METHOD_CD".
5. Save the results in a new table in schema "default" with name "aggregation_tree_market_enriched".


In [None]:


import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sqrt, sum as spark_sum, max as spark_max, when

# Reuse the active Spark session (or start one) for the aggregation cell
spark = (
    SparkSession.builder
    .appName("MarketSCRAggregation")
    .getOrCreate()
)

import math


def _to_float(value):
    """
    Convert a raw table value to float.

    VALUE in default.data_id_updated is produced by a StringType UDF, so numbers
    usually arrive as strings such as "1234.5".

    Returns:
    float or None: None if the value is missing, non-numeric or NaN.
    """
    if value is None:
        return None
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return None if math.isnan(number) else number


def _lookup_correlation(correlations, matrix_id, var1, var2):
    """
    Return Corr(var1, var2) from correlation matrix matrix_id.

    The pair is looked up in both orders, so a matrix stored as a single triangle
    works. A missing diagonal entry defaults to 1.0; a missing off-diagonal entry
    defaults to 0.0 (the pair then contributes nothing).
    """
    corr = correlations.get((matrix_id, var1, var2))
    if corr is None:
        corr = correlations.get((matrix_id, var2, var1))
    if corr is None:
        corr = 1.0 if var1 == var2 else 0.0
    return corr


def _apply_aggregation(attrs, child_values, nodes, correlations):
    """
    Combine the child values of one node according to its aggregation method.

    Parameters:
    attrs (dict): The node's row from aggregation_tree_market.
    child_values (list): (child_node_id, value) pairs; None values are ignored,
        as Spark's sum/max ignore NULLs.
    nodes (dict): All node rows, keyed by _NODE_ID (for BS_TYPE / SCENARIO).
    correlations (dict): (matrix_id, var1, var2) -> correlation value.

    Returns:
    float or None: None if no child has a value or the method is unknown.
    """
    method = attrs.get("_AGGREGATION_METHOD_CD")
    valued = [(child_id, value) for child_id, value in child_values if value is not None]
    if not valued:
        return None

    if method == "sum":
        return math.fsum(value for _, value in valued)

    if method in ("max", "max_scen"):  # "max_scen" behaves like "max" for now
        return max(value for _, value in valued)

    if method == "correlated":
        matrix_id = attrs.get("_MATRIX_ID")
        # sqrt(sum_ij Corr_ij * V_i * V_j) over ALL ordered child pairs, i == j included.
        total = math.fsum(
            _lookup_correlation(correlations, matrix_id, id_i, id_j) * value_i * value_j
            for id_i, value_i in valued
            for id_j, value_j in valued
        )
        # A valid correlation matrix keeps total >= 0; clamp float rounding noise.
        return math.sqrt(max(total, 0.0))

    if method == "dnav":
        def scenario_sum(bs_type, scenario):
            # Sum over children of one balance-sheet side in one scenario (0.0 if none).
            return math.fsum(
                value
                for child_id, value in valued
                if nodes[child_id].get("BS_TYPE") == bs_type
                and nodes[child_id].get("SCENARIO") == scenario
            )

        base_nav = scenario_sum("asset", "BC") - scenario_sum("liab", "BC")
        shocked_nav = scenario_sum("asset", "SH") - scenario_sum("liab", "SH")
        return base_nav - shocked_nav

    return None


def _compute_node_values(nodes, external_values, correlations):
    """
    Evaluate every node of one aggregation tree, children before parents.

    Parameters:
    nodes (dict): _NODE_ID -> row dict of aggregation_tree_market.
    external_values (dict): DATA_ID -> raw VALUE from data_id_updated.
    correlations (dict): (matrix_id, var1, var2) -> correlation value.

    Returns:
    dict: _NODE_ID -> float or None.

    Raises:
    ValueError: If the _PARENT_NODE_ID links contain a cycle.
    """
    children = {}
    for node_id, attrs in nodes.items():
        children.setdefault(attrs.get("_PARENT_NODE_ID"), []).append(node_id)

    values = {}
    visiting = set()

    def evaluate(node_id):
        if node_id in values:
            return values[node_id]
        if node_id in visiting:
            raise ValueError(f"Cycle in aggregation tree at node {node_id!r}")
        visiting.add(node_id)
        attrs = nodes[node_id]
        if attrs.get("_AGGREGATION_METHOD_CD") == "external":
            result = _to_float(external_values.get(node_id))
        else:
            child_values = [(child_id, evaluate(child_id)) for child_id in children.get(node_id, [])]
            result = _apply_aggregation(attrs, child_values, nodes, correlations)
        visiting.discard(node_id)
        values[node_id] = result
        return result

    for node_id in nodes:
        evaluate(node_id)
    return values


def aggregate_tree(aggregation_tree_id):
    """
    Aggregates market SCR values based on the specified aggregation tree ID.

    Parameters:
    aggregation_tree_id (str): Value of AGGREGATION_TREE_ID selecting the tree
        rows in default.aggregation_tree_market.

    Steps:
    1. Reads the tree rows for aggregation_tree_id, the external inputs from
       default.data_id_updated (matched via _NODE_ID == DATA_ID) and the needed
       rows of default.correlation_matrix.
    2. Evaluates every node bottom-up (children before parents):
        - 'external': VALUE of the matching DATA_ID as float (None if missing
          or non-numeric).
        - 'sum' / 'max' / 'max_scen': sum / maximum of the child values.
        - 'correlated': sqrt(sum_ij Corr_ij * V_i * V_j) over all child pairs,
          using the matrix named in _MATRIX_ID.
        - 'dnav': (BC assets - BC liabilities) - (SH assets - SH liabilities),
          split by the children's BS_TYPE and SCENARIO columns.
       Child values of None are ignored; a node without any valued child gets None.
    3. Overwrites default.aggregation_tree_market_enriched with one row per
       node: _NODE_ID (original type) and VALUE (double).

    Raises:
    ValueError: If the tree's parent links contain a cycle.

    Example usage:
    aggregate_tree('MARKET_INT')
    """
    from pyspark.sql.types import DoubleType, StructField, StructType

    # DataFrame filter instead of an f-string SQL literal: a quote in the ID can
    # neither break nor alter the query.
    tree_df = spark.table("default.aggregation_tree_market").filter(
        col("AGGREGATION_TREE_ID") == aggregation_tree_id
    )

    # Evaluate on the driver (presumably small: one row per tree node): a single
    # collect instead of one Spark job per filter().first() and an ever-growing
    # withColumn plan.
    nodes = {}
    for row in tree_df.collect():
        attrs = row.asDict()
        if attrs["_NODE_ID"] is not None:
            nodes.setdefault(attrs["_NODE_ID"], attrs)

    external_ids = [
        node_id for node_id, attrs in nodes.items()
        if attrs.get("_AGGREGATION_METHOD_CD") == "external"
    ]
    external_values = {}
    if external_ids:
        data_rows = (
            spark.table("default.data_id_updated")
            .filter(col("DATA_ID").isin(external_ids))
            .select("DATA_ID", "VALUE")
            .collect()
        )
        for row in data_rows:
            # First match wins on duplicate DATA_IDs.
            external_values.setdefault(row["DATA_ID"], row["VALUE"])

    matrix_ids = list({
        attrs.get("_MATRIX_ID")
        for attrs in nodes.values()
        if attrs.get("_AGGREGATION_METHOD_CD") == "correlated" and attrs.get("_MATRIX_ID") is not None
    })
    correlations = {}
    if matrix_ids:
        corr_rows = (
            spark.table("default.correlation_matrix")
            .filter(col("CORRELATION_MATRIX_ID").isin(matrix_ids))
            .collect()
        )
        for row in corr_rows:
            corr = _to_float(row["CORRELATION_VALUE_NO"])
            if corr is not None:
                correlations[(row["CORRELATION_MATRIX_ID"], row["VAR1_NM"], row["VAR2_NM"])] = corr

    values = _compute_node_values(nodes, external_values, correlations)

    # Keep the source type of _NODE_ID; VALUE is always a nullable double.
    schema = StructType([tree_df.schema["_NODE_ID"], StructField("VALUE", DoubleType(), True)])
    result_df = spark.createDataFrame([(node_id, values[node_id]) for node_id in nodes], schema)
    result_df.write.mode("overwrite").saveAsTable("default.aggregation_tree_market_enriched")

# Example run: aggregate the tree whose AGGREGATION_TREE_ID is 'MARKET_INT'
aggregate_tree(aggregation_tree_id='MARKET_INT')

The following code takes the aggregated values from above and writes them into an output file, using the layout of Output_Template.xlsx.

In [None]:

# Databricks notebook source

import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Reuse the active Spark session (or start one) for the export cell
spark = (
    SparkSession.builder
    .appName("example")
    .getOrCreate()
)




from databricks.sdk import WorkspaceClient
import pandas as pd
from datetime import datetime
import openpyxl
import string
import pyspark.pandas as ps
from shutil import move
from pyspark.sql import SparkSession

w = WorkspaceClient()

# Target location: Unity Catalog volume /Volumes/<catalog>/<schema>/<volume>
catalog = 'workspace'
schema = 'default'
volume = 'output'
volume_path = f"/Volumes/{catalog}/{schema}/{volume}"

# Run identifier comes from the workflow's "run_id" widget.
# For manual tests, use a fixed value instead, e.g. RUN_ID = "001".
RUN_ID = dbutils.widgets.get("run_id")

# Per-run folder and workbook name inside the volume
volume_folder = f"output_{RUN_ID}"
volume_file = f"Output_{RUN_ID}.xlsx"
volume_folder_path = f"{volume_path}/{volume_folder}"
volume_file_path = f"{volume_folder_path}/{volume_file}"

# Create the run folder in the volume
w.files.create_directory(volume_folder_path)

import shutil

from pyspark.sql import functions as F

# Initialize Spark session
spark = SparkSession.builder.appName("test").getOrCreate()

# Template workbook stored in the output volume
template_file = f"{volume_path}/Output_Template.xlsx"

# The workbook is built in this local scratch file and moved into the volume
# only once it is complete.
local_out_path = f"/tmp/output_{RUN_ID}.xlsx"

# Output mapping: range 'Output mapping'!C12:K63 (row 12 holds the column headers)
excel_file_path = "/Volumes/workspace/default/configuration/Output_Mapping.xlsx"
df = pd.read_excel(
    excel_file_path,
    sheet_name='Output mapping',
    usecols="C:K",
    skiprows=11,
    nrows=52,
)

# Delta column names must not contain any of " ,;{}()\n\t=": drop spaces and
# replace every other forbidden character with "_".
_COLUMN_NAME_TRANSLATION = str.maketrans({
    " ": "", ",": "_", ";": "_", "{": "_", "}": "_",
    "(": "_", ")": "_", "\n": "_", "\t": "_", "=": "_",
})
df.columns = [str(column).translate(_COLUMN_NAME_TRANSLATION) for column in df.columns]

# Convert Pandas DataFrame to Spark DataFrame and save it in the "default" schema
spark_df = spark.createDataFrame(df)
spark_df.write \
    .mode("overwrite") \
    .saveAsTable("default.Output_Mapping2")

# Step 1: Read in the tables as data frames
output_mapping_df = spark.table("default.Output_Mapping2")
data_id_updated_df = spark.table("default.data_id_updated")
aggregation_tree_df = spark.table("default.aggregation_tree_market_enriched")

# Step 2: ID -> value lookup. On duplicate keys the data_id_updated value wins
# because it is unpacked last.
aggregation_dict = {row["_NODE_ID"]: row["VALUE"] for row in aggregation_tree_df.collect()}
data_id_dict = {row["DATA_ID"]: row["VALUE"] for row in data_id_updated_df.collect()}
combined_dict = {**aggregation_dict, **data_id_dict}


def _replace_with_value(value):
    """
    Replace a mapping cell holding an ID by that ID's value.

    Unknown entries pass through unchanged. The result is a Python str (the UDF
    is declared StringType); None/NaN become None so empty cells stay empty
    instead of turning into "nan"/"NaN".
    """
    mapped = combined_dict.get(value, value)
    if pd.isna(mapped):
        return None
    return str(mapped)


replace_udf = F.udf(_replace_with_value, StringType())

# Enrich the value columns C0020, C0030, C0040, C0050, C0060, C0070, C0080
for col_name in [f"C00{i}" for i in range(20, 81, 10)]:
    output_mapping_df = output_mapping_df.withColumn(col_name, replace_udf(F.col(col_name)))

# Save the Spark DataFrame as a new table in the "default" schema
output_mapping_df.write \
    .mode("overwrite") \
    .saveAsTable("default.Output_Enriched2")

# Read the enriched table only after it has been rewritten above: reading it
# earlier fails when the table does not exist yet and may export the previous
# run's data.
Output_Enriched2_df = spark.table("default.Output_Enriched2")

# Write into a local copy of the template so the rest of the workbook is kept.
# Assigning `writer.book = template` was deprecated in pandas 1.5 and is rejected
# by pandas 2.x; append mode with if_sheet_exists="overlay" (pandas >= 1.4)
# writes into the existing sheet instead.
shutil.copyfile(template_file, local_out_path)
with pd.ExcelWriter(local_out_path, engine='openpyxl', mode='a', if_sheet_exists='overlay') as writer:
    # startrow/startcol are 0-based: data lands from C13 on, below header row 12.
    # NOTE(review): Spark does not guarantee row order when reading a table, but
    # rows are placed positionally into the template — confirm order is preserved.
    Output_Enriched2_df.toPandas().to_excel(
        writer, sheet_name='Output mapping', startrow=12, startcol=2, index=False, header=False
    )

# Publish the finished workbook in the run folder of the volume. copyfile copies
# only the content (move's default copy2 would also copy file metadata).
move(local_out_path, volume_file_path, copy_function=shutil.copyfile)


