In [154]:
from typing import (
    Optional,
    Iterable,
    Sequence,
    Set,
    Literal,
    Callable,
    Generator,
    Self,
    TypeVar,
    Mapping,
)
from dataclasses import dataclass
from operator import itemgetter, attrgetter, add, contains, or_
from itertools import (
    repeat,
    accumulate,
    chain,
    groupby,
    starmap,
    filterfalse,
)
from functools import reduce, partial
from random import seed, randint, choice, choices, sample
from collections import Counter, defaultdict
from math import ceil
from enum import Enum
from treelib import Tree, Node
from polars import DataFrame
from notebook_parameters import NotebookParameters

## Вспомогательные классы

In [155]:
class VarEnum(Enum):
    FORBIDDEN = "FORBIDDEN"
    FREE = "FREE"


InputVarValue = Literal[VarEnum.FREE] | int
OutputVarValue = Literal[VarEnum.FORBIDDEN, VarEnum.FREE] | int


@dataclass(frozen=True, repr=False, kw_only=True)
class Cube:
    input_vars: tuple[InputVarValue, ...]
    output_vars: tuple[OutputVarValue, ...]
    source: str = "unknown"

    def __repr__(self: Self) -> str:
        return f"{format_input_vars(self.input_vars)} | {format_output_vars(self.output_vars)} | {self.source}"

    def __str__(self: Self) -> str:
        return self.__repr__()

    def to_global(self: Self, output_var_index: int, source: str = "unknown") -> "Cube":
        return Cube(
            input_vars=local_to_global_input_var_list(
                output_var_index, self.input_vars
            ),
            output_vars=self.output_vars,
            source=source,
        )

    def with_replacements(
        self: Self,
        input_vars: dict[int, InputVarValue] = {},
        output_vars: dict[int, OutputVarValue] = {},
        source: str = "unknown",
    ) -> "Cube":
        return Cube(
            input_vars=tuple(
                starmap(
                    lambda i, x: x if i not in input_vars else input_vars[i],
                    enumerate(self.input_vars),
                )
            ),
            output_vars=tuple(
                starmap(
                    lambda i, x: x if i not in output_vars else output_vars[i],
                    enumerate(self.output_vars),
                )
            ),
            source=source,
        )


@dataclass(frozen=True, kw_only=True)
class AddItems:
    iterable: Iterable[str]


@dataclass(frozen=True, kw_only=True)
class PopItems:
    set: Set[str]

## Функции форматирования вывода

In [156]:
def format_input_vars(input_vars: tuple[InputVarValue, ...]) -> str:
    """Преобразование входных переменных куба в строку для вывода"""
    return " ".join(
        list(map(lambda x: "x" if x == VarEnum.FREE else str(x), input_vars))
    )


def format_output_vars(output_vars: tuple[OutputVarValue, ...]) -> str:
    """Преобразование выходных переменных куба в строку для вывода"""
    return " ".join(
        list(
            map(
                lambda y: (
                    "\u0305y"
                    if y == VarEnum.FORBIDDEN
                    else "y" if y == VarEnum.FREE else str(y)
                ),
                output_vars,
            )
        )
    )


def format_input_output_vars(input_vars, output_vars) -> str:
    """Строковое представление куба"""
    return f"{format_input_vars(input_vars)} | {format_output_vars(output_vars)}"

## Функции работы с деревом

In [157]:
def add_child(parent: Node, tree: Tree, child_input_vars: tuple[InputVarValue]) -> str:
    return tree.create_node(
        tag=format_input_output_vars(child_input_vars, parent.data.output_vars),
        data=Cube(
            input_vars=child_input_vars,
            output_vars=parent.data.output_vars,
        ),
        parent=parent.identifier,
    ).identifier


def create_initial_cube(output_var_index: int, l: int, tree: Tree) -> str:
    input_vars: tuple[InputVarValue, ...] = tuple(
        repeat(VarEnum.FREE, len(cs[output_var_index]))
    )
    output_vars: tuple[OutputVarValue, ...] = (
        *repeat(VarEnum.FORBIDDEN, output_var_index),
        VarEnum.FREE,
        *repeat(VarEnum.FORBIDDEN, l - output_var_index - 1),
    )
    return tree.create_node(
        tag=format_input_output_vars(input_vars, output_vars),
        data=Cube(
            input_vars=input_vars,
            output_vars=output_vars,
        ),
    ).identifier

## Вспомогательные функции

In [158]:
Member = TypeVar("Member")


def take_random_members_from_set(
    st: Sequence[Member], probability: float
) -> list[Member]:
    """Получение заданной доли множества, заполненной произвольными его элементами"""
    if probability:
        return sample(st, ceil(len(st) * probability))
    return []


class GenericCounter(Counter, Mapping[Member, int]):
    pass


def new_sample(seq: Sequence[int]) -> Sequence[int]:
    return sample(seq, k=len(seq))


def shuffled_least_common_items(counter: GenericCounter[Member]) -> Iterable[Member]:
    return map(
        itemgetter(0),
        reduce(
            add,
            starmap(
                lambda k, v: new_sample(list(v)),
                groupby(reversed(counter.most_common()), itemgetter(1)),
            ),
        ),
    )

## Функции подсчёта связанных переменных

In [159]:
def cube_connected_vars(cube: Cube) -> int:
    return num_connected_vars(cube.input_vars)


# def input_vars_count(output_var_index):
#    return len(cs[output_var_index])


def num_connected_vars(input_vars: tuple[InputVarValue, ...]) -> int:
    """Подсчёт связанных переменных в кубе"""
    return len(list(filter(lambda x: x != VarEnum.FREE, input_vars)))

## Функции сокращения результирующего множества

In [160]:
def enough_values_to_reduce(target_set: list[Cube]) -> bool:
    mus = defaultdict(
        lambda: 0, Counter(map(lambda x: num_connected_vars(x.input_vars), target_set))
    )
    return all(
        starmap(lambda i, p: p == 0 or mus[i] > 0, enumerate(connected_probabilities))
    )


def count_ross(reduced_set: list[Cube]) -> list[float]:
    N = len(reduced_set)
    # μ
    mus = defaultdict(
        lambda: 0, Counter(map(lambda x: num_connected_vars(x.input_vars), reduced_set))
    )
    # μ'
    muss = list(
        map(
            lambda n: max(0, (N + 1) * connected_probabilities[n] - mus[n]),
            range(m + 1),
        )
    )
    # сумма μ'
    mussum = sum(muss)
    # ρ'
    ross = list(map(lambda n: muss[n] / mussum, range(m + 1)))
    return ross


def reduce_set_to_probability(reduced_set: list[Cube]) -> list[Cube]:
    """Удаление элементов множества для получения распределения вероятностей переменных, схожего с обозначенным в массиве connected_probabilities"""
    ross = count_ross(reduced_set)
    groups = defaultdict(
        lambda: [],
        starmap(
            lambda i, x: (i, list(map(itemgetter(1), x))),
            groupby(
                sorted(
                    map(lambda x: (cube_connected_vars(x), x), reduced_set),
                    key=itemgetter(0),
                ),
                key=itemgetter(0),
            ),
        ),
    )
    res_groups: dict[int, list[Cube]] = dict(
        map(
            lambda i: (i, take_random_members_from_set(groups[i], ross[i])),
            range(len(ross)),
        )
    )
    max_index, max_group = max(iter(res_groups.items()), key=lambda x: len(x[1]))
    max_count = len(max_group)
    max_probability = connected_probabilities[max_index]
    if max_probability == 0:
        raise Exception(
            f"ross {ross}\nmax_index: {max_index}\nmax_count: {max_count}\nres_groups {list(res_groups.items())}\ngroups: {dict(groups)}"
        )
    relative_probabilities = list(
        map(lambda x: x / max_probability, connected_probabilities)
    )
    fill_indices = list(
        filter(
            lambda pair: pair[1] > 0 and len(res_groups[pair[0]]) == 0,
            enumerate(relative_probabilities),
        )
    )
    fill_groups = dict(
        starmap(
            lambda i, prob: (i, sample(groups[i], ceil(prob * max_count))), fill_indices
        )
    )
    res_groups = res_groups | fill_groups
    return list(sum(map(itemgetter(1), iter(res_groups.items())), []))

## Функции преобразования индексов переменных кубов

In [161]:
def numerate_input_vars(
    input_vars: tuple[InputVarValue, ...], output_var_index: int
) -> Iterable[tuple[int, InputVarValue]]:
    """Нумерация индексов входных переменных атрибута"""
    return zip(cs[output_var_index], input_vars)


def has_last_input_var(output_var_index: int) -> bool:
    """Проверка наличия последней входной переменной в массиве связанных переменных"""
    return (m - 1) in cs[output_var_index]


def local_to_global_input_var_index(output_var_index: int, local_index: int) -> int:
    """Получение индекса переменной с учётом отступа из несвязанных переменных"""
    return cs[output_var_index][local_index]


def local_to_global_input_var_list(
    output_var_index: int, input_vars: tuple[InputVarValue, ...]
) -> tuple[InputVarValue, ...]:
    """Заполнение пробелов в списке связанных входных переменных значениями свободных переменных
    для получения списка с длиной, равной длине списка входных переменных (растягивание списка)
    """
    filler_type = tuple[tuple[InputVarValue, ...], int]

    def space_filler(acc: filler_type, x: tuple[int, InputVarValue]) -> filler_type:
        free_vars: Iterable[Literal[VarEnum.FREE]] = repeat(
            VarEnum.FREE, x[0] - acc[1] - 1
        )
        return (
            (
                *acc[0],
                *free_vars,
                x[1],
            ),
            x[0],
        )

    numerated_vars: Iterable[tuple[int, InputVarValue]] = chain(
        numerate_input_vars(input_vars, output_var_index),
        (
            [
                (
                    m - 1,
                    VarEnum.FREE,
                )
            ]
            if not has_last_input_var(output_var_index)
            else []
        ),
    )
    reduce_init: tuple[tuple[InputVarValue, ...], int] = (
        tuple(),
        -1,
    )
    # type: ignore
    result: tuple[InputVarValue, ...] = tuple(
        reduce(space_filler, numerated_vars, reduce_init)
    )[
        0
    ]  # type: ignore
    if len(result) != m:
        raise Exception()
    return result

## Функции преобразования вероятностей количества связанных переменных

In [162]:
def needed_probability(
    connected_var_count: int,
    bound_needed_count: int,
    min_group_bound_needed_count: int,
    connected_probability: float,
    weighted_connected_probability: float,
    min_group_probability_sum: float,
) -> float:
    """Получение вероятности включения куба
    в целевое множество на основе
    * Количества кубов в целевом множестве
    с таким же количеством связанных переменных
    * Целевого количества кубов
    с этим количеством связанных переменных
    * Минимального количества кубов
    с этим количеством связанных переменных,
    необходимого для соблюдения целевого распределения
    * Целевой вероятности включения куба
    с заданным числом переменных в целевое множество
    * Суммы минимальных вероятностей
    для получения целевого распределения
    """
    if connected_probability > 0:
        group_ratio: int = connected_var_count % min_group_bound_needed_count
        if group_ratio > 0:
            return weighted_connected_probability / (
                group_ratio / min_group_bound_needed_count
                if connected_var_count > 0
                else 1 / min_group_probability_sum
            )
        # return weighted_connected_probability / (prob_distribution((connected_var_count or 1) / (min_group_bound_needed_counts))) if min_group_bound_needed_count > 0 else 1)
        return weighted_connected_probability / (
            (connected_var_count / (bound_needed_count or 1)) or 1
        )
    return 0

## Функции выбора целевого количества связанных переменных 

In [163]:
def needed_j(connected_probabilities: list[float]) -> int:
    """
    Псевдослучайный датчик ИС, на вход принимает
    таблицу вероятностей p для каждого j, возвращает
    j - число требуемых связанных переменных
    """
    return choices(range(m + 1), connected_probabilities)[0]


def max_connected_probability_index(connected_probabilities: list[float]):
    """Получение максимального индекса связанной переменной для массива всех связанных переменных"""
    return list(filter(lambda pair: pair[1] > 0, enumerate(connected_probabilities)))[
        -1
    ][0]


def max_input_vars_reached(
    connected_probabilities: list[float], input_vars: tuple[InputVarValue]
) -> bool:
    return num_connected_vars(input_vars) >= max_connected_probability_index(
        connected_probabilities
    )

## Гамма-трансформация

### Итерация по кандидатам на гамма-трансформацию

In [164]:
augmentable_list: list[str] = []


def augmentable_generator() -> (
    Generator[Optional[str], Optional[AddItems | PopItems], None]
):
    """Итератор по augmentable_list с возможностью добавления/удаления следующих элементов без прерывания итерации"""
    global augmentable_list
    while augmentable_list:
        command: Optional[AddItems | PopItems] = yield augmentable_list.pop(0)
        if command is not None:
            yield None
            if isinstance(command, AddItems):
                for new_item in command.iterable:
                    augmentable_list.insert(randint(0, len(augmentable_list)), new_item)
            elif isinstance(command, PopItems):
                in_set: Callable[[str], bool] = partial(contains, command.set)
                augmentable_list = list(filterfalse(in_set, augmentable_list))

### Прямая гамма-трансформация
Генерация следующих кандидатов на гамма-трансформацию 

In [165]:
used_indices: set[int] = set()


def random_unbound_index(input_vars: list[InputVarValue]) -> int | None:
    """Псевдослучайный выбор индекса входной несвязанной переменной"""
    matching_indices = set(
        map(
            itemgetter(0), filter(lambda x: x[1] == VarEnum.FREE, enumerate(input_vars))
        )
    )
    if matching_indices:
        unused_matching_indices = matching_indices - used_indices
        selected_index = choice(list(unused_matching_indices or matching_indices))
        used_indices.add(selected_index)
        return selected_index
    return None


def gamma_transformation(
    output_var_index: int,
    cube_input_vars: list[InputVarValue],
    input_var_lens: list[float],
) -> Iterable[tuple[InputVarValue]]:
    """Гамма-трансформация - подстановка на место произвольной свободной входной переменной всех её возможных значений"""
    var_index = random_unbound_index(cube_input_vars)
    if var_index is not None:
        var_len = input_var_lens[
            local_to_global_input_var_index(output_var_index, var_index)
        ]
        (vars_before, vars_after) = itemgetter(
            slice(0, var_index), slice(var_index + 1, m)
        )(cube_input_vars)
        return map(
            lambda var: (
                *vars_before,
                var,
                *vars_after,
            ),
            range(var_len),
        )
    return []

## Заполнение переменных

In [166]:
def cubes_with_no_output_var_conflicts(cube: Cube) -> Callable[[Cube], bool]:
    """Создание функции для проверки пересечения множества выходных переменных"""
    return lambda c: all(
        map(
            lambda pair: len(pair) == 1
            or VarEnum.FORBIDDEN in pair
            or VarEnum.FREE in pair,
            map(set, zip(cube.output_vars, c.output_vars)),
        )
    )


def count_var_values(
    all_cubes: list[Cube], input_var_lens: list[int], output_var_lens: list[int]
):  # -> tuple[tuple[Counter], tuple[Counter]]:
    """Подсчёт количества кубов для каждого значения входных и выходных переменных"""
    empty_var_len_type = dict[int, Literal[0]]

    def var_len_to_dict(var_len: int) -> empty_var_len_type:
        return dict(map(lambda i: (i, 0), range(var_len)))

    def var_lens_to_dict(var_lens: Iterable[int]) -> tuple[empty_var_len_type, ...]:
        return tuple(map(var_len_to_dict, var_lens))

    initial_counters: tuple[
        tuple[dict[int, Literal[0]], ...], tuple[dict[int, Literal[0]], ...]
    ] = (
        var_lens_to_dict(input_var_lens),
        var_lens_to_dict(output_var_lens),
    )
    # cube_lists_type = tuple[list[list[InputVarValue]], list[[list[OutputVarValue]]]]

    def cube_to_lists(cube):  # -> cube_lists_type:
        return (
            list(map(lambda x: [x], cube.input_vars)),
            list(map(lambda x: [x], cube.output_vars)),
        )

    # def join_var_lists(acc_vars: cube_lists_type, cube_lists: cube_lists_type) -> cube_lists_type:
    #     return starmap(add, zip(acc_vars, x_vars))
    return tuple(
        map(
            lambda grp: tuple(map(Counter, starmap(or_, zip(*grp)))),
            zip(
                initial_counters,
                map(
                    lambda grp: list(map(lambda values: dict(Counter(values)), grp)),
                    reduce(
                        lambda acc, x: list(
                            starmap(
                                lambda accvars, xvars: list(
                                    starmap(add, zip(accvars, xvars))
                                ),
                                zip(acc, x),
                            )
                        ),
                        map(
                            cube_to_lists,
                            all_cubes,
                        ),
                    ),
                ),
            ),
        )
    )

### Заполнение входных переменных

In [167]:
def fill_cube_with_input_vars(cube: Cube, all_cubes: list[Cube]) -> Cube:
    output_var_conflicting_cubes = list(
        filterfalse(cubes_with_no_output_var_conflicts(cube), all_cubes)
    )
    if len(output_var_conflicting_cubes) == 0:
        output_var_conflicting_cubes = all_cubes
    input_vars, _ = count_var_values(
        output_var_conflicting_cubes, input_var_lens, output_var_lens
    )
    for i, _ in filter(lambda x: isinstance(x[1], VarEnum), enumerate(cube.input_vars)):
        least_common_input_var = next(
            filter(
                lambda x: not isinstance(x, VarEnum),
                shuffled_least_common_items(input_vars[i]),
            )
        )
        cube = cube.with_replacements(
            input_vars={i: least_common_input_var}, source="fill"
        )
    return cube

### Заполнение выходных переменных

#### Заполнение свободных переменных

In [168]:
def random_output_value_by_index(output_var_index: int) -> int:
    return choice(range(output_var_lens[output_var_index]))


def fill_output_vars(cube: Cube) -> Cube:
    """Заполнение свободных выходных переменных куба случайными значениями"""
    return Cube(
        input_vars=cube.input_vars,
        output_vars=tuple(
            starmap(
                lambda i, x: (
                    random_output_value_by_index(i) if x == VarEnum.FREE else x
                ),
                enumerate(cube.output_vars),
            )
        ),
        source=cube.source,
    )

#### Заполнение запрещённых переменных

In [169]:
def fill_cube_with_output_vars(
    all_cubes: list[Cube], conflicts_allowed=False
) -> Callable[[list[Cube], Cube], list[Cube]]:
    def reducer(acc: list[Cube], cube: Cube) -> list[Cube]:
        result_cube: Cube | None = None
        output_var_conflicting_cubes: list[Cube] = list(
            filterfalse(cubes_with_no_output_var_conflicts(cube), all_cubes)
        )
        if len(output_var_conflicting_cubes) == 0:
            output_var_conflicting_cubes = all_cubes
        _, output_vars = count_var_values(
            output_var_conflicting_cubes, input_var_lens, output_var_lens
        )
        numerated_empty_output_vars: list[tuple[int, OutputVarValue]] = list(
            filter(lambda x: isinstance(x[1], VarEnum), enumerate(cube.output_vars))
        )
        if not numerated_empty_output_vars:
            result_cube = cube
        else:
            for i, _ in numerated_empty_output_vars:
                least_common_output_vars: Iterable[int] = filter(
                    lambda x: not isinstance(x, VarEnum),
                    shuffled_least_common_items(output_vars[i]),
                    # map(
                    #     itemgetter(0),
                    #     reversed(output_vars[i].most_common()),
                    # ),
                )
                new_cube: Cube | None = next(
                    filter(
                        lambda c: c not in acc,
                        map(
                            lambda var: (result_cube or cube).with_replacements(
                                output_vars={i: var}, source="fill_output"
                            ),
                            least_common_output_vars,
                        ),
                    ),
                    None,
                )
                if new_cube is not None:
                    result_cube = new_cube
                else:
                    break
        if result_cube:
            return [result_cube, *acc]
        return acc

    return reducer

## Задание входных параметров программы

In [170]:
params_object: NotebookParameters | Literal[False] = False

In [171]:
params_object = params_object or NotebookParameters(
    i_max=1000,
    input_var_lens=[6, 6, 6, 6, 6, 6, 6, 6, 6, 6],
    output_var_lens=[5, 5, 5, 5, 5, 5, 5, 5, 5, 5],
    attribute_connections=[
        [*range(5), *range(6, 10)],
        [*range(4), *range(5, 7)],
        [*range(2, 10)],
        [*range(1, 8), 9],
        [*range(1, 3), *range(4, 10)],
        [*range(2), *range(4, 9)],
        [2, *range(5, 8)],
        [0, 4, 7, 8],
        [*range(3), 4],
        [*range(4), *range(5, 7), 8],
    ],
    connected_probabilities=[
        0,
        0,
        0.32,
        0.38,
        0.3,
        *repeat(0, 6),
    ],
    # i_max=1000,
    # input_var_lens=[5, 5, 5, 5, 5],
    # output_var_lens=[4],
    # attribute_connections=[[0, 1, 2]],
    # connected_probabilities=[0, 0.4, 0.6, 0, 0, 0],
    # i_max=1000,
    # input_var_lens=[5, 5, 4],
    # output_var_lens=[4],
    # attribute_connections=[[0, 1, 2]],
    # connected_probabilities=[0, 0.2, 0.8, 0],
    output_file_path="./test_solution_table.csv",  # "./solution_table.csv",
)
i_max: int
input_var_lens: list[int]
output_var_lens: list[int]
attribute_connections: list[list[int]]
connected_probabilities: list[float]
output_file_path: str
(
    i_max,
    input_var_lens,
    output_var_lens,
    attribute_connections,
    connected_probabilities,
    output_file_path,
) = attrgetter(
    "i_max",
    "input_var_lens",
    "output_var_lens",
    "attribute_connections",
    "connected_probabilities",
    "output_file_path",
)(
    params_object
)
# x_review_period = 100  # период пересмотра значения x
# x = 0.45  # вероятность того, что входная координата является связной

## Задание параметров ИС

In [172]:
l = len(output_var_lens)  # число атрибутов решения
m = len(input_var_lens)  # число атрибутов условия
cs: list[list[int]] = (
    attribute_connections  # Множество входных переменных, от которых зависит хотя бы одна функция
)
# seed(1)

## Этап 1: Формирование входной части многовыходных кубов

In [173]:
target_set = []
# Вероятности кубов с i-м количеством связанных переменных,
# изменённые с учётом возможности попадания кубов
# с меньшим числом связанных переменных в целевое множество,
# отменяющего дальнейшую гамма-трансформацию
weighted_connected_probabilities = list(
    starmap(
        lambda p, q: p / q,
        zip(
            connected_probabilities,
            accumulate(
                map(lambda x: x or 1, connected_probabilities), lambda acc, x: acc * x
            ),
        ),
    )
)
connected_probability_sum = sum(connected_probabilities)
# Целевые количества кубов с i-м количеством связанных переменных, общие для каждой выходной переменной
bound_needed_counts: list[int] = list(
    map(
        lambda p: ceil(i_max * p / connected_probability_sum / l),
        connected_probabilities,
    )
)
min_connected_probability: float = min(filter(lambda x: x > 0, connected_probabilities))
# Минимальные необходимые для соблюдения распределения количества кубов с i-м количеством связанных переменных для каждой выходной переменной
min_group_bound_needed_counts: list[int] = list(
    map(lambda p: ceil(p / min_connected_probability), connected_probabilities)
)
min_group_probability_sum: int = sum(min_group_bound_needed_counts)
for output_var_index in range(l):
    global augmentable_list
    tree = Tree()
    augmentable_list = [create_initial_cube(output_var_index, l, tree)]
    bound_target_set: list[Cube] = []
    connected_var_counts: Counter = Counter()
    gen = augmentable_generator()
    for node_id in gen:
        node = tree.get_node(node_id)

        def compute_needed_probability(i: int) -> float:
            return needed_probability(
                connected_var_counts[i],
                bound_needed_counts[i],
                min_group_bound_needed_counts[i],
                connected_probabilities[i],
                weighted_connected_probabilities[i],
                min_group_probability_sum,
            )

        needed_weighted_connected_probabilities: list[float] = list(
            map(
                compute_needed_probability,
                range(len(connected_probabilities)),
            )
        )
        j: int = needed_j(needed_weighted_connected_probabilities)
        connected_vars: int = num_connected_vars(node.data.input_vars)
        if connected_vars == j:
            bound_target_set.append(
                node.data.to_global(output_var_index, source="matched")
            )
            connected_var_counts[connected_vars] += 1
        else:
            mivr: bool = max_input_vars_reached(
                connected_probabilities, node.data.input_vars
            )
            if connected_vars < j and not mivr:
                gen.send(
                    AddItems(
                        iterable=map(
                            partial(add_child, node, tree),
                            gamma_transformation(
                                output_var_index, node.data.input_vars, input_var_lens
                            ),
                        )
                    )
                )
            else:
                parent_node = tree.parent(node_id)
                if parent_node:
                    new_popped_items = set(
                        map(
                            attrgetter("identifier"),
                            tree.subtree(parent_node.identifier).all_nodes(),
                        )
                    )
                    gen.send(PopItems(set=set(new_popped_items)))
                bound_target_set.append(
                    parent_node.data.to_global(output_var_index, source="parent")
                )
                connected_var_counts[
                    num_connected_vars(parent_node.data.input_vars)
                ] += 1
    if enough_values_to_reduce(bound_target_set):
        # Пересчёт целевого множества для одной выходной координаты
        bound_target_set = reduce_set_to_probability(bound_target_set)
    target_set.extend(bound_target_set)

In [174]:
# Печать дерева
st = tree.subtree(tree.root)
for node in st.children(st.root)[1:]:
    st.remove_subtree(node.identifier)
for i, node in enumerate(st.children(st.children(st.root)[0].identifier)):
    if i != 4:
        st.remove_subtree(node.identifier)
st.to_graphviz("tree.dot", shape="rect")

In [175]:
# Результирующее множество кубов, распределённое по количеству связанных переменных
print(Counter(map(lambda x: num_connected_vars(x.input_vars), target_set)))

Counter({3: 323, 2: 267, 4: 236})


## Этап 2: Пересчёт распределения кубов до целевых вероятностей количества связанных переменных

In [176]:
# Пересчёт целевого множества для всех выходных координат
if not enough_values_to_reduce(target_set):
    raise Exception(f"Not enough values to reduce {target_set}")
target_set = reduce_set_to_probability(target_set)

## Заполнение свободных выходных координат

In [177]:
target_set = list(map(fill_output_vars, target_set))

In [178]:
# Количество кубов по источникам происхождения - parent - обратная λ-трансформация, matched - прямая λ-трансформация
print(Counter(map(lambda x: (x.source, num_connected_vars(x.input_vars)), target_set)))

Counter({('matched', 4): 236, ('parent', 2): 207, ('parent', 3): 189, ('matched', 3): 110, ('matched', 2): 45})


In [179]:
# Целевое распределение кубов
needed_distribution = tuple(enumerate(connected_probabilities))

In [180]:
# Результирующее распределение кубов по количеству связанных переменных
result_distribution = tuple(
    starmap(
        lambda i, x: (i, x / len(target_set)),
        Counter(map(lambda c: num_connected_vars(c.input_vars), target_set)).items(),
    )
)

In [181]:
# Вывод всего целевого множества
print("\n".join(map(str, target_set)))

x x x 0 x 2 x x x x | ̅y ̅y ̅y 2 ̅y ̅y ̅y ̅y ̅y ̅y | parent
x x x x 3 x x 1 x x | ̅y ̅y ̅y ̅y ̅y 0 ̅y ̅y ̅y ̅y | parent
2 x 2 x x x x x x x | ̅y ̅y ̅y ̅y ̅y ̅y ̅y ̅y 2 ̅y | parent
4 x x 0 x x x x x x | ̅y ̅y ̅y ̅y ̅y ̅y ̅y ̅y ̅y 4 | parent
0 x 4 x x x x x x x | ̅y ̅y ̅y ̅y ̅y ̅y ̅y ̅y 2 ̅y | parent
4 x x x x x x 3 x x | ̅y ̅y ̅y ̅y ̅y ̅y ̅y 0 ̅y ̅y | parent
x x x 1 x 4 x x x x | ̅y 0 ̅y ̅y ̅y ̅y ̅y ̅y ̅y ̅y | parent
x x x x x 4 x 1 x x | ̅y ̅y ̅y ̅y ̅y ̅y 2 ̅y ̅y ̅y | parent
x x x x x 2 x 3 x x | ̅y ̅y ̅y ̅y ̅y ̅y 2 ̅y ̅y ̅y | parent
x x x x x 5 x 2 x x | ̅y ̅y ̅y ̅y ̅y ̅y 1 ̅y ̅y ̅y | matched
0 x x x x x x 2 x x | ̅y ̅y ̅y ̅y ̅y 2 ̅y ̅y ̅y ̅y | matched
x x x x 3 x x 4 x x | ̅y ̅y ̅y ̅y ̅y 2 ̅y ̅y ̅y ̅y | parent
x x x x 1 x x 4 x x | ̅y ̅y ̅y ̅y ̅y ̅y ̅y 4 ̅y ̅y | matched
x x x x x 1 x 4 x x | ̅y ̅y ̅y ̅y ̅y ̅y 1 ̅y ̅y ̅y | parent
x x x x x 4 x 4 x x | ̅y ̅y ̅y ̅y ̅y ̅y 0 ̅y ̅y ̅y | parent
x x x x x x x 2 x 0 | 1 ̅y ̅y ̅y ̅y ̅y ̅y ̅y ̅y ̅y | parent
x x x x x x x 2 2 x | ̅y ̅y ̅y ̅y ̅y 

In [182]:
mixed_cubes = target_set

In [183]:
assert len(target_set) == len(mixed_cubes)
# Вывод промежуточного целевого множества
print("\n".join(map(str, mixed_cubes)))

x x x 0 x 2 x x x x | ̅y ̅y ̅y 2 ̅y ̅y ̅y ̅y ̅y ̅y | parent
x x x x 3 x x 1 x x | ̅y ̅y ̅y ̅y ̅y 0 ̅y ̅y ̅y ̅y | parent
2 x 2 x x x x x x x | ̅y ̅y ̅y ̅y ̅y ̅y ̅y ̅y 2 ̅y | parent
4 x x 0 x x x x x x | ̅y ̅y ̅y ̅y ̅y ̅y ̅y ̅y ̅y 4 | parent
0 x 4 x x x x x x x | ̅y ̅y ̅y ̅y ̅y ̅y ̅y ̅y 2 ̅y | parent
4 x x x x x x 3 x x | ̅y ̅y ̅y ̅y ̅y ̅y ̅y 0 ̅y ̅y | parent
x x x 1 x 4 x x x x | ̅y 0 ̅y ̅y ̅y ̅y ̅y ̅y ̅y ̅y | parent
x x x x x 4 x 1 x x | ̅y ̅y ̅y ̅y ̅y ̅y 2 ̅y ̅y ̅y | parent
x x x x x 2 x 3 x x | ̅y ̅y ̅y ̅y ̅y ̅y 2 ̅y ̅y ̅y | parent
x x x x x 5 x 2 x x | ̅y ̅y ̅y ̅y ̅y ̅y 1 ̅y ̅y ̅y | matched
0 x x x x x x 2 x x | ̅y ̅y ̅y ̅y ̅y 2 ̅y ̅y ̅y ̅y | matched
x x x x 3 x x 4 x x | ̅y ̅y ̅y ̅y ̅y 2 ̅y ̅y ̅y ̅y | parent
x x x x 1 x x 4 x x | ̅y ̅y ̅y ̅y ̅y ̅y ̅y 4 ̅y ̅y | matched
x x x x x 1 x 4 x x | ̅y ̅y ̅y ̅y ̅y ̅y 1 ̅y ̅y ̅y | parent
x x x x x 4 x 4 x x | ̅y ̅y ̅y ̅y ̅y ̅y 0 ̅y ̅y ̅y | parent
x x x x x x x 2 x 0 | 1 ̅y ̅y ̅y ̅y ̅y ̅y ̅y ̅y ̅y | parent
x x x x x x x 2 2 x | ̅y ̅y ̅y ̅y ̅y 

## Заполнения оставшихся лакун кубов

In [184]:
initial_list: list[Cube] = []  # for mypy
filled_cubes: list[Cube] = list(
    reduce(
        fill_cube_with_output_vars(mixed_cubes),
        map(lambda cube: fill_cube_with_input_vars(cube, mixed_cubes), mixed_cubes),
        initial_list,
    )
)
print("Результирующая таблица решений:\n", "\n".join(map(str, filled_cubes)))

Результирующая таблица решений:
 3 4 3 4 4 0 0 4 4 4 | 4 1 3 0 4 1 0 1 3 0 | fill_output
3 1 2 3 4 4 1 5 2 5 | 0 4 1 1 2 1 4 3 4 0 | fill_output
0 1 2 4 2 3 1 2 4 2 | 2 2 0 4 4 0 0 1 0 0 | fill_output
2 0 4 0 0 0 3 3 3 0 | 2 1 3 2 0 3 0 1 2 4 | fill_output
0 4 2 1 5 0 2 4 2 2 | 3 4 2 1 4 4 1 2 2 0 | fill_output
4 2 2 3 1 2 0 0 0 1 | 0 4 0 1 3 0 3 1 3 1 | fill_output
2 3 3 3 5 5 1 3 4 2 | 3 1 1 2 3 1 4 0 3 3 | fill_output
2 2 3 4 3 0 3 2 4 5 | 3 0 4 4 3 3 2 3 2 4 | fill_output
5 1 3 5 4 1 3 2 3 1 | 1 4 0 3 0 1 2 0 0 4 | fill_output
4 5 0 2 5 0 5 2 1 3 | 4 1 0 2 0 1 0 0 3 0 | fill_output
2 2 4 3 4 2 0 0 0 4 | 1 3 2 2 2 1 0 2 2 4 | fill_output
0 2 4 0 4 0 1 1 2 0 | 2 4 4 4 2 0 1 1 1 0 | fill_output
2 5 3 2 4 4 1 1 5 5 | 4 4 4 2 4 3 1 0 4 4 | fill_output
0 0 5 4 2 3 2 4 0 5 | 4 4 2 3 1 0 1 2 0 3 | fill_output
2 1 0 4 1 4 5 2 2 1 | 4 1 0 1 1 2 0 4 3 3 | fill_output
4 3 1 3 1 0 2 1 4 0 | 2 2 0 0 1 0 1 4 2 4 | fill_output
0 1 4 5 2 4 5 3 3 1 | 0 0 3 3 3 3 1 1 3 3 | fill_output
2 5 1 4 0 0 4 5

## Запись результатов

In [185]:
df: DataFrame = DataFrame(
    reduce(
        lambda acc, x: dict(map(lambda key: (key, [*acc[key], x[key]]), x.keys())),
        starmap(
            lambda i, cube: (
                {"No": i}
                | dict(starmap(lambda i, v: (f"i{i}", v), enumerate(cube.input_vars)))
                | dict(starmap(lambda i, v: (f"o{i}", v), enumerate(cube.output_vars)))
            ),
            enumerate(filled_cubes),
        ),
        {"No": []}
        | dict(
            chain(
                map(lambda i: (f"i{i}", []), range(m)),
                map(lambda i: (f"o{i}", []), range(l)),
            )
        ),
    )
)
df.write_csv(output_file_path)

In [186]:
print(len(set(filled_cubes)))
print("Целевое распределение:", needed_distribution)
print("Реальное распределение:", result_distribution)

787
Целевое распределение: ((0, 0), (1, 0), (2, 0.32), (3, 0.38), (4, 0.3), (5, 0), (6, 0), (7, 0), (8, 0), (9, 0), (10, 0))
Реальное распределение: ((2, 0.3202033036848793), (3, 0.3799237611181703), (4, 0.29987293519695046))
