In [15]:
import random
import networkx as nx
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from IPython.display import HTML, display

# Імпорти для CNN та обробки даних
import tensorflow as tf
import numpy as np
import os
import time

# Переконайтеся, що бекенд встановлений для коректного відображення в Jupyter
%matplotlib inline

In [16]:
class RoadGraph:
    """Генератор сіткового графа доріг з атрибутами обмеження швидкості."""

    def __init__(self, size: int = 25, remove_edges: int = 5):
        self.size = size
        self.grid_size = int(size ** 0.5)

        if self.grid_size ** 2 != size:
            raise ValueError("Кількість вершин повинна утворювати квадратну сітку")

        self.remove_edges = remove_edges
        self.graph = nx.Graph()

        self.generate_graph()
        self._remove_edges()

    def generate_graph(self):
        # Вузли 0, 1, 2, ...
        for i in range(self.grid_size):
            for j in range(self.grid_size):
                node = i * self.grid_size + j
                # pos=(j, -i) для правильного відображення сітки
                self.graph.add_node(node, pos=(j, -i))

                # Додавання горизонтальних ребер
                if j < self.grid_size - 1:
                    speed_limit = random.randint(2, 9)
                    self.graph.add_edge(node, node + 1, speed_limit=speed_limit)

                # Додавання вертикальних ребер
                if i < self.grid_size - 1:
                    speed_limit = random.randint(2, 9)
                    self.graph.add_edge(node, node + self.grid_size,
                                        speed_limit=speed_limit)

    def _remove_edges(self):
        """Видаляє випадкові ребра, уникаючи видалення мостів."""
        removed = 0
        while removed < self.remove_edges:
            # Логіка запобігання видаленню мостів
            bridges = set(nx.bridges(self.graph))
            candidates = [i for i in self.graph.edges() if i not in bridges]

            if not candidates:
                break

            edg = random.choice(candidates)
            self.graph.remove_edge(*edg)
            removed += 1

In [17]:
GLOBAL_MODEL = None
SIMULATION_IMAGES = {}
VALID_DIGITS = np.arange(2, 10)  # Цифри 2, 3, ..., 9


def prepare_and_train_cnn():
    """Створює та тренує модель CNN для розпізнавання цифр MNIST (2-9)."""
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

    train_mask = np.isin(y_train, VALID_DIGITS)
    test_mask = np.isin(y_test, VALID_DIGITS)
    x_train, y_train = x_train[train_mask], y_train[train_mask]
    x_test, y_test = x_test[test_mask], y_test[test_mask]

    x_train = np.expand_dims(x_train.astype('float32') / 255.0, -1)
    x_test = np.expand_dims(x_test.astype('float32') / 255.0, -1)

    y_train_shifted = y_train - 2
    y_test_shifted = y_test - 2

    y_train_cat = tf.keras.utils.to_categorical(y_train_shifted, num_classes=8)
    y_test_cat = tf.keras.utils.to_categorical(y_test_shifted, num_classes=8)

    model = tf.keras.models.Sequential([
        tf.keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D((2, 2)),
        tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
        tf.keras.layers.MaxPooling2D((2, 2)),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(100, activation='relu'),
        tf.keras.layers.Dense(8, activation='softmax')
    ])

    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

    print("Модель CNN не знайдена. Запуск тренування (5 епох)...")
    model.fit(x_train, y_train_cat, epochs=5, batch_size=32,
              validation_data=(x_test, y_test_cat), verbose=1) # verbose=1 для виводу в Jupyter

    model.save("speed_sign_cnn.h5")
    print("Модель успішно збережена як speed_sign_cnn.h5")
    return model


def load_sign_images():
    """Завантажує та зберігає зображення знаків (цифр 2-9) для симуляції камери."""
    global SIMULATION_IMAGES
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    x_train = np.expand_dims(x_train.astype('float32') / 255.0, -1)
    for digit in VALID_DIGITS:
        SIMULATION_IMAGES[digit] = x_train[y_train == digit]


def get_sign_image(digit_value):
    """Повертає випадкове зображення цифри для симуляції камери."""
    if digit_value not in SIMULATION_IMAGES:
        load_sign_images()

    if digit_value not in SIMULATION_IMAGES:
        raise ValueError(f"Цифра {digit_value} не підтримується як знак.")

    images = SIMULATION_IMAGES[digit_value]
    return random.choice(images)


def get_cnn_model():
    """Завантажує або тренує і повертає збережену модель CNN."""
    global GLOBAL_MODEL

    if GLOBAL_MODEL is None:
        if not os.path.exists("speed_sign_cnn.h5"):
            GLOBAL_MODEL = prepare_and_train_cnn()
        else:
            print("Завантаження збереженої моделі CNN з speed_sign_cnn.h5")
            GLOBAL_MODEL = tf.keras.models.load_model("speed_sign_cnn.h5")

    return GLOBAL_MODEL


def recognize_sign(image_data):
    """Приймає зображення знака, повертає визначену швидкість (20-90 км/год)."""
    model = get_cnn_model()
    input_data = np.expand_dims(image_data, 0)
    predictions = model.predict(input_data, verbose=0)
    predicted_class = np.argmax(predictions[0])
    predicted_digit = predicted_class + 2 # Зворотний зсув
    return predicted_digit * 10

In [18]:
class KnowledgeBase:
    """База знань для зберігання відомих сусідів."""

    def __init__(self):
        self.world = {}

    def tell(self, node, neighbors):
        """Записує сусідів для даного вузла."""
        if node not in self.world:
            self.world[node] = list(neighbors)

    def ask(self, node):
        """Повертає відомих сусідів вузла."""
        return self.world.get(node, [])


class CarAgent:
    """Інтелектуальний агент-автомобіль з CNN для розпізнавання швидкості."""

    def __init__(self, graph: nx.Graph, start: int, finish: int):
        self.graph = graph
        self.start = start
        self.finish = finish
        self.current_node = start

        # Стан пошуку
        self.visited = {start}
        self.history = [start]  # Для візуалізації пройденого шляху
        self.stack = [start]  # Для DFS

        self.kb = KnowledgeBase()
        self.positions = nx.get_node_attributes(graph, 'pos')
        self.speed_history = []  # (from_node, to_node, recognized_speed)

        # Ініціалізація CNN та бази зображень (відбувається тут)
        get_cnn_model()
        load_sign_images()
        self.kb.tell(start, self.perceive())
        self.is_finished = False

    def perceive(self):
        """Сприйняття: повертає всіх сусідів поточного вузла."""
        return list(self.graph.neighbors(self.current_node))

    def manhattan_distance(self, node1, node2):
        """Евристика: обчислення Манхеттенської відстані."""
        x1, y1 = self.positions[node1]
        x2, y2 = self.positions[node2]
        return abs(x1 - x2) + abs(y1 - y2)

    def set_speed(self, from_node, to_node):
        """Симуляція камери, розпізнавання знака CNN та повернення швидкості."""
        # Отримуємо 'ground truth' обмеження швидкості
        edge_data = self.graph.get_edge_data(from_node, to_node)
        sign_digit = edge_data.get('speed_limit',
                                   self.graph.get_edge_data(to_node, from_node).get('speed_limit'))

        if sign_digit is None:
            return 0

        # Симуляція камери: отримуємо зображення знака
        simulated_image = get_sign_image(sign_digit)

        # Розпізнавання за допомогою навченої CNN
        recognized_speed = recognize_sign(simulated_image)

        return recognized_speed

    def decide_next(self):
        """Прийняття рішення: вибір наступного кроку (DFS + Евристика)."""
        known_neighbors = self.kb.ask(self.current_node)
        if not known_neighbors:
            return None 

        unvisited = [n for n in known_neighbors if n not in self.visited]

        if unvisited:
            # Обираємо невідвіданий вузол з мінімальною Манхеттенською відстанню до фінішу
            return min(unvisited, key=lambda n: self.manhattan_distance(n, self.finish))
        else:
            return None 

    def move(self):
        """Метод, що керує рухом та оновленням стану агента."""

        if self.current_node == self.finish:
            self.is_finished = True
            return

        next_node = self.decide_next()

        if next_node is not None:
            # Рух вперед
            max_speed = self.set_speed(self.current_node, next_node)
            self.speed_history.append((self.current_node, next_node, max_speed))
            
            self.current_node = next_node
            self.visited.add(next_node)
            self.history.append(next_node)
            self.stack.append(next_node)
            self.kb.tell(next_node, self.perceive())

        else:
            # Повернення назад (Backtracking)
            if len(self.stack) > 1:
                self.stack.pop()
                self.current_node = self.stack[-1]
                self.history.append(self.current_node)
            else:
                self.is_finished = True

    def get_colors(self):
        """Визначення кольорів вузлів для візуалізації."""
        colors = []
        for node in self.graph.nodes():
            if node == self.start:
                colors.append("green")
            elif node == self.current_node:
                colors.append("blue")
            elif node == self.finish:
                colors.append("gold")
            elif node in self.visited:
                colors.append("lightgrey")
            else:
                colors.append("white")
        return colors

In [19]:
def draw_graph(agent, ax):
    """Відображає поточний стан графа."""

    # 1. Малювання вузлів
    nx.draw(agent.graph, agent.positions, node_color=agent.get_colors(),
            with_labels=False, node_size=500, edgecolors="black", linewidths=2,
            ax=ax)

    # 2. Малювання міток швидкості (Ground Truth: 20-90 км/год)
    edge_labels = {
        (a, b): f"{agent.graph.get_edge_data(a, b).get('speed_limit', agent.graph.get_edge_data(b, a).get('speed_limit')) * 10} км/год"
        for a, b in agent.graph.edges()
    }

    nx.draw_networkx_edge_labels(agent.graph, agent.positions,
                                 edge_labels=edge_labels, font_color='purple',
                                 font_size=8, ax=ax)

    # 3. Малювання пройденого шляху (включно зі зворотним)
    path_edges = list(zip(agent.history[:-1], agent.history[1:]))
    nx.draw_networkx_edges(agent.graph, agent.positions, edgelist=path_edges,
                           edge_color="red", width=1, ax=ax)


def update(frame, agent, fig, ani):
    """Функція, що викликається на кожному кадрі анімації."""
    ax.clear()

    if agent.is_finished:
        # Зупинка анімації
        ani.event_source.stop()

        # Виведення результатів
        print(f"Пошук завершено. Фінальний шлях: {agent.history}")
        print("\nШвидкість, розпізнана агентом на кожному відрізку:")
        for a, b, speed in agent.speed_history:
            print(f"Дорога ({a} до {b}): {speed} км/год")
            
        # Додайте кінцевий результат у вигляді тексту на графіку
        ax.text(0.5, 1.05, "ПОШУК ЗАВЕРШЕНО", transform=ax.transAxes, 
                ha="center", fontsize=12, color="red", weight="bold")
        
        # Відображення фінального стану
        draw_graph(agent, ax)

        return

    if frame > 0:
        agent.move()

    draw_graph(agent, ax)
    ax.set_title(f"Крок: {len(agent.history) - 1} | Поточний вузол: {agent.current_node}", fontsize=12)

In [26]:
PRINT_DONE = False 


def draw_graph(agent, ax):
    """Відображає поточний стан графа."""
    # (Ця функція залишається без змін)
    nx.draw(agent.graph, agent.positions, node_color=agent.get_colors(),
            with_labels=False, node_size=500, edgecolors="black", linewidths=2,
            ax=ax)

    edge_labels = {
        (a, b): f"{agent.graph.get_edge_data(a, b).get('speed_limit', agent.graph.get_edge_data(b, a).get('speed_limit')) * 10} км/год"
        for a, b in agent.graph.edges()
    }

    nx.draw_networkx_edge_labels(agent.graph, agent.positions,
                                 edge_labels=edge_labels, font_color='purple',
                                 font_size=8, ax=ax)

    path_edges = list(zip(agent.history[:-1], agent.history[1:]))
    nx.draw_networkx_edges(agent.graph, agent.positions, edgelist=path_edges,
                           edge_color="red", width=1, ax=ax)


def update(frame, agent, fig, ani):
    """Функція, що викликається на кожному кадрі анімації."""
    global PRINT_DONE
    ax.clear()

    if agent.is_finished:
        # Зупинка анімації
        ani.event_source.stop()

        if not PRINT_DONE:
            # Друк результатів лише один раз
            print(f"Пошук завершено. Фінальний шлях: {agent.history}")
            print("\n🏎️ Швидкість, розпізнана агентом на кожному відрізку:")
            for a, b, speed in agent.speed_history:
                print(f"Дорога ({a} --> {b}): {speed} км/год")
            PRINT_DONE = True # Встановлюємо прапорець, що друк виконано

        # Додайте кінцевий результат у вигляді тексту на графіку
        ax.text(0.5, 1.05, "ПОШУК ЗАВЕРШЕНО", transform=ax.transAxes, 
                ha="center", fontsize=12, color="red", weight="bold")
        
        # Відображення фінального стану
        draw_graph(agent, ax)

        return

    if frame > 0:
        agent.move()

    draw_graph(agent, ax)
    ax.set_title(f"Крок: {len(agent.history) - 1} | Поточний вузол: {agent.current_node}", fontsize=12)


#ЗАПУСК В JUPYTER

if __name__ == '__main__':
    # Скидаємо прапорець перед новим запуском, якщо це потрібно
    PRINT_DONE = False 
    
    # Створення графа 5x5 (25 вузлів)
    road = RoadGraph(size=25, remove_edges=10)
    start_node, goal_node = 0, 24

    # Ініціалізація агента (завантажить/навчить CNN)
    agent = CarAgent(road.graph, start_node, goal_node)

    fig, ax = plt.subplots(figsize=(8, 8))

    # Запуск анімації
    print("Генерація анімації...")
    
    # Використовуємо 80 кадрів, інтервал 500мс
    ani = FuncAnimation(fig, update, frames=range(0, 80), fargs=(agent, fig, None), 
                        interval=500, repeat=False)
    
    # Перепризначення fargs для коректної зупинки
    ani._args = (agent, fig, ani)

    plt.close(fig) 
    display(HTML(ani.to_jshtml()))

Генерація анімації...
Пошук завершено. Фінальний шлях: [0, 1, 2, 7, 12, 13, 14, 9, 4, 3, 8, 3, 4, 9, 14, 13, 18, 19, 18, 23, 24]

🏎️ Швидкість, розпізнана агентом на кожному відрізку:
Дорога (0 --> 1): 50 км/год
Дорога (1 --> 2): 90 км/год
Дорога (2 --> 7): 50 км/год
Дорога (7 --> 12): 40 км/год
Дорога (12 --> 13): 20 км/год
Дорога (13 --> 14): 80 км/год
Дорога (14 --> 9): 80 км/год
Дорога (9 --> 4): 70 км/год
Дорога (4 --> 3): 60 км/год
Дорога (3 --> 8): 70 км/год
Дорога (13 --> 18): 80 км/год
Дорога (18 --> 19): 80 км/год
Дорога (18 --> 23): 50 км/год
Дорога (23 --> 24): 90 км/год
