# Convert DNB's Additional Validation Rules to Patterns

DNB's additional validation rules are available in the 'solvency2-rules' subfolder of the repository.  
The formulas in this file use a specific syntax; this notebook converts that syntax to one that can be interpreted by Python.  
The resulting formulas are called 'patterns'.

## Import packages

In [None]:
import pandas as pd  # dataframes
from os.path import join # some os dependent functionality
import re # regex
#from src import adjust_syntax  # adjust syntax of additional Solvency 2 validation rules
#from src import Evaluator  # conversion from 'rules' to pandas expressions for the data-patterns packages

## General parameters

In [None]:
# Location and name of the file with the additional rules:
RULES_PATH = join('..', 'data', 'downloaded files')  
# Based on 2022
FILENAME_RULES = '2022_02_23_set_aanvullende_controleregels_solvency2.xlsx'
# Based on 2021
#FILENAME_RULES = '2021_04-01_set_aanvullende_controleregels_solvency2.xlsx'
# Based on 2020
#FILENAME_RULES = '2020-01-22 Set aanvullende controleregels Solvency II_tcm46-387021.xlsx'

In [None]:
# Location and names of files with all possible datapoints for QRS and ARS
DATAPOINTS_PATH = join('..', 'data', 'datapoints')
FILENAME_DATAPOINTS_QRS = 'QRS.csv'
FILENAME_DATAPOINTS_ARS = 'ARS.csv'
FILENAME_DATAPOINTS_QRG = 'QRG.csv'
FILENAME_DATAPOINTS_ARG = 'ARG.csv'
FILENAME_DATAPOINTS_QFS = 'QFS.csv'
FILENAME_DATAPOINTS_QFG = 'QFG.csv'
#FILENAME_DATAPOINTS_FTK = 'FTK.csv'

In [None]:
# Input parameters:
PARAMETERS = {'decimal': 0}
# currently only 'decimal' is available which specifies tolerance during evaluation of patterns.
# decimal: 0 means tolerance = abs(1.5e-0) (= 1.5)

In [None]:
# # We log to rules.log in the data/instances path
# logging.basicConfig(filename = join(INSTANCES_DATA_PATH, 'rules.log'),level = logging.INFO, 
#                     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

## Read file with all possible datapoints

We use a simplified taxonomy with all possible datapoints, located in the data/datapoints directory.  
The evaluator uses this taxonomy to generate the patterns.

In [None]:
# Read every datapoint file (";"-separated) into its own dataframe; empty cells become "":
(df_datapoints_qrs, df_datapoints_ars, df_datapoints_qrg,
 df_datapoints_arg, df_datapoints_qfs, df_datapoints_qfg) = (
    pd.read_csv(join(DATAPOINTS_PATH, filename), sep=";").fillna("")
    for filename in (FILENAME_DATAPOINTS_QRS, FILENAME_DATAPOINTS_ARS,
                     FILENAME_DATAPOINTS_QRG, FILENAME_DATAPOINTS_ARG,
                     FILENAME_DATAPOINTS_QFS, FILENAME_DATAPOINTS_QFG)
)
#df_datapoints_ftk = pd.read_csv(join(DATAPOINTS_PATH, FILENAME_DATAPOINTS_FTK), sep=";").fillna("")

In [None]:
# Stack the datapoints of all report types into one dataframe with a fresh 0..n-1 index ...
df_datapoints = pd.concat(
    [df_datapoints_qrs, df_datapoints_ars, df_datapoints_qrg,
     df_datapoints_arg, df_datapoints_qfs, df_datapoints_qfg],  # df_datapoints_ftk
    ignore_index=True,
)
# ... and keep only the first occurrence of rows that appear more than once
df_datapoints = df_datapoints.drop_duplicates()

In [None]:
df_datapoints.head()

## Read DNB's Additional Validation Rules

DNB's additional validation rules are currently published as an Excel file on the DNB statistics website. We included the Excel file here in the project.

Here we read the Excel and perform some data cleaning.

In [None]:
# Column names are taken from the second row of the sheet (header=1). After reading we drop
# fully duplicated rows, replace missing cells by "" and index the rules by their rule code.
df_rules = (
    pd.read_excel(join(RULES_PATH, FILENAME_RULES), header=1, engine='openpyxl')
    .drop_duplicates()
    .fillna("")
    .set_index('ControleRegelCode')
)

In [None]:
df_rules.head()

Adjust syntax of additional Solvency 2 validation rules

In [None]:
def adjust_syntax(rules):
    """Adjust syntax of additional Solvency 2 validation rules.

    The dataframe is modified in place and also returned. The columns 'Formule',
    'Kolommen', 'Rijen' and 'HoofdTabel' are used; the index holds the rule codes.

    All text substitutions are done with an explicit ``regex=False``. The default
    of the ``regex`` argument of ``Series.str.replace`` differs between pandas
    versions, so relying on it makes the result depend on the installed pandas.

    Args:
        rules: dataframe with one rule per row (string columns, no missing values).

    Returns:
        The same dataframe object, with adjusted 'Formule', 'Kolommen' and 'Rijen'.
    """

    def _replace(column, old, new):
        # Plain-text (non-regex) replacement of `old` by `new` in one column of `rules`
        rules[column] = rules[column].str.replace(old, new, regex=False)

    # Correct template typo's
    for old, new in [('S.08.01.01.01,c0380', 'S.08.01.01.02,c0380'),
                     ('S.08.01.01.01,c0430', 'S.08.01.01.02,c0430'),
                     ('S.08.01.04.01,c0380', 'S.08.01.04.02,c0380'),
                     ('S.08.01.04.01,c0430', 'S.08.01.04.02,c0430'),
                     ('S.08.02.01.01,c0320', 'S.08.02.01.02,c0320'),
                     ('S.08.02.04.01,c0320', 'S.08.02.04.02,c0320')]:
        _replace('Formule', old, new)

    # " " has to be converted to None
    _replace('Formule', '" "', 'None')
    _replace('Formule', '""', 'None')
    # <> . has to be converted to <> None
    _replace('Formule', '<> .', '<> None')
    _replace('Formule', '<>.', '<> None')
    _replace('Formule', '< > .', '< > None')
    _replace('Formule', '< >.', '< > None')
    # = . has to be converted to = None
    _replace('Formule', '= .', '= None')
    _replace('Formule', '=.', '= None')
    # <> has to be converted to !=
    _replace('Formule', '<>', '!=')
    _replace('Formule', '< >', '!=')
    # ; has to be converted to ,
    _replace('Formule', ';', ',')

    # Use of rNNN is unnecessary
    _replace('Formule', 'rNNN,', '')
    # Use wildcard # instead of RNNN for summing instead of repeating over multiple rows, and make sure all rows are included
    rules.loc[rules['Formule'].str.contains('RNNN,', regex=False), 'Rijen'] = '(All)'
    _replace('Formule', 'RNNN,', '#,')

    # Correct C\d\d\d\dC to C\d\d\d\d
    # (all matches are collected first; each one is then replaced in every row)
    double_c = [match for cols in rules['Kolommen']
                for match in re.findall(r"([Cc]\d\d\d\d)([Cc])", cols)]
    for column_code, extra_c in double_c:
        _replace('Kolommen', column_code + extra_c, column_code)

    # Correct C\d\d\d to C0\d\d\d
    # The third group is the terminator that followed the digits: "" (end of text), ";" or ")"
    short_codes = [match for cols in rules['Kolommen']
                   for match in re.findall(r"([^0-9])(\d\d\d)($|;|\))", cols)]
    for prefix, digits, terminator in short_codes:
        _replace('Kolommen', prefix + digits + terminator, prefix + '0' + digits + terminator)

    # Split double row entries {R\d\d\d\d,R\d\d\d\d} into two entries, i.e. {R\d\d\d\d},{R\d\d\d\d}
    double_rows = [match for form in rules['Formule']
                   for match in re.findall(r"([Rr]\d\d\d\d)(,)([Rr]\d\d\d\d)", form)]
    for first_row, comma, second_row in double_rows:
        _replace('Formule', first_row + comma + second_row, first_row + "}" + comma + "{" + second_row)

    # Add template when not included in formula.
    # This is done row by row so that every rule gets the template of its own 'HoofdTabel';
    # iterating over the column values (instead of index labels) also works when a rule code
    # occurs more than once in the index.
    rules['Formule'] = [
        re.sub(r"\{([CcRr]\d\d\d\d\})",
               lambda match, table=table: "{" + table + "," + match.group(1),
               form)
        for table, form in zip(rules['HoofdTabel'], rules['Formule'])
    ]

    # Add comma to SUBSTR({}#,#) expression
    missing_commas = [match for form in rules['Formule']
                      for match in re.findall(r"(\})([0-9]{1,2})", form)]
    for brace, number in missing_commas:
        _replace('Formule', brace + number, brace + "," + number)
    # Remove trailing comma in (#,#,#,) expression
    _replace('Formule', ',)', ')')

    # Some rules check dates to be filled by > 0, this has to be changed to <> None
    # NOTE(review): this runs after the general '<>' -> '!=' conversion above, so these rules
    # keep the '<>' operator - confirm that the pattern evaluation accepts '<>' here.
    list_of_rules = ['S.15.01_105', 'S.15.01_107', 'S.23.04_111', 'S.23.04_112', 'S.23.04_121', 'S.23.04_122', 'S.23.04_133', 'S.23.04_144', 'S.23.04_145', 'S.30.01_105', 'S.30.01_106',
                     'S.30.01_117', 'S.30.01_118', 'S.30.03_102', 'S.30.03_103', 'S.36.01_106', 'S.36.02_106', 'S.36.02_108', 'S.36.03_104', 'S.10.01_115', 'S.15.01_106', 'S.15.01_108',
                     'S.23.04_127', 'S.23.04_128', 'S.23.04_137', 'S.23.04_148', 'S.23.04_149']
    # Only the rules that are actually present can be selected with .loc
    list_of_rules_adj = [rule for rule in list_of_rules if rule in rules.index]
    if list_of_rules_adj:
        rules.loc[list_of_rules_adj, 'Formule'] = (
            rules.loc[list_of_rules_adj, 'Formule']
            .str.replace("> 0", '<> None', regex=False)
            .str.replace(">0", '<> None', regex=False)
        )

    return rules

In [None]:
df_rules = adjust_syntax(df_rules)

In [None]:
df_rules.head()

The Excel file contains rules for different report types. In the next step we select the rules that apply to QRS and to ARS respectively; rules with standard 'SOLVENCY' are included in both selections.

In [None]:
# Rules with standard 'SOLVENCY' are included in both selections.
# Filter first and copy afterwards: the result is then an independent dataframe, which matters
# because the Evaluator adds columns to the dataframe it receives.
df_rules_qrs = df_rules[df_rules['Standaard'].isin(['SOLVENCY', 'QRS'])].copy()
df_rules_ars = df_rules[df_rules['Standaard'].isin(['SOLVENCY', 'ARS'])].copy()

In [None]:
df_rules_qrs.head()

In [None]:
df_rules_ars.head()

## Convert the rules to patterns

The evaluator is a piece of Python code that takes the Additional Validation Rules as input and transforms them into expressions that can be interpreted by the data_patterns package (patterns).

In [None]:
import pandas as pd
import numpy as np
import os
from os.path import join
import re
import sys
import data_patterns
import logging


class Evaluator:
    """Convert additional validation rules into patterns for the data_patterns package.

    Constructing an instance runs the whole conversion: the pre-processed rules
    (with the added columns 'datapoints', 'templates', 'Formule_input', 'error'
    and 'n_patterns') are stored in ``df_rules`` and the generated patterns in
    ``df_patterns``.
    """

    def __init__(self, df_rules, df_datapoints, parameters):
        """Pre-process the rules and generate the patterns.

        Args:
            df_rules: dataframe with the rules, indexed by rule code, with the columns
                'Formule', 'Rijen' and 'Kolommen'. Columns are added to this dataframe.
            df_datapoints: dataframe with the string columns 'tabelcode', 'rij' and 'kolom'.
            parameters: dict with parameters that is handed to data_patterns.
        """
        self.entrypoint_templates = sorted(list(df_datapoints['tabelcode'].unique()))
        # Datapoints are written as "template,row,column"; an empty row part is dropped
        self.entrypoint_datapoints = sorted(list((df_datapoints['tabelcode'] + "," +
                                                  df_datapoints['rij'] + "," +
                                                  df_datapoints['kolom']
                                                  ).str.replace(",,", ",", regex=False)))
        self.df_rules = self.pre_process_rules(df_rules)
        self.df_patterns = self.process_rules(df_datapoints, parameters)

    def datapoints2pandas(self, s):
        """Transform EVA2 datapoints to Python Pandas datapoints by making letters uppercase

        Every text between curly brackets is treated as a datapoint: it is replaced by its
        quoted upper-case version, after which the remaining syntax is converted with
        ``preprocess_pattern``.

        Returns:
            Tuple of the converted formula and the list of (upper-case) datapoints.
        """
        datapoints = []
        for item in list(set(re.findall(r'{(.*?)}', s))):
            datapoints.append(item.upper())
            s = s.replace(item, '"' + item.upper() + '"')
        s = self.preprocess_pattern(s)
        return s, datapoints

    def replace_and_or(self, s):
        """Replace and by & and or by |, but not within strings"""
        if re.search(r"(.*?)\"(.*?)\"(.*)", s) is None:  # input text does not contain strings
            s = s.replace("OR", "|")
            s = s.replace("AND", "&")
        for item in re.findall(r"(.*?)\"(.*?)\"(.*)", s):
            # item[0] is the text before the first string, item[2] the text after it;
            # the text after it can contain more strings and is handled recursively
            s = s.replace(item[0], item[0].replace("OR", "|"))
            s = s.replace(item[0], item[0].replace("AND", "&"))
            s = s.replace(item[2], self.replace_and_or(item[2]))
        return s

    def replace_substr(self, s):
        """Replace SUBSTR(A,B,C) by A.str.slice(B,B+C,1)

        B is lowered by one in the result, e.g. SUBSTR(A,1,2) becomes A.str.slice(0,2,1).
        """
        for item in re.findall(r"(SUBSTR\s?\()(.*?)(,)([0-9]{1,2})(,)([0-9]{1,2})(\))", s):
            start = int(item[3]) - 1
            s = s.replace("".join(item), item[1] + ".str.slice(" + str(start) + "," + str(start + int(item[5])) + ",1)")
        return s

    def replace_in_not_in(self, s):
        """Replace IN and NOT IN by str.contains((...))

        The listed values are joined with "|" into a single pattern. When the list contains
        None, the None entry is removed and the arguments ", True, 0, True" are appended to
        the str.contains call. NOT IN additionally gets "=False" appended.
        """
        # NOT IN
        for item in re.findall(r"(.*?)(\s?[^\w]NOT IN[^\w]\s?)(\(.*?\))", s):
            # Use the None-adjusted value list, exactly as the IN branch below does
            item_2_adj = item[2] if "None" not in item[2] else "(" + item[2].replace("None, ", "").replace("None,", "").replace(", None", "").replace(",None", "") + ", True, 0, True)"
            s = s.replace("".join(item), item[0] + ".str.contains" + item_2_adj.replace('","', "|").replace('", "', "|") + "=False")
        # IN
        for item in re.findall(r"(.*?)(\s?[^\w]IN[^\w]\s?)(\(.*?\))", s):
            item_2_adj = item[2] if "None" not in item[2] else "(" + item[2].replace("None, ", "").replace("None,", "").replace(", None", "").replace(",None", "") + ", True, 0, True)"
            s = s.replace("".join(item), item[0] + ".str.contains" + item_2_adj.replace('","', "|").replace('", "', "|"))
        return s

    def adjust_sum(self, s):
        """Adjust SUM by adding additional parenthesis"""
        for item in re.findall(r"(SUM\s?\()(\(?.*\).*?\)?)", s):
            s = s.replace("".join(item), item[0] + "(" + item[1] + ")")
        return s

    def preprocess_pattern(self, pattern):
        """Apply the AND/OR, SUBSTR, IN/NOT IN and SUM conversions, in that order"""
        # Pattern: AND, OR
        pattern = self.replace_and_or(pattern)
        # Pattern: SUBSTR
        pattern = self.replace_substr(pattern)
        # Pattern: IN, NOT IN
        pattern = self.replace_in_not_in(pattern)
        # Pattern: SUM
        pattern = self.adjust_sum(pattern)

        return pattern

    def make_pattern_expression(self, expression, name, parameters):
        """Make expressions for the miner

        Returns:
            A list with a single pattern (a list of column values) for ``expression``.
        """
        # Work on a copy so that the dict of the caller is not changed
        parameters = dict(parameters)
        parameters['solvency'] = True
        pandas_expressions = data_patterns.to_pandas_expressions(expression, {}, parameters, None)
        pattern = [[name, 0] + [expression] + [0, 0, 0] + ["DNB"] + [{}] + pandas_expressions + ["", "", ""]]
        return pattern

    def pre_process_rules(self, df_rules):
        """Transform rules so that we can evaluate them. Not all rules are fit to be evaluated

        Adds the columns 'datapoints', 'templates', 'Formule_input', 'error' and 'n_patterns'
        to ``df_rules`` (the dataframe is changed in place) and returns it.
        """
        logger = logging.getLogger(__name__)
        # The new columns hold a list per rule, which requires dtype object
        df_rules['datapoints'] = ''
        df_rules['datapoints'] = df_rules['datapoints'].astype('object')
        df_rules['templates'] = ''
        df_rules['templates'] = df_rules['templates'].astype('object')
        for row in df_rules.index:
            rule_original = df_rules.loc[row, 'Formule']
            if not isinstance(rule_original, str):
                # .loc returned more than one value: the rule code occurs more than once
                logger.info("Rule " + row + ": " + "duplicate rule. ")
                rule_original = rule_original.values[0]
            rule_original, datapoints = self.datapoints2pandas(rule_original)
            df_rules.at[row, 'datapoints'] = datapoints
            # The first 13 characters of a datapoint are the template code
            df_rules.at[row, 'templates'] = list(set([datapoint[0:13].upper() for datapoint in datapoints]))
            df_rules.loc[row, 'Formule_input'] = rule_original

        df_rules['error'] = ''  # error message
        df_rules['n_patterns'] = 0  # number of patterns derived from rules

        return df_rules

    def unpack_rows_columns(self, row_range, column_range, datapoints, df_datapoints):
        """Unpack rows and columns

        Datapoints that are not known as such (because the row or the column is missing) are
        expanded to complete datapoints, using ``row_range`` / ``column_range``.

        Args:
            row_range: list of row entries: "All", a single row, or a range "low-high".
            column_range: list of column entries, in the same notation as ``row_range``.
            datapoints: list of datapoints of the rule.
            df_datapoints: dataframe with the columns 'tabelcode', 'rij' and 'kolom'.

        Returns:
            Tuple (expansion_dict, datapoints_not_found): a dict from the original datapoint
            to the list of datapoints it expands to, and the list of datapoints that are
            unknown and could not be expanded.
        """
        datapoints_not_found = []
        expansion_dict = {}
        known_datapoints = set(self.entrypoint_datapoints)  # fast membership test
        # are the datapoints in the rule in the instance?
        for datapoint in datapoints:
            if datapoint not in known_datapoints:  # if datapoint is not there, see if we need to add rows or columns
                new_list = []
                bool_wildcard = ",#" in datapoint
                datapoint_orig = datapoint
                datapoint = datapoint.replace(",#", "")
                # First character after "template,": "C" (row is missing) or "R" (column is missing).
                # The slice is empty for a datapoint without such a part, which is then reported
                # as not found.
                axis = datapoint[14:15]
                if axis == "C" and row_range[0] != "":
                    if len(row_range) == 1 and row_range[0].upper() == "ALL":
                        for col in self.entrypoint_datapoints:
                            reg = re.search(datapoint[0:14] + "R....," + datapoint[14:], col)  # do for all rows if necessary
                            if reg:
                                new_list.append(reg.group(0))
                    else:
                        rows = []
                        for r in row_range:
                            if len(r) - len(r.replace("-", "")) == 1:  # range
                                low, high = r.split("-")
                                rows.extend(list(df_datapoints[(df_datapoints['tabelcode'] == datapoint[0:13]) &
                                                                (df_datapoints['kolom'] == datapoint[14:]) &
                                                                (df_datapoints['rij'].str[-4:] >= low) &
                                                                (df_datapoints['rij'].str[-4:] <= high)
                                                                ].rij))
                            else:
                                if r.upper()[0] == 'R':
                                    rows.extend([r.upper()])
                                else:
                                    rows.extend([('R' + r)])
                        for r in rows:
                            new_list.append(datapoint[0:14] + r + "," + datapoint[14:])
                if axis == "R" and column_range[0] != "":
                    if len(column_range) == 1 and column_range[0].upper() == "ALL":
                        for col in self.entrypoint_datapoints:
                            reg = re.search(datapoint + ",C....", col)  # do for all columns if necessary
                            if reg:
                                new_list.append(reg.group(0))
                    else:
                        cols = []
                        for c in column_range:
                            if len(c) - len(c.replace("-", "")) == 1:  # range
                                low, high = c.split("-")
                                cols.extend(list(df_datapoints[(df_datapoints['tabelcode'] == datapoint[0:13]) &
                                                                (df_datapoints['rij'] == datapoint[14:]) &
                                                                (df_datapoints['kolom'].str[-4:] >= low) &
                                                                (df_datapoints['kolom'].str[-4:] <= high)
                                                                ].kolom))
                            else:
                                if c.upper()[0] == 'C':
                                    cols.extend([c.upper()])
                                else:
                                    cols.extend([('C' + c)])
                        for c in cols:
                            new_list.append(datapoint + "," + c)
                if new_list != []:
                    # Wildcard # notation indicates that we need to sum over all the datapoints
                    new_list = ['"},{"'.join(new_list)] if bool_wildcard else new_list
                    expansion_dict[datapoint_orig] = new_list
                else:
                    datapoints_not_found.append(datapoint_orig)

        return expansion_dict, datapoints_not_found

    def process_rule(self, pre_expression, name, datapoints, expansion_dict, df_datapoints, parameters):
        """Some rules have multiple rows or columns. This function makes all the expressions with every row/column

        Returns:
            Tuple (expressions, invalid_expressions): the list of generated patterns, and a
            text with the expressions that were skipped because they refer to a datapoint
            that is not in ``df_datapoints`` ("" when nothing was skipped).
        """
        expressions = []
        invalid_expressions = ""
        if expansion_dict:
            # The first expanded datapoint determines how many expressions are made
            zero = next((datapoint for datapoint in datapoints if datapoint in expansion_dict), None)
            if zero is None:
                raise KeyError("None of the datapoints of rule " + str(name) + " is in expansion_dict")
            for i in range(len(expansion_dict[zero])):
                expression = pre_expression
                valid_expression = True
                for datapoint in datapoints:
                    if datapoint in expansion_dict.keys():
                        # An expansion can hold several datapoints (wildcard notation); check each of them
                        datapoints_wildcard = [item for item in re.findall(r"(S\.\d\d\.\d\d\.\d\d\.\d\d,R\d\d\d\d,C\d\d\d\d)*", expansion_dict[datapoint][i]) if item != '']
                        for datapoint_wildcard in datapoints_wildcard:
                            if len(df_datapoints[(df_datapoints['tabelcode'] == datapoint_wildcard[:13]) &
                                                (df_datapoints['rij'] == datapoint_wildcard[14:19].upper()) &
                                                (df_datapoints['kolom'] == datapoint_wildcard[20:25].upper())]) == 0:
                                valid_expression = False
                        expression = expression.replace(datapoint, expansion_dict[datapoint][i])
                if valid_expression:
                    expressions.extend(self.make_pattern_expression(expression, name, parameters))
                else:
                    if invalid_expressions == "":
                        invalid_expressions = invalid_expressions + "(" + expression + ")"
                    else:
                        invalid_expressions = invalid_expressions + ", (" + expression + ")"
        else:
            expressions.extend(self.make_pattern_expression(pre_expression, name, parameters))

        return expressions, invalid_expressions

    def process_rules(self, df_datapoints, parameters):
        """Evaluate all rules and stores the result in df_rules

        For every rule the columns 'error' and 'n_patterns' of ``self.df_rules`` are filled.

        Returns:
            Dataframe with the patterns of all rules (columns data_patterns.PATTERNS_COLUMNS).
        """
        logger = logging.getLogger(__name__)
        rules_expressions = []
        # The row and column ranges are processed as text (done once, before the loop)
        self.df_rules['Rijen'] = self.df_rules['Rijen'].astype(str)
        self.df_rules['Kolommen'] = self.df_rules['Kolommen'].astype(str)
        for row in self.df_rules.index:
            rule_original = self.df_rules.loc[row, 'Formule_input']
            rule_name = row
            datapoints = self.df_rules.loc[row, 'datapoints'].copy()
            templates = self.df_rules.loc[row, 'templates']
            row_range = self.df_rules.loc[row, 'Rijen'].replace("(", "").replace(")", "").replace(",", ";").split(";")
            column_range = self.df_rules.loc[row, 'Kolommen'].replace("(", "").replace(")", "").replace(",", ";").split(";")
            # are the templates in the rule in the instance?
            templates_not_found = []
            for template in templates:
                if template not in self.entrypoint_templates:
                    templates_not_found.append(template)

            if templates_not_found == []:
                expansion_dict, datapoints_not_found = self.unpack_rows_columns(row_range, column_range, datapoints, df_datapoints)
                if datapoints_not_found == []:
                    rule_expressions, invalid_expressions = self.process_rule(rule_original, rule_name, datapoints, expansion_dict, df_datapoints, parameters)
                    rules_expressions.extend(rule_expressions)
                    if invalid_expressions != "":
                        self.df_rules.loc[row, 'error'] = \
                            'Some expressions skipped due to invalid datapoint references: ' + invalid_expressions
                        logger.warning("Rule " + row + ", " + self.df_rules.loc[row, 'error'])
                    else:
                        self.df_rules.loc[row, 'error'] = ''
                    self.df_rules.loc[row, 'n_patterns'] = len(rule_expressions)
                    logger.info("Rule " + row + ", " + str(len(rule_expressions)) + " pattern(s) generated")
                else:
                    # expression = rule_original
                    self.df_rules.loc[row, 'error'] = 'missing datapoint(s): ' + str(datapoints_not_found)
                    logger.warning("Rule " + row + ", " + self.df_rules.loc[row, 'error'])
            else:
                # expression = rule_original
                self.df_rules.loc[row, 'error'] = 'missing template(s): ' + str(templates_not_found)
                logger.warning("Rule " + row + ", " + self.df_rules.loc[row, 'error'])

        df_patterns = pd.DataFrame(data = rules_expressions, columns = data_patterns.PATTERNS_COLUMNS)
        df_patterns.index.name = 'index'

        return df_patterns

In [None]:
evaluator_qrs = Evaluator(df_rules_qrs, df_datapoints, PARAMETERS)

In [None]:
evaluator_qrs.df_patterns.head()

In [None]:
evaluator_ars = Evaluator(df_rules_ars, df_datapoints, PARAMETERS)

In [None]:
evaluator_ars.df_patterns.head()

## Export patterns to rules folder

In [None]:
evaluator_qrs.df_patterns.to_excel(join('..', 'solvency2-rules', "qrs_patterns_additional_rules.xlsx"))

In [None]:
evaluator_ars.df_patterns.to_excel(join('..', 'solvency2-rules', "ars_patterns_additional_rules.xlsx"))