In [253]:
from polars import DataFrame, Series, read_csv, col
from itertools import product, batched, cycle, starmap, groupby
from functools import partial, reduce
from operator import itemgetter, attrgetter, methodcaller
from enum import Enum
from numpy import abs
from typing import Any
from dataclasses import dataclass

In [254]:
class VarTypeEnum:
    NUMERICAL = "NUMERICAL"
    NOMINAL = "NOMINAL"


@dataclass(frozen=True)
class VarCol:
    title: str
    type: VarTypeEnum


@dataclass(frozen=True)
class VarCell:
    type: VarTypeEnum
    value: Any
    epsilon: float

In [255]:
def prepare_table(df: DataFrame, input_var_cols: list[VarCol]) -> DataFrame:
    numerical_cols = None
    nominal_cols = None
    for k, g in groupby(
        sorted(input_var_cols, key=attrgetter("type")), attrgetter("type")
    ):
        group = list(map(attrgetter("title"), g))
        if k == VarTypeEnum.NUMERICAL:
            numerical_cols = group
        elif k == VarTypeEnum.NOMINAL:
            nominal_cols = group
    nominal_convert_expr = []
    if nominal_cols is not None:
        mapping_dicts = list(
            map(
                lambda c: (
                    c,
                    dict(
                        starmap(
                            lambda i, x: (x, i), enumerate(df.get_column(c).unique())
                        )
                    ),
                ),
                nominal_cols,
            )
        )
        nominal_convert_expr = starmap(
            lambda c, d: col(c).replace(d).alias(c), mapping_dicts
        )
    numerical_convert_expr = []
    if numerical_cols:
        numerical_maxs = df.select(map(lambda c: col(c).max(), numerical_cols)).rows()[
            0
        ]
        numerical_mins = df.select(map(lambda c: col(c).min(), numerical_cols)).rows()[
            0
        ]
        print(f"numerical_maxs {numerical_maxs}")
        print(f"numerical_mins {numerical_mins}")
        numerical_convert_expr = starmap(
            lambda c, mx, mn: ((col(c) - mn) / (mx - mn)).alias(c),
            zip(numerical_cols, numerical_maxs, numerical_mins),
        )
    return df.with_columns(*nominal_convert_expr, *numerical_convert_expr)


def fuzzy_relation_matrix(
    df: DataFrame, var_cols: list[VarCol], lbda: float = 1
) -> list[tuple[float]]:
    def fuzzy_similarity(p1: VarCell, p2: VarCell):
        if p1.type != p2.type:
            return 0
        if p1.type == VarTypeEnum.NOMINAL:
            return int(p1.value == p2.value)
        elif p1.type == VarTypeEnum.NUMERICAL:
            assert p1.epsilon == p2.epsilon, "comparing different columns"
            diff = abs(p1.value - p2.value)
            if diff <= p1.epsilon:
                return 1 - diff
            return 0

    curr_columns: list[Series] = list(
        map(df.get_column, map(attrgetter("title"), var_cols))
    )
    column_epsilons: list[float] = list(
        map(
            lambda c: (
                df.get_column(c.title).std() if c.type == VarTypeEnum.NUMERICAL else 0
            )
            / lbda,
            var_cols,
        )
    )
    # print(f'column_epsilons {column_epsilons}')
    curr_columns_values_with_type = list(
        starmap(
            lambda typ, lis, eps: list(
                starmap(VarCell, zip(cycle([typ]), lis, cycle([eps])))
            ),
            zip(
                map(attrgetter("type"), var_cols),
                map(methodcaller("to_list"), curr_columns),
                column_epsilons,
            ),
        )
    )
    # print(f'curr_columns_values_with_type {curr_columns_values_with_type}')
    return list(
        map(
            tuple,
            batched(
                starmap(
                    lambda ct1, ct2: reduce(
                        conjunction, starmap(fuzzy_similarity, zip(ct1, ct2)), 1
                    ),
                    product(zip(*curr_columns_values_with_type), repeat=2),
                ),
                len(curr_columns_values_with_type[0]),
            ),
        )
    )


# aka t-norm
def conjunction(a: float, b: float) -> float:
    return min(a, b)


# aka t-conorm


def disjunction(a: float, b: float) -> float:
    return max(a, b)


def negation(a: float) -> float:
    return 1 - a


def get_var_cols_with_types(
    var_cols: list[str], var_col_types: list[VarTypeEnum]
) -> list[VarCol]:
    return list(starmap(VarCol, zip(var_cols, var_col_types)))


# X - целевое множество ([x_i]_R_Q), x - проверяемый элемент, y - произвольный элемент U


def lower_approximation(x: int, M: list[tuple[float]], X: list[float]) -> float:
    bR = M[x]
    return min(map(lambda y: disjunction(negation(bR[y]), X[y]), range(len(X))))


def upper_approximation(x: int, M: list[tuple[float]], X: list[float]) -> float:
    bR = M[x]
    return max(map(lambda y: conjunction(bR[y], X[y]), range(len(X))))


def fuzzy_partition(M: list[tuple[float]]):
    return [*M]


def positive_region(P: list[tuple[float]], Q: list[tuple[float]]):
    return lambda x: tuple(
        map(lambda xi: lower_approximation(x, P, xi), fuzzy_partition(Q))
    )


# gamma_P(Q)
def correlation(P: list[tuple[float]], Q: list[tuple[float]]) -> float:
    return sum(map(sum, map(positive_region(P, Q), range(len(P))))) / len(P)


def relevance(M: list[tuple[float]], Ms: list[list[tuple[float]]]):
    return sum(map(partial(correlation, M), Ms)) / len(Ms)


def significance(
    old_rel: float, new_M: list[tuple[float]], Ms: list[list[tuple[float]]]
) -> float:
    return relevance(new_M, Ms) - old_rel

In [256]:
def FRUAR(df: DataFrame, input_var_cols: list[VarCol]):
    label = True
    R: list[VarCol] = []
    B = input_var_cols.copy()
    last_rel = 0
    print(B)
    Ms = list(map(lambda c: fuzzy_relation_matrix(df, [c]), input_var_cols))
    print("Ms", *Ms, sep="\n")
    while label:
        crMs = list(map(lambda c: fuzzy_relation_matrix(df, [*R, c]), B))
        relevances = list(map(lambda M: relevance(M, Ms), crMs))
        max_index = max(enumerate(relevances), key=itemgetter(1))[0]
        sig = significance(last_rel, crMs[max_index], Ms)
        last_rel = relevances[max_index]
        if sig > 0:
            R.append(B[max_index])
            B.pop(max_index)
        else:
            label = False
    if len(R) == len(input_var_cols):
        return R[: len(input_var_cols)]
    return R

In [257]:
table_path: str = "./sample_solution_table.csv"
input_var_cols: list[str] = list(map(lambda i: f"i{i}", range(4)))
input_var_col_types = [
    VarTypeEnum.NOMINAL,
    VarTypeEnum.NOMINAL,
    VarTypeEnum.NUMERICAL,
    VarTypeEnum.NUMERICAL,
]
df: DataFrame = read_csv(table_path)
input_var_cols_with_types = get_var_cols_with_types(input_var_cols, input_var_col_types)
normalized_df = prepare_table(df, input_var_cols_with_types)
print(normalized_df)
print(FRUAR(normalized_df, input_var_cols_with_types))

numerical_maxs (10, 0.7)
numerical_mins (2, 0.2)
shape: (6, 4)
┌─────┬─────┬───────┬─────┐
│ i0  ┆ i1  ┆ i2    ┆ i3  │
│ --- ┆ --- ┆ ---   ┆ --- │
│ str ┆ str ┆ f64   ┆ f64 │
╞═════╪═════╪═══════╪═════╡
│ 2   ┆ 3   ┆ 1.0   ┆ 1.0 │
│ 0   ┆ 1   ┆ 0.5   ┆ 0.2 │
│ 0   ┆ 2   ┆ 0.0   ┆ 0.6 │
│ 1   ┆ 2   ┆ 0.125 ┆ 0.0 │
│ 1   ┆ 0   ┆ 0.625 ┆ 0.4 │
│ 0   ┆ 1   ┆ 0.125 ┆ 0.8 │
└─────┴─────┴───────┴─────┘
[VarCol(title='i0', type='NOMINAL'), VarCol(title='i1', type='NOMINAL'), VarCol(title='i2', type='NUMERICAL'), VarCol(title='i3', type='NUMERICAL')]
Ms
[(1, 0, 0, 0, 0, 0), (0, 1, 1, 0, 0, 1), (0, 1, 1, 0, 0, 1), (0, 0, 0, 1, 1, 0), (0, 0, 0, 1, 1, 0), (0, 1, 1, 0, 0, 1)]
[(1, 0, 0, 0, 0, 0), (0, 1, 0, 0, 0, 1), (0, 0, 1, 1, 0, 0), (0, 0, 1, 1, 0, 0), (0, 0, 0, 0, 1, 0), (0, 1, 0, 0, 0, 1)]
[(1, 0, 0, 0, 0.625, 0), (0, 1, 0, 0.625, 0.875, 0.625), (0, 0, 1, 0.875, 0, 0.875), (0, 0.625, 0.875, 1, 0, 1), (0.625, 0.875, 0, 0, 1, 0), (0, 0.625, 0.875, 1, 0, 1)]
[(1, 0, 0, 0, 0, 0.8000000000000002), 