In [44]:
# Для корректной работы ноутбука необходимо установить универсальную среду для запуска решателей minizinc
# Это необходимо для решения задач линейного программирования, описанных в файлах mzn

# Описание на wikipedia
# https://en.wikipedia.org/wiki/MiniZinc

# Документация по установке
# https://docs.minizinc.dev/en/stable/installation.html

# Скачать можно на странице по адресу:
# https://www.minizinc.org/

# Указать путь к minizinc.exe
minizinc_exec_path = "./MiniZinc/minizinc.exe"

# Путь к данным решателя по умолчанию
# Для увеличения скорости можно записывать файл в tmpfs или ramdisk (в оперативную память)
# По умолчанию данные для модели записываются в одну директорию с моделью и ноутбуком
default_minizinc_data_dir = "./data_dzn"

# Путь к экземплярам тестов и результам их вычисления
# Директория, где расположены json'ы
default_problem_instances_path = "./data"

In [45]:
import os
import copy
import json
import time
import numpy as np
import random
import asyncio
import hashlib
import networkx as nx
import itertools
import subprocess
import aiofiles
import aiofiles.os
import matplotlib.pyplot as plt
import inspect

from munkres import Munkres, print_matrix
from collections import defaultdict
from scipy.optimize import linear_sum_assignment

from tqdm.notebook import trange, tqdm
from IPython.display import clear_output

In [46]:
# Константы

INF = float("inf")

In [47]:
# reversed_enumerate
# enumerate, который производит итерацию начиная с последнего элемента в списке
def reversed_enumerate(arr: list):
    for i in range(len(arr) - 1, -1, -1):
        yield i, arr[i]

In [48]:
# deep_round
# Округление всей матрицы
def deep_round(val, prec=0):
    if not hasattr(val, '__iter__'):
        return round(val, prec)
    for i, _ in enumerate(val):
        val[i] = deep_round(val[i], prec)
    return val

In [49]:
# nget
# Вернуть значение сложного объекта по указанному пути
def nget(obj, path, defval=None):
    for k in path:
        if obj is None:
            return defval
        elif hasattr(obj, 'get'):
            if k not in obj:
                return defval
            obj = obj.get(k)
        elif hasattr(obj, '__iter__'):
            if type(k) != int:
                return defval
            elif len(obj) - 1 < k:
                return defval
            else:
                obj = obj[k]
        else:
            return defval
    return obj

# Записать значение сложного объекта по указанному пути
def nset(obj, path, val):
    _obj = nget(obj, path[0:-1])
    if _obj:
        _obj[path[-1]] = val
    return val

In [50]:
# find_rel_val
def find_rel_val(ndarr, glob_item, cmp_func):
    glob_index = tuple()
    for index, item in enumerate(ndarr):
        if hasattr(item, '__iter__'):
            _index, _item = find_rel_val(item, glob_item, cmp_func)
            if cmp_func(_item, glob_item):
                glob_item  = _item
                glob_index = (index, *_index)
            continue
        if cmp_func(item, glob_item):
            glob_item = item
            glob_index = tuple([index])
    return (glob_index, glob_item)

def _min_cmp_func(v, g):
    return v < g

def _max_cmp_func(v, g):
    return v > g

def findmin(ndarr, min_v=float("inf")):
    return find_rel_val(ndarr, min_v, _min_cmp_func)

def findmax(ndarr, max_v=-float("inf")):
    return find_rel_val(ndarr, max_v, _max_cmp_func)

def argmin(arr, min_v=float("inf")):
    i, v = findmin(arr)
    if not len(i):
         return None
    if len(i) == 1:
        return i[0]
    return i

def argmax(arr, min_v=float("inf")):
    i, v = findmax(arr)
    if not len(i):
         return None
    if len(i) == 1:
        return i[0]
    return i

In [51]:
# _mat3d

# Конвертировать 2d представление задачи в 3d
def _mat3d(mat2d, shape):
    _res3d = []
    _mat2d = []
    shape = shape.copy()
    p = shape.pop(0)
    for row in mat2d:
        _mat2d.append(row)
        if len(_mat2d) == p:
            _res3d.append(_mat2d)
            if shape:
                p = shape.pop(0)
            _mat2d = []
    return _res3d

In [52]:
# _mat2d

# Конвертировать 3d представление задачи в 2d
def _mat2d(mat3d):
    _res2d = []
    for mat in mat3d:
        _res2d.extend(mat)
    return _res2d

In [53]:
# print_osp_matrix
def print_osp_matrix(w, jsep=None):
    _, m = findmax(w)
    pad = len(str(m)) + 1
    pad0 = len(str(m))
    # print("max:", m, l)
    if type(w[0][0]) == int:
        w = [w]
    for mat in w:
        for row in mat:
            rowstr = ""
            for i, col in enumerate(row):
                rowstr += str(col).rjust(pad0 if not i else pad, " ") + ","
            rowstr = "[" + rowstr.strip(",") + "]"
            print(rowstr)
        if jsep:
            print(jsep)

In [54]:
# create_cost_matrix
# 
# Матричное представление задачи
# Рассчитать и вернуть матрицу взвешенного времени (матрицу стоимостей)
# 
def create_cost_matrix(p, w, r, s1, s2, d1, d2):
    # p:  list = PROC_TIMES     = Количество задач в работах
    # w:  list = WEIGHTS        = Веса
    # r:  list = RELEASE_DATES  = Время старта работы
    # s1: list = EARLY_S_DATES  = Время опережения запуска работы
    # s2: list = DUE_S_DATES    = Время запаздывания запуска работы
    # d1: list = EARLY_C_DATES  = Время опережения завершения работы
    # d2: list = DUE_C_DATES    = Время запаздывания завершения работы

    n    = len(w) # количество работ
    T    = sum(p) # временной горизонт
    _inf = max((T*max(w))**2, 1000) # достаточно большое число (выполняет роль бесконечности)

    # результат, матрица взвешенного времени
    # инициализировать с нулями
    cost = []

    for j in range(n):
        job = []
        
        for k in range(p[j]):
            oper = []
            _k = k + 1  # индекс работы начиная от 1 (в соотв. с описанием в задаче ЛП)
            
            for t in range(T):
                _t = t + 1  # время начиная от 1 (в соотв. с описанием в задаче ЛП)
                
                # w_jpt = последняя операция в работе
                if _k == p[j]:
                    if r[j] + p[j] - 1 <= _t:
                        w_jkt = w[j] * (  max(0, _t - d2[j]) + max(0, d1[j] - _t)  )
                    else:
                        w_jkt = _inf
                        
                # w_j1t = первая операция в работе
                elif _k == 1:
                    if r[j] <= _t and _t <= T - p[j] + 1:
                        w_jkt = w[j] * (  max(0, _t - s2[j]) + max(0, s1[j] - _t)  )
                    else:
                        w_jkt = _inf
                        
                # w_jkt = все остальные операции
                else:
                    if r[j] + _k - 1 <= _t and _t <= T - p[j] + _k:
                        w_jkt = 0
                    else:
                        w_jkt = _inf
                        
                oper.append(w_jkt)
            # end for t
            job.append(oper)
        # end for k
        cost.append(job)
    # end for j

    return cost

In [55]:
# calc_obj_from_mat
# Рассчитать и вернуть значение целевой функции из решения
# Матричное решение
def calc_obj_from_mat(w, x):
    # w = матрица взвешенного времени
    # x = матрица решения
    # Сумма по всем элементам матрицы произведения Адамара (w * x)
    if not hasattr(w, '__iter__'):
        return w * x
    _sum = 0
    for i, _ in enumerate(w):
        _sum += calc_obj_from_mat(w[i], x[i])
    return _sum

# Рассчитать и вернуть значение целевой функции из решения
# Векторное решение. К примеру эвристики
def calc_obj_from_vec(w, v):
    _sum = 0
    ktable = [0] * len(w)
    for t, j in enumerate(v):
        if j < 0: continue
        j = j - 1
        k = ktable[j]
        _sum += w[j][k][t]
        ktable[j] += 1
    return _sum

In [56]:
def get_schedule_vec(x):
    vec = [0] * len(x[0][0])
    for j, _ in enumerate(x):
        for k, _ in enumerate(x[j]):
            for t, _ in enumerate(x[j][k]):
                if x[j][k][t]:
                    vec[t] = j + 1
    return vec      

In [57]:
# get_job_time_slots

# Вернуть время, в которое выполняется каждая работа
def get_job_time_slots(s):
    # s: list = расписание
    job_times = defaultdict(list)
    for t, j in enumerate(s, start=1):
        job_times[j].append(t)
    return job_times

# Проверить прерывает ли работа j2 работу j1
def is_job_intersects(j2, j1, job_times):
    # j1: int = номер прерываемой работы
    # j2: int = номер прерывающей работы
    # job_times: get_job_time_slots()
    if not job_times[j1] or not job_times[j2]:
        return False
    start = job_times[j1][0]
    compl = job_times[j1][-1]
    for t in job_times[j2]:
        if start < t < compl:
            return True
    return False

# Проверить наличие взаимных прерываний
def has_mutual_job_intersection(s, job_times=None):
    # s: list = расписание
    # job_times: get_job_time_slots()
    if not job_times:
        job_times = get_job_time_slots(s)
    for j1, j2 in itertools.combinations(job_times.keys(), 2):
        if (
                is_job_intersects(j2, j1, job_times)
            and is_job_intersects(j1, j2, job_times)
        ):
            return True
    return False

# Проверить выполняется ли работа вовремя: вовремя запускается и вовремя завершается
def is_job_processed_intime(j, params, job_times):
    # j: int = номер работы
    # params: object = параметры работ
    # job_times: get_job_time_slots()
    if not job_times:
        return False
    start = job_times[j][0]
    compl = job_times[j][-1]
    h = j - 1
    if (
            (params["s1"][h] <= start <= params["s2"][h])
        and (params["d1"][h] <= compl <= params["d2"][h])
    ):
        return True
    return False

# Проверить выполняемость вовремя работ. Вернуть список [True, False, ...]
def get_processed_intime_jobs(s, params, job_times=None):
    # s: list = расписание
    # params: object = параметры работ
    # job_times: get_job_time_slots()
    if not job_times:
        job_times = get_job_time_slots(s)
    jobs = sorted(job_times.keys(), reverse=False)
    return [
        is_job_processed_intime(j, params, job_times)
        for j in jobs
    ]

# ----------------------------
# Тест
# ----------------------------
def main():
    test_cases = [
        { "s1": [1, 7, 4], "s2": [ 3,  9,  5], "d1": [3, 10, 7],  "d2": [ 3, 10, 10] },    # -> 1,1,1
        { "s1": [1, 1, 1], "s2": [10, 10, 10], "d1": [1,  1, 1],  "d2": [10, 10, 10] },    # -> 1,1,1
        { "s1": [1, 1, 1], "s2": [10, 10, 10], "d1": [1,  1, 4],  "d2": [10, 10,  7] },    # -> 1,1,0
        { "s1": [2, 1, 1], "s2": [ 3, 10, 10], "d1": [1,  1, 1],  "d2": [10,  8, 10] },    # -> 0,0,1
    ]

    #                1  2  3  4  5  6  7  8  9  10
    test_schedule = [1, 1, 1, 3, 3, 3, 3, 3, 2, 2]
    
    for i, params in enumerate(test_cases, 1):
        print(i, get_processed_intime_jobs(test_schedule, params))
# main()

# ----------------------------
# Тест
# ----------------------------
def main():
    test_cases = [
        [1, 1, 1, 3, 3, 3, 3, 3, 2, 2],    # Без прерываний
        [1, 1, 2, 3, 1, 2, 3, 3, 3, 3],    # Взаимное прерывание двух работ
        [1, 1, 1, 3, 3, 2, 2, 3, 3, 3],    # Одна работа прерывает другую
    ]
    
    for i, s in enumerate(test_cases, 1):
        print(s, has_mutual_job_intersection(s))
# main()

In [58]:
# get_positive_x_overlap_ratio
# Вернуть количество переменных которые положительны в обоих решениях
def get_positive_x_overlap_ratio(xi, xf):
    n_overlap = 0
    n_total_i = 0
    # n_total_f = 0
    for j, _ in enumerate(xi):
        for k, _ in enumerate(xi[j]):
            for t, _ in enumerate(xi[j][k]):
                xi_jkt = round(xi[j][k][t], 10)
                xf_jkt = round(xf[j][k][t], 10)
                # if xf_jkt > 0:
                #     n_total_f += 1
                if xi_jkt > 0:
                    n_total_i += 1
                if xi_jkt > 0 and xf_jkt > 0:
                    n_overlap += 1
    return n_overlap / n_total_i

In [59]:
# _instance_has_mutual_job_intersection, _instance_has_all_intime
# Взаимные прерывания работ и своевременность выполнения работ
# это свойства одного конкретного решения задачи.
# Вернуть обобщенный ответ для всей задачи (если хотя бы одно решение удовлетворяет условию)

def _func2918(row):
    lp_x      = nget(row, ["results", "osp_lp", "x"], None)
    lp_ord    = nget(row, ["results", "osp_lp", "ord"], None)
    blp_x     = nget(row, ["results", "osp_blp", "x"], None)
    lap1_x    = nget(row, ["results", "lap_munkres", "x"], None)
    lap1_ord  = nget(row, ["results", "lap_munkres", "ord"], None)
    lap2_x    = nget(row, ["results", "lap_2d_rect", "x"], None)
    lap2_ord  = nget(row, ["results", "lap_2d_rect", "ord"], None)
    return [
        ("lap_munkres", lap1_x, lap1_ord),
        ("lap_2d_rect", lap2_x, lap2_ord),
        ("osp_lp",      lp_x,   lp_ord),
        ("osp_blp",     blp_x,  1),
    ]

def _instance_has_all_intime(row):
    all_intime = None
    for k, x, ord in _func2918(row):
        if x and ord:
            if type(x[0][0]) != list:
                x = _mat3d(x, row["params"]["p"])
            s = get_schedule_vec(x)
            all_intime = (  all_intime or all(get_processed_intime_jobs(s, row["params"]))  )
    return all_intime

def _instance_has_mutual_job_intersection(row):
    intersec = None
    for k, x, ord in _func2918(row):
        if x and ord:
            if type(x[0][0]) != list:
                x = _mat3d(x, row["params"]["p"])
            s = get_schedule_vec(x)
            intersec = (  intersec or has_mutual_job_intersection(s)  )
    return intersec

In [60]:
# solve_lap_munkres

__global_munk = Munkres()

# Решить задачу о назначениях
# Венгерский алгоритм
# Также известный как Kuhn–Munkres algorithm или Munkres assignment algorithm
# https://en.wikipedia.org/wiki/Hungarian_algorithm
def solve_lap_munkres(_w):
    # _w: list[T,T] = матрица взвешенного времени

    w = _w.copy()
    indexes = __global_munk.compute(w)
    n = len(w)
    m = len(w[0])
    x = [[0] * m for i in range(n)]
    for i,j in indexes:
        x[i][j] = 1
    return x

In [61]:
# solve_lap_scipy

# Решить задачу о назначениях
# Алгоритм из пакета scipy
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.linear_sum_assignment.html
# https://ieeexplore.ieee.org/document/7738348
# Решение будет оптимальным, но может отличаться от венгерского
def solve_lap_scipy(w):
    # w: list[T,T] = матрица взвешенного времени

    sol_index_x, sol_index_y = linear_sum_assignment(w)
    n, m = len(sol_index_x), len(sol_index_y)
    x = [[0] * m for i in range(n)]
    for k, _ in enumerate(sol_index_x):
        i = sol_index_x[k]
        j = sol_index_y[k]
        x[i][j] = 1
    return x

In [62]:
# create_solver_data_str

# Подготовить и вернуть данные для решателя
def create_solver_data_str(w):
    # w: list[j,k,t] = матрица взвешенного времени

    _inf   = w[0][1][0] # достаточно большое число (выполняет роль бесконечности) # _inf = np.max(w)
    n      = len(w)     # количество работ
    p      = []         # количество операций
    max_p  = 0          # максимальное количество операций из представленных
    T      = 0          # временной горизонт

    for _w in w:
        pj    = len(_w)
        max_p = max(pj, max_p)
        T     += pj
        p.append(pj)

    res = ""
    
    for j, _ in enumerate(w):
        for k, _ in enumerate(w[j]):
            for w_jkt in w[j][k]:
                res += str(int(w_jkt)) + ","
            res += "\n"
        for k in range(  max_p - len(w[j])  ):
            res += ",".join(["D"] * T) + ","
            res += "\n"
        res += "\n"
        
    res = res.strip("\n").strip(",")
    
    data = ""
    data += f"INF = {_inf};\n"
    data += f"N = {n};\n"
    data += f"PROC_TIMES = [{','.join(map(str, p))}];\n\n"
    data += f"ln_weights = array3d(JOB_INDICES, JOB_PART_INDICES, TIME_INTERVAL_INDICES, [\n{res}\n]);\n"
    
    return data


# # TEST
# def main():
#     params = get_test_case_1_params()
#     w = create_cost_matrix(**params)
#     return create_solver_data_str(w)
# # main()

In [63]:
# process_solver_output

# Обработать ответ из решателя
def process_solver_output(x, p):
    # x: list[j,p,t] = ответ из решателя
    # p: list        = количество операций в работах
    
    sol = []
    for j, pj in enumerate(p):
        sol_job = []
        for k in range(pj):
            sol_job.append(x[j][k])
        sol.append(sol_job)
    return sol

In [64]:
# solve_lp

# Решить задачу ЛП
async def solve_lp(
    w,
    model_path,
    data_path=None
):
    # w: list[j,k,t]  = матрица взвешенного времени
    # model_path: str = путь к модели

    data_dzn = create_solver_data_str(w)

    should_delete_data_file = False

    if not data_path:
        should_delete_data_file = True
        data_path = default_minizinc_data_dir + "/" + str(random.random())[3:] + ".dzn"

    async with aiofiles.open(data_path, "w") as fd:
        await fd.write(data_dzn)
    
    cmd = (minizinc_exec_path
    + ' --solver "HiGHS"'
    + f' --model "{model_path}"'
    + f' --data "{data_path}"'
    + ' --output-mode json'
    + ' --solution-separator " "'
    + ' --search-complete-msg " "'
    + ' --solution-comma " "')

    # proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    # (out, err) = proc.communicate()
    out, err = await proc.communicate()

    if err:
        print(err)

    p = [len(w_) for w_ in w]
    
    x = json.loads(out)["ln_activators"]
    x = process_solver_output(x, p)

    if should_delete_data_file:
        await aiofiles.os.remove(data_path)

    return x

# # TEST
# async def main():
#     params = get_test_case_1_params()
#     w = create_cost_matrix(**params)
#     res = await solve_lp(
#         w, 
#         model_path="./model_blp.mzn"
#     )
#     print(res)

# # await main()

In [65]:
# solve_osp

# Решить задачу составления оптимального расписания
async def solve_osp_blp(w):
    # w: list[j,k,t] = матрица взвешенного времени

    return await solve_lp(w, "./model_blp.mzn")

In [66]:
# exclude_weight_2d, include_weight_2d, include_weight_r_2d, get_weight_inf_value

# Форсировать исключение коэф. из решения:
# задать достаточно большое число указанному коэф.
def exclude_weight_2d(w, i, j, _inf):
    w[i][j] = _inf
    return w

# Форсировать включение коэф. в решение:
# задать достаточно большое число всем коэф. кроме указанного
def include_weight_2d(*args):
    return include_weight_r_2d(*args)

def include_weight_r_2d(w, i, j, _inf):
    v = w[i][j]
    for jj, _ in enumerate(w[i]):
        w[i][jj] = _inf
    w[i][j] = v
    return w

def get_weight_inf_value(w):
    return w[1][0]

In [67]:
# get_eager_upper_bound

# Вернуть наивную оценку сверху
def get_eager_upper_bound(w):
    _inf = w[0][1][0]
    h = len(w[0][0])
    s = [-1] * h
    ub = 0;
    x = []
    for j, _ in enumerate(w):
        x_job = []
        x.append(x_job)
        for k, _ in enumerate(w[j]):
            x_oper = [0] * h
            x_job.append(x_oper)
            for t, _ in enumerate(w[j][k]):
                if w[j][k][t] < _inf and s[t] == -1:
                    s[t] = j
                    ub += w[j][k][t]
                    x_oper[t] = 1
                    break
    return x, ub

In [68]:
# draw_sst
# Нарисовать ДПОР для АВиГ ЗН
def draw_sst(tree):
    g = nx.Graph()

    edge_labels = {}
    node_color = []

    stack = [tree]
    parent = None

    while stack:
        node = stack.pop()
        node_name = str(node["i"]) + "_" +  str(node["obj"])
        g.add_node(node_name)
        color = "black"
        if node.get("feasible"):
            color = "tab:blue"
        if node.get("lb_bu_geq_ub"):
            color = "darkviolet"
        if node.get("lb_ge_ub"):
            color = "deeppink"
        if node.get("lb_geq_inf"):
            color = "darkred"
        if node.get("out_of_options"):
            color = "gray"
        node_color.append(color)
        
        if not node["nodes"]:
            continue

        left  = node["nodes"][0]
        right = node["nodes"][1]

        # left
        left_name = str(left["i"]) + "_" +  str(left["obj"])
        stack.insert(0, left)
        g.add_edge(node_name, left_name)
        edge_labels[(node_name, left_name)] = "--" + str(left["arc"])

        # right
        right_name = str(right["i"]) + "_" +  str(right["obj"])
        stack.insert(0, right)
        g.add_edge(node_name, right_name)
        edge_labels[(node_name, right_name)] = "+" + str(right["arc"])

    pos = nx.nx_pydot.pydot_layout(g, prog="dot")

    plt.figure(
        num=None,
        figsize=(12, 20),
        # dpi=120
    )

    nx.draw(
        g,
        font_color="white",
        node_color=node_color,
        with_labels=True,
        pos=pos,
        node_size=800,
        font_size=8,
    )
    nx.draw_networkx_edges(g, pos)
    nx.draw_networkx_edge_labels(
        g,
        pos,
        edge_labels=edge_labels,
        font_color='red',
        font_size=8,
    )

In [69]:
# get_lap_tolerances

# Рассчитать верхний допуск ЗН
def get_lap_tolerance(w, x, i, j, params, solver=solve_lap_munkres):
    obj = calc_obj_from_mat(w, x)
    _inf = w[1][0]
    _w = copy.deepcopy(w)
    _w = exclude_weight_2d(_w, i, j, _inf)
    _x = solver(_w)
    _obj = calc_obj_from_mat(_w, _x)
    return _obj - obj, _w, _x, _obj

# Рассчитать верхние допуски ЗН
# Вернуть список от большего к меньшему
def get_lap_tolerances(w, x, inf_def=0, solver=solve_lap_munkres):
    tolerances = []
    obj = calc_obj_from_mat(w, x)
    _inf = w[1][0]
    for i, _ in enumerate(x):
        for j, _ in enumerate(x[i]):
            if x[i][j] > 0:
                _w = copy.deepcopy(w)
                _w = exclude_weight_2d(_w, i, j, _inf)
                _x = solver(_w)
                _obj = calc_obj_from_mat(_w, _x)
                if _obj >= _inf:
                    if inf_def:
                        tolerances.append((
                            (i, j),
                            inf_def,
                        ))
                    continue
                tolerances.append((
                    (i, j),
                    (_obj - obj)
                ))
    return sorted(tolerances, key=_func1172_toler_sort, reverse=True)

def _func1172_toler_sort(a):
    return a[1]

In [70]:
def get_schedule_infeasible_arcs(x, params):
    # x: list[j,k,t] = решение
    
    T = len(x[0])
    prev_t = -1
    j = 0
    k = 0
    for a, _ in enumerate(x):
        f = False
        for t in range(prev_t + 1, T):
            if x[a][t]:
                prev_t = t
                f = True
                break
        if not f:
            for t, _ in enumerate(x[a]):
                if x[a][t]:
                    return [(a, t), (a - 1, prev_t)]
        if params["p"][j] - 1 == k:
            prev_t = -1
            j += 1
            k = 0
        else:
            k += 1
    return None

def main():
    params = {
        "p": [3, 2, 5]
    }
    x = [
        [1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 1, 0, 0, 0],

        [0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
        
        [0, 1, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 1, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 1, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 1, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
    ]
    return get_schedule_infeasible_arcs(x, params)
# main()

In [71]:
def solve_osp_lap_bnb_2d_v3_3(
    w,
    ctx=None,
    _parent_node=None,
    _arc=None,
    _is_exclusion=None,
    _ancestors=None,
    _cache_x_lap=None
):
    if _ancestors is None:
        _ancestors = dict()

    _solver = ctx["solver"]
    _print_debug = ctx.get("print_debug", 0)
    _inf = get_weight_inf_value(w)

    _is_inclusion = not _is_exclusion

    if _is_exclusion and _cache_x_lap and ctx.get("reuse_parent_x_for_exclusion", 1):
        x_lap = _cache_x_lap
    elif _is_inclusion and _cache_x_lap and ctx.get("reuse_parent_x_for_inclusion", 1):
        x_lap = _cache_x_lap
    else:
        x_lap = _solver(w)
        ctx["n_lap_solved"] = ctx.get("n_lap_solved", 0) + 1

    ctx["n_tree_nodes"] = ctx.get("n_tree_nodes", 0) + 1

    if ctx["n_tree_nodes"] >= ctx.get("n_tree_nodes_stop_limit", INF):
        raise BaseException("n_tree_nodes_stop_limit")

    obj = calc_obj_from_mat(w, x_lap)

    node = None

    if ctx.get("build_tree", 0):
        node = {
            "i"    : ctx["n_tree_nodes"],
            "arc"  : _arc,
            "obj"  : obj,
            "nodes": [],
            "is_exclusion": _is_exclusion,
        }

    if _parent_node and node:
        _parent_node["nodes"].append(node)

    if _print_debug:
        print()
        print(f"arc:{str(_arc)}; i:{ctx['n_tree_nodes']}; obj:{obj}")
        print_osp_matrix(x_lap)

    if "solution_search_tree" not in ctx:
        ctx["solution_search_tree"] = node

    if obj > ctx["ub"][1]:
        _note = f"obj > ub: {obj} > {ctx['ub'][1]}"
        if node:
            node["note"] = _note
            node["lb_ge_ub"] = 1
        if _print_debug:
            print(_note)
        return ctx["ub"][0]

    if obj >= _inf:
        _note = f"obj >= _inf: {obj} > {_inf}"
        if node:
            node["note"] = _note
            node["lb_geq_inf"] = 1
        if _print_debug:
            print(_note)
        return ctx["ub"][0]

    infeasible_arcs = get_schedule_infeasible_arcs(x_lap, ctx["params"])

    if infeasible_arcs is None: # isValidSchedule
        ctx["ub"] = (x_lap, obj)
        if node:
            node["note"] = f"feasible: {obj}"
            node["feasible"] = 1
        if _print_debug:
            print('feasible:', ctx["ub"][1]);
        return x_lap

    branch_arc = None

    if "lowest_cost_arc" == ctx.get("arc_branch_rule"):
        for arc in infeasible_arcs:
            if (arc in _ancestors):
                continue
            if branch_arc is None:
                branch_arc = arc
                continue
            ci, cj = arc
            mi, mj = branch_arc
            if (w[ci][cj] < w[mi][mj]):
                branch_arc = arc
                
    elif "highest_cost_arc" == ctx.get("arc_branch_rule"):
        for arc in infeasible_arcs:
            if (arc in _ancestors):
                continue
            if branch_arc is None:
                branch_arc = arc
                continue
            ci, cj = arc
            mi, mj = branch_arc
            if (w[ci][cj] > w[mi][mj]):
                branch_arc = arc
                
    elif "first_available_arc" == ctx.get("arc_branch_rule"):
        for arc in infeasible_arcs:
            if (arc in _ancestors):
                continue
            branch_arc = arc
            break
    else:
        raise BaseException("unknown branch_rule " + ctx.get("arc_branch_rule"))
    
    # out of options
    if branch_arc is None:
        _note = (f""
            + f"branch_arc is None (out of options):"
            + f" {str(_arc)};"
            + f" i:{ctx['n_tree_nodes']};"
            + f" infeasible_arcs: {str(infeasible_arcs)};"
        )
        if node:
            node["note"] = _note
            node["out_of_options"] = 1
        if _print_debug:
            print(_note)
        return ctx["ub"][0]

    w1 = x_lap_1 = None

    if ctx.get("tolerance_branch_rule"):
        if "lowest_upper_tolerance" == ctx.get("tolerance_branch_rule"):
            branch_arc = None
            branch_tol = None
            for arc in infeasible_arcs:
                if (arc in _ancestors):
                    continue
                _curr_tol = get_lap_tolerance(w, x_lap, *arc, ctx["params"], solver=_solver)
                ctx["n_lap_solved"] = ctx.get("n_lap_solved", 0) + 1
                if branch_arc is None:
                    branch_arc = arc
                    branch_tol = _curr_tol
                    continue
                if _curr_tol[0] < branch_tol[0]:
                    branch_arc = arc
                    branch_tol = _curr_tol
            tol, w1, x_lap_1, _ = branch_tol

        elif "arc_upper_tolerance" == ctx.get("tolerance_branch_rule"):
            tol, w1, x_lap_1, _ = get_lap_tolerance(w, x_lap, *branch_arc, ctx["params"], solver=_solver)
            ctx["n_lap_solved"] = ctx.get("n_lap_solved", 0) + 1

        else:
            raise BaseException("unknown tolerance_rule" + ctx.get("tolerance_branch_rule"))

        if not (obj + tol < ctx["ub"][1]):
            _note = f"tolerance{str(branch_arc)}: {obj} + {tol} < {ctx['ub'][1]}"
            if node:
                node["lb_bu_geq_ub"] = 1
                node["note"] = _note
            if _print_debug:
                print(_note)
            return ctx["ub"][0]

    w1 = w1 if w1 else exclude_weight_2d(copy.deepcopy(w), *branch_arc, _inf)
    w2 = include_weight_2d(copy.deepcopy(w), *branch_arc, _inf)

    _ancestors = _ancestors.copy()
    _ancestors[branch_arc] = 1

    x_osp_1 = solve_osp_lap_bnb_2d_v3_3(
        w1,
        ctx=ctx,
        _arc=branch_arc,
        _is_exclusion=1,
        _parent_node=node,
        _cache_x_lap=x_lap_1,
        _ancestors=_ancestors
    )

    x_osp_2 = solve_osp_lap_bnb_2d_v3_3(
        w2,
        ctx=ctx,
        _arc=branch_arc,
        _is_exclusion=0, # is_inclusion=1
        _parent_node=node,
        _cache_x_lap=x_lap,
        _ancestors=_ancestors
    )

    if x_osp_1 is None:
        return x_osp_2
    if x_osp_2 is None:
        return x_osp_1

    obj_1 = calc_obj_from_mat(w1, x_osp_1)
    obj_2 = calc_obj_from_mat(w2, x_osp_2)
    
    return x_osp_1 if obj_1 < obj_2 else x_osp_2

In [72]:
async def exec_test_osp_blp(w, params):
    t0 = time.process_time()
    x = await solve_osp_blp(w)
    t1 = time.process_time()
    exec_time = round(t1 - t0, 2)
    obj = calc_obj_from_mat(w, x)
    return {
        "solver": "highs",
        "exec_time": exec_time,
        "obj": obj,
        "x": x,
    }
async def exec_inst_osp_blp(instance):
    params = instance["params"]
    w = create_cost_matrix(**params)
    return exec_test_osp_blp(w, params)

In [73]:
def exec_test_osp_lap_bnb_2d_v33__lc_arc(w, params):
    w2d = _mat2d(w)
    eager_ub = get_eager_upper_bound(w)
    eager_ub_2d = (_mat2d(eager_ub[0]), eager_ub[1])
    ctx = {
        "n_tree_nodes_stop_limit": INF,
        "solver": solve_lap_scipy,
        "params": params,
        "arc_branch_rule": "lowest_cost_arc",
        "tolerance_branch_rule": None,
        "ub": eager_ub_2d,
        "print_debug": 0,
        "build_tree": 0,
    }
    _result = {
        "bnb_solver": "solve_osp_lap_bnb_2d_v3_3",
        "lap_solver": "scipy",
        "n_tree_nodes_stop_limit": ctx["n_tree_nodes_stop_limit"],
    }
    try:
        t0 = time.process_time()
        x = solve_osp_lap_bnb_2d_v3_3(w2d, ctx=ctx)
        t1 = time.process_time()
        exec_time = round(t1 - t0, 2)
        if x:
            obj = calc_obj_from_mat(w2d, x)
            _result["obj"] = obj
            _result["n_tree_nodes"] = ctx["n_tree_nodes"]
            _result["n_lap_solved"] = ctx["n_lap_solved"]
            _result["exec_time"] = exec_time
            _result["x"] = _mat3d(x, params["p"])
    except BaseException as e:
        _result["err"] = str(e)
    return _result
def exec_inst_osp_lap_bnb_2d_v33__lc_arc(instance):
    params = instance["params"]
    w = create_cost_matrix(**params)
    return exec_test_osp_lap_bnb_2d_v33__lc_arc(w, params)

In [74]:
def exec_test_osp_lap_bnb_2d_v33__hc_arc(w, params):
    w2d = _mat2d(w)
    eager_ub = get_eager_upper_bound(w)
    eager_ub_2d = (_mat2d(eager_ub[0]), eager_ub[1])
    ctx = {
        "n_tree_nodes_stop_limit": INF,
        "solver": solve_lap_scipy,
        "params": params,
        "arc_branch_rule": "highest_cost_arc",
        "tolerance_branch_rule": None,
        "ub": eager_ub_2d,
        "print_debug": 0,
        "build_tree": 0,
    }
    _result = {
        "bnb_solver": "solve_osp_lap_bnb_2d_v3_3",
        "lap_solver": "scipy",
        "n_tree_nodes_stop_limit": ctx["n_tree_nodes_stop_limit"],
    }
    try:
        t0 = time.process_time()
        x = solve_osp_lap_bnb_2d_v3_3(w2d, ctx=ctx)
        t1 = time.process_time()
        exec_time = round(t1 - t0, 2)
        if x:
            obj = calc_obj_from_mat(w2d, x)
            _result["obj"] = obj
            _result["n_tree_nodes"] = ctx["n_tree_nodes"]
            _result["n_lap_solved"] = ctx["n_lap_solved"]
            _result["exec_time"] = exec_time
            _result["x"] = _mat3d(x, params["p"])
        else:
            pass
    except BaseException as e:
        _result["err"] = str(e)
    return _result
def exec_inst_osp_lap_bnb_2d_v33__hc_arc(instance):
    params = instance["params"]
    w = create_cost_matrix(**params)
    return exec_test_osp_lap_bnb_2d_v33__hc_arc(w, params)

In [75]:
def exec_test_osp_lap_bnb_2d_v33__1st_arc(w, params):
    w2d = _mat2d(w)
    eager_ub = get_eager_upper_bound(w)
    eager_ub_2d = (_mat2d(eager_ub[0]), eager_ub[1])
    ctx = {
        "n_tree_nodes_stop_limit": INF,
        "solver": solve_lap_scipy,
        "params": params,
        "arc_branch_rule": "first_available_arc",
        "tolerance_branch_rule": None,
        "ub": eager_ub_2d,
        "print_debug": 0,
        "build_tree": 0,
    }
    _result = {
        "bnb_solver": "solve_osp_lap_bnb_2d_v3_3",
        "lap_solver": "scipy",
        "n_tree_nodes_stop_limit": ctx["n_tree_nodes_stop_limit"],
    }
    try:
        t0 = time.process_time()
        x = solve_osp_lap_bnb_2d_v3_3(w2d, ctx=ctx)
        t1 = time.process_time()
        exec_time = round(t1 - t0, 2)
        if x:
            obj = calc_obj_from_mat(w2d, x)
            _result["obj"] = obj
            _result["n_tree_nodes"] = ctx["n_tree_nodes"]
            _result["n_lap_solved"] = ctx["n_lap_solved"]
            _result["exec_time"] = exec_time
            _result["x"] = _mat3d(x, params["p"])
        else:
            pass
    except BaseException as e:
        _result["err"] = str(e)
    return _result
def exec_inst_osp_lap_bnb_2d_v33__1st_arc(instance):
    params = instance["params"]
    w = create_cost_matrix(**params)
    return exec_test_osp_lap_bnb_2d_v33__1st_arc(w, params)

In [76]:
def exec_test_osp_lap_bnb_2d_v33__lc_arc_utol(w, params):
    w2d = _mat2d(w)
    eager_ub = get_eager_upper_bound(w)
    eager_ub_2d = (_mat2d(eager_ub[0]), eager_ub[1])
    ctx = {
        "n_tree_nodes_stop_limit": INF,
        "solver": solve_lap_scipy,
        "params": params,
        "arc_branch_rule": "lowest_cost_arc",
        "tolerance_branch_rule": "arc_upper_tolerance",
        "ub": eager_ub_2d,
        "print_debug": 0,
        "build_tree": 0,
    }
    _result = {
        "bnb_solver": "solve_osp_lap_bnb_2d_v3_3",
        "lap_solver": "scipy",
        "n_tree_nodes_stop_limit": ctx["n_tree_nodes_stop_limit"],
    }
    try:
        t0 = time.process_time()
        x = solve_osp_lap_bnb_2d_v3_3(w2d, ctx=ctx)
        t1 = time.process_time()
        exec_time = round(t1 - t0, 2)
        if x:
            obj = calc_obj_from_mat(w2d, x)
            _result["obj"] = obj
            _result["n_tree_nodes"] = ctx["n_tree_nodes"]
            _result["n_lap_solved"] = ctx["n_lap_solved"]
            _result["exec_time"] = exec_time
            _result["x"] = _mat3d(x, params["p"])
        else:
            pass
    except BaseException as e:
        _result["err"] = str(e)
    return _result
def exec_inst_osp_lap_bnb_2d_v33__lc_arc_utol(instance):
    params = instance["params"]
    w = create_cost_matrix(**params)
    return exec_test_osp_lap_bnb_2d_v33__lc_arc_utol(w, params)

In [77]:
def exec_test_osp_lap_bnb_2d_v33__lowest_utol(w, params):
    w2d = _mat2d(w)
    eager_ub = get_eager_upper_bound(w)
    eager_ub_2d = (_mat2d(eager_ub[0]), eager_ub[1])
    ctx = {
        "n_tree_nodes_stop_limit": INF,
        "solver": solve_lap_scipy,
        "params": params,
        "arc_branch_rule": "lowest_cost_arc",
        "tolerance_branch_rule": "lowest_upper_tolerance",
        "ub": eager_ub_2d,
        "print_debug": 0,
        "build_tree": 0,
    }
    _result = {
        "bnb_solver": "solve_osp_lap_bnb_2d_v3_3",
        "lap_solver": "scipy",
        "n_tree_nodes_stop_limit": ctx["n_tree_nodes_stop_limit"],
    }
    try:
        t0 = time.process_time()
        x = solve_osp_lap_bnb_2d_v3_3(w2d, ctx=ctx)
        t1 = time.process_time()
        exec_time = round(t1 - t0, 2)
        if x:
            obj = calc_obj_from_mat(w2d, x)
            _result["obj"] = obj
            _result["n_tree_nodes"] = ctx["n_tree_nodes"]
            _result["n_lap_solved"] = ctx["n_lap_solved"]
            _result["exec_time"] = exec_time
            _result["x"] = _mat3d(x, params["p"])
        else:
            pass
    except BaseException as e:
        _result["err"] = str(e)
    return _result
def exec_inst_osp_lap_bnb_2d_v33__lowest_utol(instance):
    params = instance["params"]
    w = create_cost_matrix(**params)
    return exec_test_osp_lap_bnb_2d_v33__lowest_utol(w, params)

In [78]:
# create_problem_instance_id
# Собрать уникальный id для экземпляра теста
def create_problem_instance_id(instance):
    _str = ""
    params = instance["params"]
    _keys = sorted(params.keys(), reverse=False)
    for k in _keys:
        _str += str(k) + "=" + str(params[k]) + ";"
    return hashlib.md5(_str.encode()).hexdigest()

In [79]:
# generate_test_case_params_2
# Иной вариант

def generate_test_case_params_2(n, p):
    # n: int = количество работ
    # p: int = количество операций в работах

    params = {}
    params["p"]  = []
    params["w"]  = []
    params["d1"] = []
    params["d2"] = []
    params["s1"] = []
    params["s2"] = []

    params["r"]  = list(range(1, n + 1))
    random.shuffle(params["r"])
    
    for j in range(n):
        # r = random.randint(1, n * p - p);
        r = params["r"][j]
        w = random.randint(1, 30)
        d1 = random.randint(r + p, n * p)
        d2 = random.randint(d1, n * p)
        s1 = random.randint(r, d1)
        s2 = random.randint(s1, d1)
        params["p"].append(p)
        # params["r"].append(r)
        params["w"].append(w)
        params["d1"].append(d1)
        params["d2"].append(d2)
        params["s1"].append(s1)
        params["s2"].append(s2)

    return params

In [None]:
# Ручной тест одной задачи. Вывести решение, нарисовать дерево

def main():
    # params = {"p": [5, 5, 5], "w": [18, 4, 26], "d1": [12, 13, 9], "d2": [14, 15, 14], "s1": [12, 8, 1], "s2": [12, 11, 7], "r": [3, 2, 1]}
    params = generate_test_case_params_2(3, 3)

    w = create_cost_matrix(**params)

    print("w =")
    print_osp_matrix(w, jsep=" ")
    
    w2d = _mat2d(w)

    try:
        ctx = {
            "n_tree_nodes_stop_limit": INF,
            "solver": solve_lap_scipy,
            "params": params,
            "arc_branch_rule": "lowest_cost_arc",
            # "tolerance_branch_rule": None,
            "tolerance_branch_rule": "arc_upper_tolerance",
            "ub": (None, INF),
            "print_debug": 0,
            "build_tree": 1,
        }
        t0 = time.process_time()
        x = solve_osp_lap_bnb_2d_v3_3(w2d, ctx=ctx)
        t1 = time.process_time()
        exec_time = round(t1 - t0, 2)
        if x:
            obj = calc_obj_from_mat(w2d, x)
            x3d = _mat3d(x, params["p"])
            print("С допусками")
            print("id =", create_problem_instance_id({ "params": params }))
            print("obj = ", obj)
            print("n_tree_nodes =", ctx["n_tree_nodes"])
            print("n_lap_solved =", ctx["n_lap_solved"])
            print("exec_time =", exec_time)
            print("x =")
            print_osp_matrix(x3d, jsep=" ")
            print("w =")
            print_osp_matrix(w, jsep=" ")
            draw_sst(ctx["solution_search_tree"])
    except BaseException as e:
        print(e)

    try:
        ctx = {
            "n_tree_nodes_stop_limit": INF,
            "solver": solve_lap_scipy,
            "params": params,
            "arc_branch_rule": "lowest_cost_arc",
            "tolerance_branch_rule": None,
            "ub": (None, INF),
            "print_debug": 0,
            "build_tree": 1,
        }
        t0 = time.process_time()
        x = solve_osp_lap_bnb_2d_v3_3(w2d, ctx=ctx)
        t1 = time.process_time()
        exec_time = round(t1 - t0, 2)
        if x:
            obj = calc_obj_from_mat(w2d, x)
            x3d = _mat3d(x, params["p"])
            print("Без допусков")
            print("id =", create_problem_instance_id({ "params": params }))
            print("obj = ", obj)
            print("n_tree_nodes =", ctx["n_tree_nodes"])
            print("n_lap_solved =", ctx["n_lap_solved"])
            print("exec_time =", exec_time)
            print("x =")
            print_osp_matrix(x3d, jsep=" ")
            draw_sst(ctx["solution_search_tree"])
    except BaseException as e:
        print(e)
main()

In [81]:
# Ручной тест серии задач

_stats_table = []

async def main():
    n, p = 4, 4
    epochs = 100
    
    for _ in tqdm(range(epochs)):
        pbar = tqdm(total=3, leave=False)

        _stats_row = {}
        _stats_table.append(_stats_row)
        
        params = generate_test_case_params_2(n, p)
        w = create_cost_matrix(**params)

        _stats_row["params"] = params
        
        # _stats_row["osp_blp"]                 = await exec_test_osp_blp(w, params);                   pbar.update(1);
        _stats_row["osp_lap_bnb_1st_arc"]     = exec_test_osp_lap_bnb_2d_v33__1st_arc(w, params);     pbar.update(1);
        _stats_row["osp_lap_bnb_hc_arc"]      = exec_test_osp_lap_bnb_2d_v33__hc_arc(w, params);      pbar.update(1);
        _stats_row["osp_lap_bnb_lc_arc"]      = exec_test_osp_lap_bnb_2d_v33__lc_arc(w, params);      pbar.update(1);
        _stats_row["osp_lap_bnb_lc_arc_utol"] = exec_test_osp_lap_bnb_2d_v33__lc_arc_utol(w, params); pbar.update(1);
        _stats_row["osp_lap_bnb_lo_utol"]     = exec_test_osp_lap_bnb_2d_v33__lowest_utol(w, params); pbar.update(1);

        pbar.clear()
        pbar.close()
# await main()

In [82]:
# Статистика из ручного серии задач

def main():
    _table = []
    _test_types = list(_stats_table[0].keys())
    _test_props = ["exec_time", "obj", "n_tree_nodes", "n_lap_solved"]
    _test_types.remove("params")
    for statrow in _stats_table:
        _row = {}
        _table.append(_row)
        _row["p"] = statrow["params"]["p"][0]
        _obj = nget(statrow, ["osp_blp", "obj"]) or nget(statrow, ["osp_lap_bnb_lc_arc", "obj"])
        for test_type in _test_types:
            _row[f"{test_type}.eq"] = (statrow[test_type]["obj"] == _obj)
            for prop in _test_props:
                key = f"{test_type}.{prop}"
                val = nget(statrow, [test_type, prop], -1)
                _row[key] = val
    
    df = pd.DataFrame(_table)

    df = df[
        True
        & (df["osp_lap_bnb_lc_arc_utol.n_tree_nodes"] > 1)
        & (df["osp_lap_bnb_lc_arc_utol.n_lap_solved"] > 1)
        & (df["osp_lap_bnb_lc_arc_utol.eq"] == 1)
    ]

    qcol = "osp_lap_bnb_lc_arc.n_lap_solved"
    q0 = df[qcol].quantile(0.05)
    q1 = df[qcol].quantile(0.95)
    df = df[(q0 <= df[qcol]) & (df[qcol] <= q1)]

    print(df.shape)
    
    df_mean = df.mean()

    for test_type in _test_types:
        key = f"{test_type}.n_lap_solved"
        df_mean[f"{test_type}.min_n_lap_solved"] = df[key].min()
        df_mean[f"{test_type}.max_n_lap_solved"] = df[key].max()

    display(df_mean)
# main()

In [83]:
# write_instance_into_disk

# Записать тест и его результаты на диск
async def write_instance_into_disk(
    instance,
    data_path=None
):
    problem_instances_path = data_path or default_problem_instances_path
    
    params  = instance["params"]
    n       = len(params["p"])
    p       = params["p"][0]
    npkey   = f"n{n}p{p}"

    _dirpath  = problem_instances_path + "/json/" + npkey
    _filepath = _dirpath + "/" + instance["id"] + ".json"

    os.makedirs(_dirpath, exist_ok=True)

    async with aiofiles.open(_filepath, "w") as fd:
        await fd.write(json.dumps(instance))

In [84]:
# BaseProblemInstanceLoader
# Загрузчик датасета с диска
class BaseProblemInstanceLoader:
    def __init__(self, path):
        self._path = path
        self._items = None
        self._curr = None
        self._dirs = os.listdir(self._path)
        self._dirs = list(
            filter(
                lambda x: x[0] != "_",
                self._dirs,
            )
        )

    def __iter__(self):
        return self
        
    def __next__(self):
        if (not self._dirs) and (not self._items):
            raise StopIteration

        if not self._items:
            _dirname    = self._dirs.pop(0)
            self._curr  = os.path.join(self._path, _dirname)
            self._items = os.listdir(self._curr)

        _filename = self._items.pop(0)
        _filepath = os.path.join(self._curr, _filename) 

        fd = open(_filepath, "r", encoding="utf-8")
        data = json.load(fd)
        fd.close()

        return data

class PILStd(BaseProblemInstanceLoader):
    def __next__(self):
        instance  = super().__next__()
        # instance["results"]["all_intime"] = _instance_has_all_intime(instance)
        # instance["results"]["intersec"]   = _instance_has_mutual_job_intersection(instance)

        return instance

# Вариант без решений -- меньше памяти
class PILVerbose0(PILStd):
    def __next__(self):
        instance = super().__next__()
        for test_name in instance["results"]:
            if "x" in instance["results"][test_name]:
                instance["results"][test_name]["x"] = None
        return instance

In [85]:
# Собрать тестовые задачи. Записать их на диск

async def main():
    _generated_ids = dict()

    async def is_uniq(inst, data_path=None):
        nonlocal _generated_ids
        
        if _generated_ids.get(inst["id"], None):
            return False
        
        params  = inst["params"]
        n       = len(params["p"])
        p       = params["p"][0]
        npkey   = f"n{n}p{p}"
        
        problem_instances_path = data_path or default_problem_instances_path
        
        _dirpath  = problem_instances_path + "/json/" + npkey
        _filepath = _dirpath + "/" + inst["id"] + ".json"
        
        _is_exists = await aiofiles.os.path.exists(_filepath)

        return (not _is_exists)

    _cases = [
        {"n": 3, "p": 2, "e": 1000},
        {"n": 3, "p": 3, "e": 1000},
        {"n": 3, "p": 4, "e": 1000},
        {"n": 3, "p": 5, "e": 1000},
        
        {"n": 4, "p": 2, "e": 1000},
        {"n": 4, "p": 3, "e": 1000},
        {"n": 4, "p": 4, "e": 1000},
        {"n": 4, "p": 5, "e": 100},
        
        {"n": 5, "p": 2, "e": 1000},
        {"n": 5, "p": 3, "e": 1000},
        {"n": 5, "p": 4, "e": 100},
    ]

    for _case in tqdm(_cases):
        n = _case["n"]
        p = _case["p"]
        e = _case["e"]

        for instance_i in tqdm(range(e), leave=False):
            # Повторять пока не соберется решаемый пример
            _safe_iterator = range(10)
            
            for _ in _safe_iterator: 
                params = generate_test_case_params_2(n, p)
                pid    = create_problem_instance_id({ "params": params })

                _instance = {
                    "id": pid,
                    "params": params,
                    "results": {},
                }

                # Неуникальный экземпляр задачи -- пропустить
                if (await is_uniq(_instance)) == False:
                    continue

                _generated_ids[pid] = 1

                await write_instance_into_disk(_instance)

                break
            else:
                print("!safe_counter")
                print(params)
                return

            # end _safe_iterator
        # end instance_i

# await main()

In [86]:
# Решить тестовые задачи. Записать результаты на диск

async def main():
    pil = PILStd(default_problem_instances_path + "/json")

    for instance in tqdm(pil):
        params = instance["params"]
        n = len(params["p"])
        p = params["p"][0]

        w = create_cost_matrix(**params)

        tests = {
            "osp_blp"                : { "test_func": exec_test_osp_blp },
            "osp_lap_bnb_1st_arc"    : { "test_func": exec_test_osp_lap_bnb_2d_v33__1st_arc },
            "osp_lap_bnb_lc_arc"     : { "test_func": exec_test_osp_lap_bnb_2d_v33__lc_arc },
            "osp_lap_bnb_hc_arc"     : { "test_func": exec_test_osp_lap_bnb_2d_v33__hc_arc },
            "osp_lap_bnb_lc_arc_utol": { "test_func": exec_test_osp_lap_bnb_2d_v33__lc_arc_utol },
            "osp_lap_bnb_lo_utol"    : { "test_func": exec_test_osp_lap_bnb_2d_v33__lowest_utol },
        }

        _total = 0
        for a in tests:
            if a not in instance["results"]:
                _total += 1

        pbar = tqdm(total=_total, leave=False)

        for a in tests:
            if a not in instance["results"]:
                func = tests[a]["test_func"]
                if inspect.iscoroutinefunction(func):
                    res = await func(w, params)
                else:
                    res = func(w, params)
                instance["results"][a] = res
                pbar.update(1)

        await write_instance_into_disk(instance)
        
        pbar.clear()
        pbar.close()
# await main()