In [1]:
#%load_ext pycodestyle_magic

In [2]:
#%pycodestyle_off

In [3]:
import copy
import json
import os
import random  # This is used to generate random level names.
import re

import numpy as np
import pandas as pd

from feature import Feature, Level
from rule import Rule

random.seed(10)

In [4]:
def extract_feature_levels(text):
    """Parse a categorical condition written as ``feature in {level, level, ...}``.

    Returns:
        ``(feature, levels)`` where ``feature`` is the text before `` in {``
        with surrounding single quotes/spaces removed, and ``levels`` is the
        list of comma-separated entries between the braces with surrounding
        double quotes/spaces removed.
        ``(False, False)`` when ``text`` does not contain such a condition.
    """
    # 'feature in {level, level, level, ...}'
    found = re.search(r"([^\*]+) in \{([^\}]+)\}", text)
    if found is None:
        return False, False

    raw_feature, raw_levels = found.groups()

    levels = []
    for chunk in raw_levels.split(','):
        levels.append(chunk.strip('" '))

    return raw_feature.strip("' "), levels

In [5]:
def extract_min_max_range(text, df):
    """Parse a numerical condition into ``(feature, lower, upper)``.

    Accepted shapes (operators may be ``<``, ``<=``, ``>`` or ``>=``; the
    feature may be bare, single-quoted or double-quoted; bounds may be
    integers or decimals):

        value op feature op value      e.g. ``0 < age <= 35``
        feature op value               e.g. ``'education-num' <= 12``
        value op feature               e.g. ``18 <= age``

    A strict operator (``<`` / ``>``) moves the bound inwards by ``EPSILON``
    so the returned range is always inclusive. For a one-sided condition the
    missing bound is taken from ``df[feature].min()`` / ``df[feature].max()``.

    Args:
        text: Condition string to parse.
        df: DataFrame holding a column named after the feature; only read
            when the condition is bounded on a single side.

    Returns:
        ``(feature, lower, upper)``, or ``(False, False, False)`` when the
        text does not match the inequality pattern at all.

    Raises:
        TypeError: the text matched but carries no numeric limit, or its
            numbers and operators do not pair up into a valid range.
    """
    EPSILON = 0.0001

    # Signed integer or decimal bound.
    number = r'-?\d+(?:\.\d+)?'
    # Inequality like: '(value (<=, <)) feature ((<=, <) value)'
    # Left and right sides are optional, but at least one must be present.
    pattern = (
        r'(' + number + r')?\s*([<>]=?)?\s*'
        r'((?:"[\w\s-]+")|(?:\'[\w\s-]+\'|\w+(?:-?\w+)*))'
        r'\s*([<>]=?)?\s*(' + number + r')?'
    )

    match = re.match(pattern, text)
    if not match:
        return False, False, False

    number1 = float(match.group(1)) if match.group(1) else None
    op1 = match.group(2)
    # The pattern accepts both quote styles, so both must be removed.
    feature = match.group(3).strip("'\"")
    op2 = match.group(4)
    number2 = float(match.group(5)) if match.group(5) else None

    if number1 is None and number2 is None:
        raise TypeError(
            f"Range pattern could not identify any limits in {text!r}")

    def lower_bound(value, op):
        # A strict operator excludes the bound itself.
        return value if "=" in op else value + EPSILON

    def upper_bound(value, op):
        return value if "=" in op else value - EPSILON

    malformed = TypeError(f"Malformed range condition: {text!r}")

    # Limited on the left and on the right.
    if number1 is not None and number2 is not None:
        # Both operators are required and must point the same way.
        if op1 is None or op2 is None or op1[0] != op2[0]:
            raise malformed
        if op1[0] == "<":  # low < feature < high
            return feature, lower_bound(number1, op1), upper_bound(number2, op2)
        # high > feature > low
        return feature, lower_bound(number2, op2), upper_bound(number1, op1)

    # Only limited on the right: 'feature op value'.
    if number1 is None:
        if op1 is not None or op2 is None:
            raise malformed
        if op2[0] == ">":  # feature > value
            return feature, lower_bound(number2, op2), df[feature].max()
        # feature < value
        return feature, df[feature].min(), upper_bound(number2, op2)

    # Only limited on the left: 'value op feature'.
    if op2 is not None or op1 is None:
        raise malformed
    if op1[0] == ">":  # value > feature
        return feature, df[feature].min(), upper_bound(number1, op1)
    # value < feature
    return feature, lower_bound(number1, op1), df[feature].max()

In [6]:
def create_rules_and_update_feature_set(rules_df: pd.core.frame.DataFrame,  # DF with rules from Rulex
                                        df: pd.core.frame.DataFrame,  # DF with the data used to build the rules
                                        # Conclusion set as a list of tuples [(category, min, max), ...]
                                        conclusion_set: list,
                                        categories_set: set,  # Set of possible category values to be employed
                                        # Current list with all features and levels; it is updated by this call
                                        feature_set: list[Feature] = None) -> tuple:
    """Build one Rule per row of ``rules_df`` and extend ``feature_set`` accordingly.

    The rules dataframe is expected to look like this (one row per rule):

        ID   Score Normalized   Condition 1     Condition 2             Condition 3
        1    0.25               0 < age <= 35   'education-num' <= 12   'capital-gain' <= 5119

        Condition 4
        relationship in {"Not-in-family", "Other-relative", "Own-child", Unmarried}

    Hence every condition is either an inequality or a set of strings.
    Only columns whose name contains "Condition" and whose cell is a ``str``
    are parsed; the column whose name contains "Score Normalized" provides
    the rule weight.

    Args:
        rules_df: Rules exported from Rulex, laid out as shown above.
        df: Original data; forwarded to ``extract_min_max_range`` to get the
            min/max of a feature for ranges bounded on one side only.
        conclusion_set: Only forwarded to ``Rule.get_rule_str``.
        categories_set: Pool of numeric values still available for encoding
            categorical levels. It is MUTATED: every value used is discarded.
        feature_set: Existing list of ``Feature`` objects to extend; a new
            list is created when ``None``.

    Returns:
        ``(feature_set, rules)`` - the updated feature list
        ``[Feature, Feature, ...]`` (each holding its ``Level`` objects) and
        the list of ``Rule`` objects extracted from ``rules_df``, which can
        later be imported by the argumentation framework.

    Raises:
        TypeError: a condition cell matches neither the categorical nor the
            range pattern (also raised by ``extract_min_max_range`` itself).
        StopIteration: ``categories_set`` is empty when a non-numeric
            categorical level needs a value.
    """
    def append_feature(feature_list, feature) -> None:
        # Append a new Feature only if no feature with that name exists yet
        # (the for/else branch runs when the loop finishes without `break`).
        for f in feature_list:  # Check if feature is already in the list
            if f.name == feature:
                break
        else:
            feature_list.append(Feature(feature))

    if feature_set is None:
        feature_set = []

    # List of Rules to be created
    rules = []  

    for index in rules_df.index:
        # Create current rule
        rule = Rule(index)

        for column in rules_df.columns:

            # The weight column is not a condition column, so after storing
            # the weight the "Condition" check below skips to the next column.
            if "Score Normalized" in column:
                rule.weight = rules_df.loc[index, column]

            if "Condition" not in column:
                continue

            # Non-string cells (presumably empty/NaN conditions) are ignored.
            if type(rules_df.loc[index, column]) != str:
                continue

            # Get feature and levels if condition is categorical
            feature, levels = extract_feature_levels(
                rules_df.loc[index, column])

            if feature:

                append_feature(feature_set, feature)

                # Need to find feature in the list
                for i, f in enumerate(feature_set):

                    if f.name != feature:
                        continue

                    for level in levels:

                        # NOTE(review): the try block also wraps the Level
                        # constructor and the discard, so a ValueError raised
                        # there would be handled like a non-numeric level.
                        try:
                            # If the category is a number, it is used directly as the level value.
                            new_level = Level(
                                feature, "category", level, float(level), float(level))
                            # Level value employed is discarded from possible values to be used
                            categories_set.discard(float(level))
                        except ValueError:
                            # Non-numeric category: take any value not used so far to encode it
                            value = next(iter(categories_set))
                            new_level = Level(
                                feature, "category", level, value, value)
                            # Level value employed is discarded from possible values to be used
                            categories_set.discard(value)

                        # Try to add in case there is not another level with the same range
                        # NOTE(review): a non-numeric level that shows up in several
                        # rules draws a fresh value each time, and the rule stores a copy
                        # of new_level rather than of a level already held by the
                        # feature - confirm against Feature.add_level that this is intended.
                        feature_set[i].add_level(new_level)
                        rule.conditions.append(copy.deepcopy(new_level))

                    break  # Feature found in the list

                continue  # Go to next condition

            # Get min and max range if condition is numerical (an inequality)
            feature, min, max = extract_min_max_range(
                rules_df.loc[index, column], df)

            if feature:

                # Make sure the feature is in the list
                append_feature(feature_set, feature)

                # Need to find feature in the list
                # (no `break` here: the scan continues over the remaining
                # features after the matching one has been handled)
                for i, f in enumerate(feature_set):

                    if f.name != feature:
                        continue

                    new_level = Level(feature, "range",
                                      Feature.get_new_level(), min, max)
                    feature_set[i].add_level(new_level)

                    # get_level is necessary in case we try to add a level that was already there:
                    # the rule then references the level stored in the feature.
                    rule.conditions.append(copy.deepcopy(
                        feature_set[i].get_level(min, max)))

                continue  # Go to next condition

            print("Condition does not match any pattern")
            print(rules_df.loc[index, column])
            raise TypeError

        rule.rule_str = rule.get_rule_str(rules_df, conclusion_set)
        rules.append(rule)

    return feature_set, rules

In [7]:
def feature_set_to_json(feature_set: list[Feature],
                        conclusion_set: list[tuple],
                        name: str = "featureset") -> str:
    """Serialise a feature set and a conclusion set to the framework's JSON layout.

    Example of json structure being employed: https://lucasrizzo.com/framework/json_example.png

    Args:
        feature_set: Features as returned by
            ``create_rules_and_update_feature_set``. Each one must expose
            ``name``, ``levels`` and ``len()``; each level must expose
            ``name``, ``min`` and ``max``.
        conclusion_set: List of ``(category, from, to)`` tuples.
        name: Value written under the "featureset" key.

    Returns:
        The JSON string to be imported as the feature set. Empty feature
        sets, features without levels and empty conclusion sets yield empty
        JSON arrays.

    Note:
        Names and values are inserted verbatim (no JSON escaping).
    """
    def value_list(key, values):
        # One '{"<key>":[{"value":"..."}, ...]}' entry of an attribute.
        entries = ",".join(
            f'\n\t\t  \t{{"value":"{value}"}}' for value in values)
        return f'\n\t\t {{"{key}":[{entries}]}}'

    attributes = []
    for att in feature_set:
        attributes.append(
            f'\n\t\t[{{"name":"{att.name}"}},'
            f'\n\t\t {{"range":"{len(att)}"}},'
            + value_list("from", [f"{level.min:.4f}" for level in att.levels])
            + ","
            + value_list("to", [f"{level.max:.4f}" for level in att.levels])
            + ","
            + value_list("level", [level.name for level in att.levels])
            + "]"
        )

    conclusions = [
        f'\n\t\t[{{"category":"{category}"}},'
        f'\n\t\t {{"from":"{lower}"}},'
        f'\n\t\t {{"to":"{upper}"}}]'
        for category, lower, upper in conclusion_set
    ]

    # Joining with "," (instead of appending a comma after every item and
    # chopping the last character) keeps the brackets intact for empty lists.
    return "".join([
        f'{{"featureset":"{name}",',
        '\n\t"attributes":[',
        ",".join(attributes),
        '\n\t],',
        '\n\t"conclusions":[',
        ",".join(conclusions),
        ']',
        '\n}',
    ])

In [8]:
def rules_to_json(rules: list[Rule]) -> str:
    """Serialise rules as the node list of a graph with no edges.

    Each rule becomes one node with a 0-based ``id``, a title ``R<id + 1>``,
    grid coordinates, its ``weight`` (written as a string) and its
    ``rule_str`` as tooltip (leading/trailing newlines removed).

    Nodes are laid out left to right on a grid, 150 units apart, seven per
    row.

    Args:
        rules: Objects exposing ``weight`` and ``rule_str``.

    Returns:
        A JSON string ``{"nodes":[...],"edges":[]}``; an empty ``rules``
        list yields an empty ``nodes`` array.
    """
    NODES_PER_ROW = 7
    SPACING = 150

    nodes = []
    for node_id, rule in enumerate(rules):
        row, col = divmod(node_id, NODES_PER_ROW)
        # json.dumps adds the surrounding quotes and escapes quotes,
        # backslashes and control characters; ensure_ascii=False leaves
        # non-ASCII text untouched.
        tooltip = json.dumps(rule.rule_str.strip("\n"), ensure_ascii=False)
        nodes.append(
            f'{{"id":{node_id},'
            f'"title":"R{node_id + 1}",'
            f'"x":{col * SPACING},'
            f'"y":{row * SPACING},'
            f'"weight":"{rule.weight}",'
            f'"tooltip":{tooltip}}}'
        )

    return '{"nodes":[' + ",".join(nodes) + '],"edges":[]}'

In [9]:
def parse_rulex(root_folder: str,  # root folder where files are being stored
                df: pd.core.frame.DataFrame,  # data frame used to generate the rules
                file_rules_name: str,  # xlsx file with all the rules generated by Rulex
                conclusion_set: list[tuple],  # Conclusion set of rules
                data_name: str,  # Name used for the feature set
                max_categories: int = 1000000) -> None:
    """Convert every sheet of a Rulex workbook into rule and feature-set files.

    For each sheet of ``file_rules_name`` two files are written to
    ``<root_folder>/results framework/``: ``<sheet>_rules.txt`` (one
    ``str(rule)`` per line) and ``<sheet>_rules.json`` (see
    ``rules_to_json``), where ``<sheet>`` is the sheet name reduced to its
    alphanumeric characters. The feature set accumulated over all sheets is
    finally written to ``<Data_name>_featureset.json`` in the same folder.

    Args:
        root_folder: Folder under which "results framework" is located; the
            output folder is created when missing.
        df: Data used to generate the rules (needed for one-sided ranges).
        file_rules_name: Path of the xlsx workbook, one rule table per sheet.
        conclusion_set: List of ``(category, from, to)`` tuples.
        data_name: Name stored in the feature set and used for its file name.
        max_categories: Size of the pool of numeric codes available for
            categorical levels.
    """
    # Pool of distinct numeric codes for non-numeric categorical levels.
    # Generated exactly as before so that, with the seed set in the import
    # cell, the same codes are produced.
    random_values = set()
    while len(random_values) < max_categories:
        random_values.add(random.randint(0, max_categories * 10))

    output_dir = os.path.join(root_folder, "results framework")
    os.makedirs(output_dir, exist_ok=True)

    feature_set = []
    # Open the workbook once and release it even if a sheet fails to parse.
    with pd.ExcelFile(file_rules_name) as xls:
        for sheet_name in xls.sheet_names:
            rules_df = pd.read_excel(xls, sheet_name=sheet_name)

            file_rule_name = "".join(
                x for x in sheet_name if x.isalnum()) + "_rules"
            rule_file_txt = os.path.join(output_dir, file_rule_name + ".txt")
            rule_file_json = os.path.join(output_dir, file_rule_name + ".json")

            # feature_set and random_values are shared across sheets.
            feature_set, rules = create_rules_and_update_feature_set(
                rules_df, df, conclusion_set, random_values, feature_set)

            with open(rule_file_txt, "w", encoding="utf-8") as file:
                for rule in rules:
                    file.write(str(rule))
                    file.write("\n")

            with open(rule_file_json, "w", encoding="utf-8") as file:
                file.write(rules_to_json(rules))

    json_str = feature_set_to_json(feature_set, conclusion_set, data_name)
    featureset_file = os.path.join(
        output_dir, data_name.capitalize() + "_featureset.json")
    with open(featureset_file, "w", encoding="utf-8") as file:
        file.write(json_str)

In [10]:
# Adult file configuration
root_folder = "./data/Adult/unlimited_conditions/"
# NOTE: despite its name, `bank_df` holds the Adult data set in this cell.
bank_df = pd.read_csv("./data/Adult/adult_numeric.csv")
# root_folder already ends with "/", so no extra separator is added.
file_rules_name = root_folder + 'Scoring_Rules_Adult.xlsx'
conclusion_set = [("no", 0, 0), ("yes", 1, 1)]

parse_rulex(root_folder, bank_df, file_rules_name, conclusion_set, "adult")

TypeError: write() argument must be str, not Rule

In [None]:
# # Bank file configuration
# root_folder = "./data/Bank/4_conditions/"
# bank_df = pd.read_csv("./data/Bank/bank_numeric.csv")
# file_rules_name = root_folder + '/Scoring_Rules_Bank.xlsx'
# conclusion_set = [("no", 0, 0), ("yes", 1, 1)]

# parse_rulex(root_folder, bank_df, file_rules_name, conclusion_set, "bank")

In [None]:
# # Cars file configuration
# root_folder = "./data/Cars/4_conditionals/"
# bank_df = pd.read_csv(root_folder + "cars_numeric.csv")
# file_rules_name = root_folder + 'Scoring_Rules_Cars.xlsx'
# conclusion_set = [("unacc", 0, 0), ("acc", 1, 1)]

# parse_rulex(root_folder, bank_df, file_rules_name, conclusion_set, "cars")

In [None]:
# # Myocardial file configuration
# root_folder = "./data/Myocardial/4_conditions/"
# bank_df = pd.read_csv("./data/Myocardial/myocardial_numeric.csv")
# file_rules_name = root_folder + '/Scoring_Rules_Myocardial.xlsx'
# conclusion_set = [("no", 0, 0), ("yes", 1, 1)]

# parse_rulex(root_folder, bank_df, file_rules_name, conclusion_set, "myocardial")