Формат файла с тестами:

```smallmap.test
количество_вершин количество_рёбер 
вершина_старт вершина_конец стоимость_1 стоимость_2
вершина_1 вершина_2 стоимость_1 стоимость_2
.
.
.
вершина_1 вершина_2 стоимость_1 стоимость_2
```

In [53]:
from collections import defaultdict, namedtuple
from abc import ABC, abstractmethod
import random

In [54]:
class Node:
    """Graph vertex that keeps its outgoing edges keyed by neighbour id.

    ``connections`` maps a neighbour's id to the edge costs; it is filled by
    ``NodesGroup.add_edge``, which stores entries under ``second_node.id``.
    """

    def __init__(self, id) -> None:
        self.id = id
        # neighbour id -> edge costs; an absent key means "no edge".
        self.connections = defaultdict(list)

    @staticmethod
    def _connection_key(other_node):
        """Return the ``connections`` key for a ``Node`` object or a raw id."""
        return other_node.id if isinstance(other_node, Node) else other_node

    def get_edge_costs(self, other_node: 'Node'):
        """Return the costs of the edge to ``other_node`` (empty if no edge).

        Accepts a ``Node`` or a bare node id. The lookup goes through
        ``dict.get`` so that querying a missing neighbour does not create an
        empty entry in the ``defaultdict``.
        """
        return self.connections.get(self._connection_key(other_node), [])

    def have_connection(self, other_node: 'Node'):
        """Return True if an edge with at least one cost leads to ``other_node``."""
        return len(self.get_edge_costs(other_node)) != 0

    def __str__(self) -> str:
        return f"Node: id={self.id}"

In [55]:
class NodesGroup:
    """Container of nodes addressable by node id."""

    def __init__(self) -> None:
        self.nodes = {}  # node id -> node object

    def add_node(self, node: 'Node', replace=False):
        """Store ``node`` under its id.

        Returns True when the node was stored and False when the id was
        already present and ``replace`` is False.
        """
        if node.id in self.nodes and replace is False:
            return False
        self.nodes[node.id] = node
        return True

    def add_edge(self, first_node: 'Node', second_node: 'Node', *costs):
        """Record a directed edge ``first_node -> second_node`` with ``costs``.

        The costs tuple is written into the ``connections`` of the node that
        is registered under ``first_node.id``.

        Raises:
            ValueError: if either endpoint is not registered in this group.
        """
        # Both endpoints are validated; the message promises "one of the nodes".
        if first_node.id not in self.nodes or second_node.id not in self.nodes:
            raise ValueError(f"В NodesGroup нет одной из вершин: {first_node} {second_node}")
        self.nodes[first_node.id].connections[second_node.id] = costs

    def add_nodes(self, *nodes, replace=False):
        """Add several nodes at once; already-known ids are skipped unless ``replace``.

        Raises:
            TypeError: if any argument is not a ``Node`` instance.
        """
        for node in nodes:
            if not isinstance(node, Node):
                raise TypeError(f"Объект{node}: типа {type(node)} нельзя добавить в NodeGroup")
            self.add_node(node, replace=replace)

    def delete_node(self, node: 'Node'):
        """Remove ``node`` by id; raises ``KeyError`` if it is not registered."""
        self.nodes.pop(node.id)

    def __getitem__(self, key: int):
        """Return the node stored under the integer id ``key``."""
        if not isinstance(key, int):
            raise TypeError("Обращаться к NodeGroup можно только по id ноды (тип int)")
        if key not in self.nodes:
            raise KeyError(f"В NodeGroup нет ноды с id:{key}")
        return self.nodes[key]

    def __iter__(self):
        return iter(self.nodes.values())

    def __len__(self):
        return len(self.nodes)

    def extend(self, other_nodes_group: 'NodesGroup'):
        """Copy over nodes of ``other_nodes_group`` whose ids are not present yet."""
        for node_id, node in other_nodes_group.nodes.items():
            if node_id not in self.nodes:
                self.nodes[node_id] = node

    def get_nodes(self):
        """Return the stored nodes as a new list."""
        return list(self.nodes.values())


In [56]:
# Smoke check of Node / NodesGroup: two nodes joined by one directed edge
# that carries three cost values.
n1 = Node(1)
n2 = Node(2)

ngp1 = NodesGroup()
ngp1.add_nodes(n1, n2)
ngp1.add_edge(n1, n2, 13, 17, 1)

# Node defines __str__ but no __repr__, so printing the dict view shows the
# default object repr, while print(n) in the loop uses __str__.
print(ngp1.nodes.values())
for n in ngp1:
    print(n)

# Bare expression: its value is what the cell displays as its result.
len(ngp1)
# Disabled: dump every edge as "<from id> <to id> <costs...>".
# for node in ngp1:
#     for edge_and_cost in node.connections.items():
#         print(f"{node.id} {edge_and_cost[0]} {' '.join(map(str, edge_and_cost[1]))}")

dict_values([<__main__.Node object at 0x7f382e1312d0>, <__main__.Node object at 0x7f382e131010>])
Node: id=1
Node: id=2


2

In [57]:
class TestBuilder(ABC):
    @abstractmethod
    def build_map(self):
        pass

    @abstractmethod
    def build_test(self):
        pass

In [58]:
class TestBuildDirector:
    """Drives a builder through its construction steps in a fixed order."""

    def __init__(self, builder: TestBuilder) -> None:
        self.builder = builder

    def construct_test(self):
        """Call build_map, build_solution and build_test on the current builder."""
        for step_name in ("build_map", "build_solution", "build_test"):
            # The builder is re-read on every step, exactly as three separate
            # ``self.builder.<step>()`` statements would do.
            getattr(self.builder, step_name)()

    def change_builder(self, new_builder: TestBuilder) -> None:
        """Swap the builder used by subsequent ``construct_test`` calls."""
        self.builder = new_builder

In [59]:
class SimpleTestBuilder(TestBuilder):
    """Builder that serialises an (initially empty) ``NodesGroup`` into test text.

    ``build_map`` and ``build_solution`` are no-ops here; subclasses fill
    ``nodes_group`` and ``solutuion_info`` before ``build_test`` is called.
    """

    def __init__(self, output_file_name) -> None:
        self.nodes_group = NodesGroup()
        self.output_file_name = output_file_name
        self.test_full_text = ""
        # Attribute name (including its spelling) is read by subclasses.
        self.solutuion_info = ""

    def build_map(self):
        pass

    def build_solution(self):
        pass

    def build_test(self):
        """Render the test text and cache it in ``test_full_text``.

        Layout, one item per line: the solution info, then
        "<node count> <edge count>", then one "<from> <to> <costs...>" line
        per edge.
        """
        edge_lines = []
        for node in self.nodes_group:
            for neighbour_id, costs in node.connections.items():
                edge_lines.append(f"{node.id} {neighbour_id} {' '.join(map(str, costs))}")
        counts_line = f"{len(self.nodes_group)} {len(edge_lines)}"

        self.test_full_text = "\n".join([self.solutuion_info, counts_line, *edge_lines])
        return self.test_full_text

    def get_test(self):
        """Return the text produced by the last ``build_test`` call.

        Raises:
            ValueError: if no test has been built yet.
        """
        if not self.test_full_text:
            # The exception is raised, not returned: callers write the result
            # straight to a file and must not receive an exception object.
            raise ValueError("Тест ещё не был создан")
        return self.test_full_text

# Брутфорс алгоритм нахождения Bi-objective решения

In [60]:
from collections import defaultdict
import random

class Graph:
    """Directed graph whose edges carry a pair of costs."""

    def __init__(self):
        # vertex -> {neighbour: (cost1, cost2)}
        self.adjacency_list = defaultdict(dict)
        self.vertices = set()

    def add_edge(self, vertex1, vertex2, cost1, cost2):
        """Add (or overwrite) the directed edge ``vertex1 -> vertex2``."""
        self.adjacency_list[vertex1][vertex2] = (cost1, cost2)
        self.vertices.update([vertex1, vertex2])

    def read_from_file(self, file_path):
        """Replace the graph with the edges listed in ``file_path``.

        Every non-blank line must hold four integers:
        "<vertex1> <vertex2> <cost1> <cost2>". Blank lines (for example a
        trailing newline at the end of the file) are skipped. A missing file
        is reported on stdout and leaves the graph empty.
        """
        self.adjacency_list = defaultdict(dict)
        self.vertices = set()
        try:
            with open(file_path, 'r') as file:
                for line in file:
                    fields = line.split()
                    if not fields:
                        continue
                    vertex1, vertex2, cost1, cost2 = map(int, fields)
                    self.add_edge(vertex1, vertex2, cost1, cost2)
        except FileNotFoundError:
            print(f"File {file_path} not found.")

    def print_graph(self):
        """Print every edge as an aligned "V1 -> V2 : C1, C2" row."""
        print("  V1 -> V2  :  C1, C2")
        for vertex, edges in self.adjacency_list.items():
            for neighbor, costs in edges.items():
                print(f"{vertex:>4} -> {neighbor:<4}: {costs[0]:>3}, {costs[1]}")

    def get_neighbors(self, state):
        """Return ``[(neighbour, (cost1, cost2)), ...]`` for ``state``.

        The membership test keeps the defaultdict from growing an empty
        entry for vertices that have no outgoing edges.
        """
        if state in self.adjacency_list:
            return list(self.adjacency_list[state].items())
        return []
    
from abc import ABC, abstractmethod
import matplotlib.pyplot as plt

class Solution(ABC):
    """Abstract point in objective space that can be compared by dominance."""

    @abstractmethod
    def __init__(self, solution_values):
        """Initialise the solution from its objective values."""

    @abstractmethod
    def dominates(self, other):
        """Tell whether this solution dominates ``other``."""

class BiObjSolution(Solution):
    """Two-objective solution: a cost pair plus the path that produced it.

    Equality and hashing use only the two costs, so two different paths with
    the same cost pair count as the same solution inside a set.
    """

    def __init__(self, solution_values, path=None):
        # ``path`` is optional so that code which only needs to compare cost
        # pairs (membership and dominance queries) can build a solution
        # from values alone.
        self.solution_values = tuple(solution_values)
        self.g1 = self.solution_values[0]
        self.g2 = self.solution_values[1]
        self.path = path

    def dominates(self, other: 'BiObjSolution'):
        """Return True if this solution is no worse in both costs and better in one."""
        return (self.g1 < other.g1 and self.g2 <= other.g2) or (self.g1 <= other.g1 and self.g2 < other.g2)

    def is_dominated_by(self, other: 'BiObjSolution'):
        """Return True if ``other`` dominates this solution.

        This is not the negation of ``self.dominates(other)``: two equal or
        mutually incomparable solutions dominate in neither direction.
        """
        return other.dominates(self)

    def __str__(self):
        return f"({self.g1}, {self.g2})"

    def __hash__(self) -> int:
        return hash((self.g1, self.g2))

    def __repr__(self) -> str:
        return f"BiObjSolution([{self.g1}, {self.g2}])"

    def __eq__(self, other: 'BiObjSolution') -> bool:
        return isinstance(other, BiObjSolution) and self.g1 == other.g1 and self.g2 == other.g2


class ParetoSet:
    """Set of mutually non-dominated solutions.

    ``SolutionClass`` is called as ``SolutionClass(solution_values, path)``.
    Besides the current front, the object records every solution ever offered
    and the largest value seen per objective (kept for visualisation).
    """

    def __init__(self, SolutionClass=BiObjSolution):
        self.solutions = set()
        self.SolutionClass = SolutionClass

        # Kept for visualisation.
        self.max_y = 0
        self.max_x = 0
        self.all_solusions_ever = set()

    def _make(self, solution_values, path=None):
        """Build a solution object; ``path`` is irrelevant for pure comparisons."""
        # The path is always passed explicitly, because the solution class
        # takes it as its second constructor argument.
        return self.SolutionClass(solution_values, path)

    def add_solution(self, solution_values, path):
        """Offer a solution; return True if it was accepted into the front.

        An accepted solution evicts every stored solution it dominates.
        """
        solution = self._make(solution_values, path)
        non_dominated = self._is_non_dominated(solution)

        # Kept for visualisation.
        self.all_solusions_ever.add(solution)
        self.max_x = max(self.max_x, solution.solution_values[0])
        self.max_y = max(self.max_y, solution.solution_values[1])

        if not non_dominated:
            return False

        self.solutions = {s for s in self.solutions if not solution.dominates(s)}
        self.solutions.add(solution)
        return True

    def remove_solution(self, solution_values):
        """Remove the solution with these values; ``KeyError`` if it is absent."""
        self.solutions.remove(self._make(solution_values))

    def _is_non_dominated(self, solution):
        return not any(s.dominates(solution) for s in self.solutions)

    def is_non_dominated(self, solution_values):
        """Return True if no stored solution dominates ``solution_values``."""
        return self._is_non_dominated(self._make(solution_values))

    def get_solutions(self, values=True):
        """Return the front as a set of value tuples, or as solution objects."""
        if values:
            return {s.solution_values for s in self.solutions}
        return self.solutions

    def solutions_dominated_by(self, solution_values):
        """Return the stored solutions dominated by ``solution_values``."""
        return self._solutions_dominated_by(self._make(solution_values))

    def _solutions_dominated_by(self, solution):
        return [s for s in self.solutions if solution.dominates(s)]

    def remove_worse(self, better_solution_values):
        """Drop every stored solution dominated by ``better_solution_values``."""
        better_solution = self._make(better_solution_values)
        self.solutions.difference_update(self._solutions_dominated_by(better_solution))

    def visualize(self, color='blue', marker='o'):
        """Scatter-plot the current front with matplotlib."""
        if not self.solutions:
            print("Empty Pareto set. Nothing to visualize.")
            return
        x_values, y_values = zip(*[(sol.g1, sol.g2) for sol in self.solutions])
        plt.scatter(x_values, y_values, label='Pareto Set', color=color, marker=marker, s=100, edgecolors='black')
        plt.xlabel('Objective 1')
        plt.ylabel('Objective 2')
        plt.title('Pareto Set Visualization')
        plt.grid(True, linestyle='--', which='both', alpha=0.7)
        plt.show()

    def __str__(self) -> str:
        return f"({', '.join(str(sol) for sol in self.solutions)})"

    def __contains__(self, solution_values):
        return self._make(solution_values) in self.solutions
    
def calculate_path_cost(graph, path):
    """Sum both edge costs along ``path``.

    ``graph`` is indexed as ``graph[u][v]`` and yields a ``(cost1, cost2)``
    pair for each consecutive pair of vertices. Returns ``[total1, total2]``;
    a path with fewer than two vertices costs ``[0, 0]``.
    """
    first_total = 0
    second_total = 0
    for source, target in zip(path, path[1:]):
        step_first, step_second = graph[source][target]
        first_total += step_first
        second_total += step_second
    return [first_total, second_total]

In [61]:
def find_all_paths(graph, start, end):
    """Enumerate every simple path from ``start`` to ``end`` by brute force.

    Parameters:
        graph: mapping ``vertex -> {neighbour: (cost1, cost2)}``.
        start, end: vertex ids.

    Returns:
        ``(paths, true_paths, pareto_set_solution)`` where ``paths`` holds all
        simple paths found, ``true_paths`` holds the paths that were
        non-dominated at the moment they were found (a later path may still
        dominate them), and ``pareto_set_solution`` is the ``ParetoSet`` of
        path costs.

    The number of simple paths can grow exponentially with the graph size,
    so this is only suitable for small graphs.
    """
    pareto_set_solution = ParetoSet()
    paths = []
    true_paths = []
    # Each stack entry owns its visited set. Sharing one mutable set between
    # entries would mark a vertex as visited for every other branch too and
    # hide all but the first path through it.
    stack = [(start, [start], frozenset([start]))]

    while stack:
        current, path, visited = stack.pop()

        if current == end:
            paths.append(path)
            if pareto_set_solution.add_solution(calculate_path_cost(graph, path), path):
                true_paths.append(path)
            continue

        # ``get`` is used instead of indexing so that a defaultdict graph is
        # not extended with empty entries for vertices without outgoing edges.
        for neighbor in graph.get(current, ()):
            if neighbor not in visited:
                stack.append((neighbor, path + [neighbor], visited | {neighbor}))

    return paths, true_paths, pareto_set_solution

In [62]:
class RandomTestsBuilder(SimpleTestBuilder):
    """Builder that generates a random graph and its brute-force Pareto answer."""

    # How many random (start, stop) pairs are tried before giving up on
    # finding a connected pair.
    _MAX_ENDPOINT_ATTEMPTS = 16

    def __init__(self, output_file_name, nodes_count, edges_per_node_count) -> None:
        # Arguments are validated before any state is created.
        if edges_per_node_count > nodes_count:
            raise ValueError("Количество ребёр одной вершины не может превышать общее число вершин")
        super().__init__(output_file_name)
        self.nodes_count = nodes_count
        self.edges_per_node_count = edges_per_node_count

    def build_map(self):
        """Create ``nodes_count`` nodes and random outgoing edges for each.

        Every node gets between 0 and ``edges_per_node_count`` edges to
        distinct randomly chosen nodes (the node itself may be chosen); both
        costs are integers drawn from 1..100.
        """
        for node_id in range(self.nodes_count):
            self.nodes_group.add_node(Node(node_id))

        candidates = self.nodes_group.get_nodes()
        for cur_node in candidates:
            connections_count = random.randint(0, self.edges_per_node_count)
            for neighbour_node in random.sample(candidates, connections_count):
                cost1 = random.randint(1, 100)
                cost2 = random.randint(1, 100)
                self.nodes_group.add_edge(cur_node, neighbour_node, cost1, cost2)

    def build_solution(self):
        """Pick random endpoints and store the reference answer in ``solutuion_info``.

        First line: "<start> <stop> <number of solutions>"; then one line per
        Pareto-optimal solution: "<cost1> <cost2> <path vertices...>". If no
        connected pair is found within the attempt limit, the last pair tried
        is written with zero solutions.
        """
        graph = Graph()
        for node in self.nodes_group:
            for neighbour_id, (cost1, cost2) in node.connections.items():
                graph.add_edge(node.id, neighbour_id, cost1, cost2)

        node_ids = list(self.nodes_group.nodes)
        for _ in range(self._MAX_ENDPOINT_ATTEMPTS):
            start, stop = random.sample(node_ids, 2)
            _, true_paths, pareto_set_solution = find_all_paths(graph.adjacency_list, start, stop)
            if true_paths:
                break

        lines = [f"{start} {stop} {len(pareto_set_solution.solutions)}"]
        lines.extend(
            f"{' '.join(map(str, sol.solution_values))} {' '.join(map(str, sol.path))}"
            for sol in pareto_set_solution.solutions
        )
        self.solutuion_info = "\n".join(lines)

    def build_test(self):
        return super().build_test()

    def reset(self):
        """Forget the generated graph and texts so the builder can be reused."""
        self.nodes_group = NodesGroup()
        self.test_full_text = ""
        self.solutuion_info = ""

In [63]:
# rtb = RandomTestsBuilder("out.txt", 1024*10, 1024)

# tbd = TestBuildDirector(rtb)
# tbd.construct_test()

# for i in range(6):
#     test_file_name = f"../tests/random_{i}.txt"
#     rtb.reset()
#     tbd.change_builder(rtb)
#     tbd.construct_test()
#     with open(test_file_name, 'w') as f:
#         f.write(rtb.get_test())

In [64]:


from heapq import heapify, heappop, heappush
from typing import Optional

class Solution(ABC):
    """Base interface for a solution of a multi-objective problem.

    Concrete subclasses are constructed from a list of objective values and
    must be able to answer dominance queries against another solution.
    """

    @abstractmethod
    def __init__(self, solution_values):
        """Store the objective values given in ``solution_values``."""

    @abstractmethod
    def is_dominates(self, other):
        """Report whether this solution dominates ``other``.

        Parameters:
        - other: the solution to compare against.

        Returns:
        - bool: True if this solution dominates ``other``, otherwise False.
        """


class BiObjSolution(Solution):
    """Solution with exactly two objective values, exposed as ``g1`` and ``g2``.

    Equality and hashing are defined on the ``(g1, g2)`` pair.
    """

    def __init__(self, solution_values):
        self.solution_values = solution_values
        self.g1 = solution_values[0]
        self.g2 = solution_values[1]

    def is_dominated_by(self, other: 'BiObjSolution'):
        """Return True if ``other`` dominates this solution.

        This is not the negation of ``self.is_dominates(other)``: equal or
        mutually incomparable solutions dominate in neither direction.
        """
        return other.is_dominates(self)

    def is_dominates(self, other: 'BiObjSolution'):
        """Return True if this solution is no worse in both values and better in one."""
        return (self.g1 < other.g1 and self.g2 <= other.g2) or (self.g1 <= other.g1 and self.g2 < other.g2)

    def __str__(self):
        return f"({self.g1}, {self.g2})"

    def __hash__(self) -> int:
        return hash((self.g1, self.g2))

    def __repr__(self) -> str:
        return f"BiObjSolution([{self.g1}, {self.g2}])"

    def __eq__(self, another_solution) -> bool:
        # Comparing with an unrelated object yields "not equal" instead of an
        # AttributeError on the missing g1/g2 attributes.
        if not isinstance(another_solution, BiObjSolution):
            return NotImplemented
        return self.g1 == another_solution.g1 and self.g2 == another_solution.g2
    



class ParetoSet:
    """Set of mutually non-dominated solutions built from raw objective values.

    ``SolutionClass`` is called with the objective values only. Each time a
    solution is accepted, the set object that was current before the change
    is appended to ``history``.
    """

    def __init__(self, SolutionClass=BiObjSolution):
        self.SolutionClass = SolutionClass
        self.solutions = set()
        self.history = []

        # Kept for visualisation
        self.max_x = 0
        self.max_y = 0
        self.all_solusions_ever = set()

    def add_solution(self, solution_values):
        """Offer a solution; it is stored only if no stored solution dominates it."""
        candidate = self.SolutionClass(solution_values)

        # Kept for visualisation
        self.all_solusions_ever.add(candidate)
        self.max_x = max(self.max_x, candidate.solution_values[0])
        self.max_y = max(self.max_y, candidate.solution_values[1])

        if not self._is_non_dominated(candidate):
            return

        # Remember the previous set, then rebuild it without the solutions
        # that the candidate dominates.
        self.history.append(self.solutions)
        self.solutions = {kept for kept in self.solutions if not candidate.is_dominates(kept)}
        self.solutions.add(candidate)

    def del_solution(self, solution_values):
        """Remove the solution equal to ``solution_values`` from the set."""
        self.solutions.remove(self.SolutionClass(solution_values))

    def _is_non_dominated(self, solution):
        """
        Check that none of the stored solutions dominates ``solution``.
        """
        return not any(stored.is_dominates(solution) for stored in self.solutions)

    def is_non_dominated(self, solution_values):
        """
        Check that none of the stored solutions dominates ``solution_values``.
        """
        candidate = self.SolutionClass(solution_values)
        return not any(stored.is_dominates(candidate) for stored in self.solutions)

    def get_solutions(self):
        """Return the current set of solution objects."""
        return self.solutions

    def all_solutions_dominated_by(self, solution_values):
        """Return a list of stored solutions dominated by ``solution_values``."""
        reference = self.SolutionClass(solution_values)
        return [stored for stored in self.solutions if reference.is_dominates(stored)]

    def _all_solutions_dominated_by(self, solution):
        return [stored for stored in self.solutions if solution.is_dominates(stored)]

    def remove_worse(self, better_solution_values):
        """
        Remove from the Pareto set every solution dominated by ``better_solution_values``.
        """
        reference = self.SolutionClass(better_solution_values)
        for dominated in self._all_solutions_dominated_by(reference):
            self.solutions.remove(dominated)

    def visualize(self):
        """Scatter-plot the current set with matplotlib."""
        if not self.get_solutions():
            print("Пустое Парето-множество. Нечего визуализировать.")
            return

        # Split the solutions into per-axis coordinate lists.
        points = [(item.g1, item.g2) for item in self.solutions]
        x_values = [point[0] for point in points]
        y_values = [point[1] for point in points]

        # Draw the points, then label and style the plot.
        plt.scatter(x_values, y_values, label='Pareto Set', color='blue', marker='o', s=100, edgecolors='black')
        plt.xlabel('Критерий 1')
        plt.ylabel('Критерий 2')
        plt.title('Визуализация Парето-множества')
        plt.grid(True, linestyle='--', which='both', alpha=0.7)
        plt.show()

    def __str__(self) -> str:
        rendered = ', '.join(str(item) for item in self.solutions)
        return f"({rendered})"

    def __contains__(self, solution_values):
        return self.SolutionClass(solution_values) in self.solutions





class Node:
    """Search-tree node: a state together with the cost vector of one path to it.

    Attributes:
        state: identifier of the graph vertex.
        g: tuple of accumulated costs; h: tuple of heuristic values.
        f: element-wise sum of ``g`` and ``h``.
        parent: previous node on the path, or None for the root.
        s: extra state reference supplied by the caller.
        g2: shortcut for ``g[1]``.
    """

    def __init__(self, state, g=(0, 0), h=(0, 0), parent=None, s=None):
        self.state = state
        self.g = tuple(g)
        self.h = tuple(h)
        self.f = tuple(x + y for x, y in zip(self.g, self.h))
        self.parent = parent
        self.s = s
        self.g2 = self.g[1]

    def __eq__(self, other: 'Node'):
        # In the multi-objective setting the same state can be reached with
        # different g values, so they take part in the comparison as well.
        if not isinstance(other, Node):
            return NotImplemented
        return self.state == other.state and self.g == other.g and self.h == other.h

    def __hash__(self) -> int:
        return hash((self.state, self.g, self.h))

    def __lt__(self, other: 'Node'):
        # Lexicographic order on f, used only as a priority-queue key.
        # This is NOT a dominance test.
        return self.f < other.f

    def is_dominates(self, other: 'Node'):
        """Return True if this node's ``f`` Pareto-dominates ``other.f``.

        Dominance is checked component by component: no component may be
        worse and at least one must be strictly better. Plain tuple
        comparison is lexicographic and would, for instance, report (1, 5)
        as dominating (2, 3).
        """
        pairs = list(zip(self.f, other.f))
        return all(a <= b for a, b in pairs) and any(a < b for a, b in pairs)

    def __str__(self) -> str:
        return f"{self.state, self.g}"


class SearchTreePQD:
    """Search tree with a heap-based OPEN list and a dict-based CLOSED set.

    OPEN may hold duplicates; they are filtered lazily when the best node is
    popped, and each skipped duplicate is counted.
    """

    def __init__(self):
        self._open = []  # heap ordered by the nodes' ``<``
        self._closed = {}  # expanded node -> the same node
        self._enc_open_dublicates = 0  # popped nodes that were already expanded

    def __len__(self) -> int:
        return len(self._open) + len(self._closed)

    def open_is_empty(self) -> bool:
        return not self._open

    def add_to_open(self, item: 'Node'):
        """Push ``item`` onto OPEN without any duplicate check."""
        heappush(self._open, item)

    def get_best_node_from_open(self) -> Optional['Node']:
        """Pop and return the smallest node that has not been expanded yet.

        Returns None when OPEN runs out of such nodes.
        """
        while self._open:
            best_node = heappop(self._open)
            if not self.was_expanded(best_node):
                return best_node
            self._enc_open_dublicates += 1
        return None

    @staticmethod
    def _dominates(better_f, other_f) -> bool:
        """Component-wise Pareto dominance of ``better_f`` over ``other_f``."""
        pairs = list(zip(better_f, other_f))
        return all(a <= b for a, b in pairs) and any(a < b for a, b in pairs)

    def remove_worse_nodes(self, better_node: 'Node'):
        """
        Remove from OPEN every node whose f value is dominated by ``better_node.f``.
        """
        self._open = [node for node in self._open if not self._dominates(better_node.f, node.f)]
        heapify(self._open)

    def remove_worse_opened(self, state, better_f):
        """
        Remove from OPEN the nodes of ``state`` whose f value is dominated by ``better_f``.

        Nodes of other states, and nodes of ``state`` that are not dominated,
        are kept.
        """
        better_f = tuple(better_f)
        self._open = [
            node
            for node in self._open
            if node.state != state or not self._dominates(better_f, node.f)
        ]
        heapify(self._open)

    def add_to_closed(self, item: 'Node'):
        self._closed[item] = item

    def was_expanded(self, item: 'Node') -> bool:
        return item in self._closed

    @property
    def opened(self):
        return self._open

    @property
    def expanded(self):
        return self._closed.values()

    @property
    def number_of_open_dublicates(self):
        return self._enc_open_dublicates
    

def construct_path(node: Node):
    """Follow ``parent`` links from ``node`` up to the root.

    Returns a pair ``(nodes, states)``: the visited nodes and their ``state``
    values, both ordered from ``node`` back to the root.
    """
    chain = [node]
    while chain[-1].parent is not None:
        chain.append(chain[-1].parent)
    return (chain, [step.state for step in chain])

def BDijkstra(search_graph: Graph,  start_state, search_tree: SearchTreePQD,):
    """Bi-objective Dijkstra: collect non-dominated cost pairs for every reachable state.

    Nodes are popped in lexicographic order of their cost pair (see
    ``Node.__lt__``). A node is kept only if its second cost is strictly
    smaller than the best second cost recorded so far for its state.
    NOTE(review): this pruning rule presumes non-negative edge costs.

    Parameters:
        search_graph: graph exposing ``get_neighbors(state)`` that yields
            ``(neighbour_state, (cost1, cost2))`` pairs.
        start_state: state the search starts from.
        search_tree: a ``SearchTreePQD`` instance (expected to be empty), the
            class itself, or None; in the last two cases a fresh tree is
            created.

    Returns:
        ``(sols, good_paths, discarded_paths)`` - three dicts keyed by state:
        the ``ParetoSet`` of accepted cost pairs, the accepted paths as
        returned by ``construct_path``, and the pruned paths. Pruned entries
        are ``(path, g)`` when pruned on pop and ``(path, (g, g2))`` when
        pruned on generation.
    """
    # The argument is honoured; callers may pass either the class or an instance.
    if search_tree is None:
        search_tree = SearchTreePQD()
    elif isinstance(search_tree, type):
        search_tree = search_tree()

    sols = defaultdict(ParetoSet)
    g2_min = defaultdict(lambda: float('inf'))
    discarded_paths = defaultdict(list)
    good_paths = defaultdict(list)
    search_tree.add_to_open(Node(state=start_state, g=(0, 0), parent=None, s=start_state))

    while not search_tree.open_is_empty():
        current = search_tree.get_best_node_from_open()
        if current is None:
            # OPEN contained only nodes that were already expanded.
            break
        if current.g2 >= g2_min[current.state]:
            discarded_paths[current.state].append((construct_path(current), current.g))
            continue

        # The bound is recorded under the node's state. ``s`` is None for
        # nodes created without it, so keying by ``s`` would never update the
        # bound of any state and pruning would never take effect.
        g2_min[current.state] = current.g2
        sols[current.state].add_solution(current.g)
        good_paths[current.state].append(construct_path(current))

        for state, costs in search_graph.get_neighbors(current.state):
            # costs is the (cost1, cost2) pair of the edge current -> state
            neighbour_g = tuple(acc + step for acc, step in zip(current.g, costs))
            successor = Node(state, g=neighbour_g, parent=current, s=state)
            if successor.g2 >= g2_min[state]:
                discarded_paths[state].append((construct_path(successor), (successor.g, successor.g2)))
                continue
            search_tree.add_to_open(successor)

    return sols, good_paths, discarded_paths



In [65]:
# Run BDijkstra on the generated test files and print its Pareto set next to
# the reference answer stored in the file.
for i in range(1):
    test_file = f"../tests/random_{i}.txt"
    with open(test_file, "r") as f:
        # Header: start vertex, target vertex, number of reference solutions.
        start, stop, sols_count = map(int, f.readline().split())
        sols = []
        for _ in range(sols_count):
            # Reference solution line: "<cost1> <cost2> <path vertices...>".
            cost1, cost2, *path = map(int, f.readline().split())
            sols.append(((cost1, cost2), path))

        nodes_count, edges_count = map(int, f.readline().split())
        graph = Graph()
        for _ in range(edges_count):
            n1, n2, c1, c2 = map(int, f.readline().split())
            graph.add_edge(n1, n2, c1, c2)

    # The file is closed before the search starts; the search tree is passed
    # as an instance, not as the class object.
    b_sols, good_paths, discarded_paths = BDijkstra(graph, start, search_tree=SearchTreePQD())
    print(b_sols[stop])
    print(sols)
