In [1]:
import torch
from torch import nn
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt
import networkx as nx
from random import choice, random, sample, seed
from tqdm.auto import tqdm
from timeit import default_timer as timer

ModuleNotFoundError: No module named 'torch'

In [None]:
'''Зібрання необхідних данних для тренування та тестування моделі'''

train_data = datasets.MNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor(),
    target_transform=None)

test_data = datasets.MNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor())

transform = transforms.ToTensor()

print(f"Залишено {len(train_data)} прикладів після фільтрації.")
print(f"Залишено {len(test_data)} прикладів після фільтрації.")

data_loader_train = DataLoader(train_data, batch_size=64, shuffle=True)
data_loader_test = DataLoader(test_data, batch_size=64, shuffle=False)

for images, labels in data_loader_train:
    print(f"Розмір батчу: {images.size()}, Мітки: {labels.unique()}")
    break

for images, labels in data_loader_test:
    print(f"Розмір батчу: {images.size()}, Мітки: {labels.unique()}")
    break


train_features, train_labels = next(iter(data_loader_train))

In [None]:
'''Сама модель з її шарами та функцією forward'''

from torch import nn

class CNNModel(nn.Module):
    def __init__(self, input_shape: int, hidden_units: int, output_shape: int):
        super().__init__()
        self.block_1 = nn.Sequential(
            nn.Conv2d(in_channels=input_shape,
                      out_channels=hidden_units,
                      kernel_size=3,
                      stride=1,
                      padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=hidden_units,
                      out_channels=hidden_units,
                      kernel_size=3,
                      stride=1,
                      padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2,
                         stride=2)
        )
        self.block_2 = nn.Sequential(
            nn.Conv2d(hidden_units, hidden_units, 3, padding=1),
            nn.ReLU(),
            nn.Conv2d(hidden_units, hidden_units, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=hidden_units*7*7,
                      out_features=output_shape)
        )

    def forward(self, x: torch.Tensor):
        x = self.block_1(x)
        x = self.block_2(x)
        x = self.classifier(x)
        return x

torch.manual_seed(42)
model_0 = CNNModel(input_shape=1,
                   hidden_units=10,
                   output_shape=len(train_data.classes))
model_0

In [2]:
'''Додаткові функції для тренування та висвітлення результатів моделі'''

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model_0.parameters(), lr=0.01)

def print_train_time(start: float, end: float):
    total_time = end - start
    print(f"Train time: {total_time:.3f} seconds")
    return total_time

def train_step(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer):
    train_loss, train_acc = 0, 0
    for batch, (X, y) in enumerate(data_loader):
        y_pred = model(X)
        loss = loss_fn(y_pred, y)
        train_loss += loss
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    train_loss /= len(data_loader)
    print(f"Train loss: {train_loss:.5f}")

def test_step(data_loader: torch.utils.data.DataLoader,
              model: torch.nn.Module,
              loss_fn: torch.nn.Module):
    test_loss, test_acc = 0, 0
    model.eval()
    with torch.inference_mode():
        for X, y in data_loader:
            test_pred = model(X)
            test_loss += loss_fn(test_pred, y)
        test_loss /= len(data_loader)
        print(f"Test loss: {test_loss:.5f}\n")

NameError: name 'nn' is not defined

In [3]:
'''Тренування моделі та висвітлення втрат пі час тренування'''

# torch.manual_seed(42)
train_time_start_on_gpu = timer()

epochs = 5
for epoch in tqdm(range(epochs)):
    print(f"Epoch: {epoch}\n---------")
    train_step(data_loader=data_loader_train,
               model=model_0,
               loss_fn=loss_fn,
               optimizer=optimizer)

    test_step(data_loader=data_loader_test,
              model=model_0,
              loss_fn=loss_fn)

train_time_end_on_gpu = timer()
total_train_time_model_1 = print_train_time(start=train_time_start_on_gpu,
                                            end=train_time_end_on_gpu)

NameError: name 'timer' is not defined

In [1]:
plt.plot(list(range(5)), [float(i) for i in y_loss])
plt.title('Функція втрат від епохи')
plt.show();

NameError: name 'plt' is not defined

In [4]:
# seed(42)
'''Перевірка на правильність роботи моделі, якщо потрібен фіксований результат, то можна розкоментувати команду seed(42)
Якщо ж необхідно розглянути сам семпл(картинку), що розібрана на тенсори, то треба розкоментувати та запустити код'''

test_samples = [choice(test_data) for _ in range(3)]
unsqueezed_samples = [sample[0].unsqueeze(dim=0) for sample in test_samples]
res = []
for i in unsqueezed_samples:
    res.append(model_0(i))
res_prob = torch.stack([torch.softmax(i.squeeze(), dim=0) for i in res])
# image, label = test_samples[2]
# print(f"Image shape: {image.shape}")
# plt.imshow(image.squeeze()) # image shape is [1, 28, 28] (colour channels, height, width)
# print(test_samples[2][1])
# print(res)
# plt.title(label);
pred_classes = res_prob.argmax(dim=1)
pred_classes, [i[1] for i in test_samples]

NameError: name 'choice' is not defined

In [5]:
def create_graph(f, s):
    '''Функція для створення графу'''
    Gr = nx.Graph()
    nodes = [[(i, j) for i in range(f)] for j in range(s)]
    edges = []
    for i in range(s - 1):
        for j in range(f - 1):
            edges.append((nodes[i][j], nodes[i][j+1]))
            edges.append((nodes[i][j], nodes[i+1][j]))
        edges.append((nodes[i][f-1], nodes[i+1][f-1]))
    Gr.add_edges_from(edges)
    return Gr

def delete_edges(n, graphs):
    '''Функція для видалення випадкових ребер з графу так,
    щоб граф при цьому залишався зв'язним'''
    seq = list(G.edges)
    for i in range(n):
        while seq:
            ran = choice(seq)
            graphs.remove_edge(ran[0], ran[1])
            seq.remove(ran)
            if nx.is_connected(G):
                break
            graphs.add_edge(ran[0], ran[1])
    return graphs


m = 15
ln1 = 5
ln2 = 5
G = create_graph(ln1, ln2)
G = delete_edges(m, G)
edge_labels = dict()
for i in G.edges:
    edge_labels[i] = 1
node_colors = ['blue' for _ in range(len(G.nodes))]
pos = {node: node for node in G.nodes}
nx.draw(G, pos, with_labels=True, node_color=node_colors)

NameError: name 'nx' is not defined

In [6]:
'''Класи бази Агента, що містить в собі всі властивості та дані, а також сам клас Агента, що містить всі необхідні методи для функціонування'''

class AgentBase:
    '''Клас бази даних Агента'''
    def __init__(self, start_pos, end_pos):
        self.new_edges_speed = dict()
        self.new_edges_speed_copy = dict()
        self.current_pos = start_pos
        self.depth = {1: self.current_pos}
        self.start_pos = start_pos
        self.end_pos = end_pos
        self.pos_for_neighbours = {}
        self.check_environment(start_pos)
        self.path = [start_pos]
        self.length = ln1
        self.width = ln2
        self.speed = None
        self.point_for_speed = None
        self.lis = list(G.edges)
        ln = 0
        while len(self.new_edges_speed) != len(self.lis):
            new = choice(test_data)
            if new[1] >= 2:
                self.new_edges_speed.update({self.lis[ln]: new})
                ln += 1
        self.new_edges_speed_copy = self.new_edges_speed.copy()

        for j in range(len(self.lis)):
            self.lis[j] = list(self.lis[j])
            self.lis[j][0] = list(self.lis[j][0])
            self.lis[j][1] = list(self.lis[j][1])

    def check_environment(self, new_pos):
        if new_pos != self.current_pos:
            speed_sample = None
            if (tuple(self.current_pos), tuple(new_pos)) in self.new_edges_speed.keys():
                speed_sample = self.new_edges_speed[(tuple(self.current_pos), tuple(new_pos))]
                self.point = 0
            if (tuple(new_pos), tuple(self.current_pos)) in self.new_edges_speed.keys():
                speed_sample = self.new_edges_speed[(tuple(new_pos), tuple(self.current_pos))]
                self.point = 1
            prediction = torch.softmax(model_0(speed_sample[0].unsqueeze(dim=0)).squeeze(), dim=0).argmax(dim=0)
            # print(int(prediction))
            # print(speed_sample[1])
            self.speed = int(prediction)
            if self.point:
                self.new_edges_speed.update({(tuple(new_pos), tuple(self.current_pos)): int(prediction)})
            else:
                self.new_edges_speed.update({(tuple(self.current_pos), tuple(new_pos)): int(prediction)})
        self.current_pos = new_pos
        self.depth[len(self.depth) + 1] = new_pos
        self.pos_for_neighbours[tuple(new_pos)] = list(G.neighbors(tuple(new_pos)))
        for a in list(G.neighbors(tuple(new_pos))):
            node_colors[list(G.nodes).index(a)] = 'red'



class Agent(AgentBase):
    '''Клас самого Агента'''
    def __init__(self, start_pos, end_pos):
        super().__init__(start_pos, end_pos)

    def next_move(self):
        if self.current_pos[0] < self.end_pos[0]:
            if ([self.current_pos, [self.current_pos[0] + 1, self.current_pos[1]]] in self.lis and
                    [self.current_pos[0] + 1, self.current_pos[1]] not in self.path):
                self.step_up()

            elif (self.current_pos[1] != self.width - 1 and
                  ([self.current_pos, [self.current_pos[0], self.current_pos[1] + 1]] in self.lis and
                   [self.current_pos[0], self.current_pos[1] + 1] not in self.path)):
                self.step_right()

            elif self.current_pos[1] != 0 and (
                    [[self.current_pos[0], self.current_pos[1] - 1], self.current_pos] in self.lis and
                    [self.current_pos[0], self.current_pos[1] - 1] not in self.path):
                self.step_left()

            elif self.current_pos[0] != 0 and (
                    [[self.current_pos[0] - 1, self.current_pos[1]], self.current_pos] in self.lis and
                    [self.current_pos[0] - 1, self.current_pos[1]] not in self.path):
                self.step_down()

            else:
                self.go_back()

        elif self.current_pos[0] > self.end_pos[0]:
            if ([[self.current_pos[0] - 1, self.current_pos[1]], self.current_pos] in self.lis and
                    [self.current_pos[0] - 1, self.current_pos[1]] not in self.path):
                self.step_down()

            elif self.current_pos[1] != self.width - 1 and (
                    [self.current_pos, [self.current_pos[0], self.current_pos[1] + 1]] in self.lis and
                    [self.current_pos[0], self.current_pos[1] + 1] not in self.path):
                self.step_right()

            elif self.current_pos[1] != 0 and (
                    [[self.current_pos[0], self.current_pos[1] - 1], self.current_pos] in self.lis and
                    [self.current_pos[0], self.current_pos[1] - 1] not in self.path):
                self.step_left()

            elif self.current_pos[0] != self.length - 1 and (
                    [self.current_pos, [self.current_pos[0] + 1, self.current_pos[1]]] in self.lis and
                    [self.current_pos[0] + 1, self.current_pos[1]] not in self.path):
                self.step_up()

            else:
                self.go_back()

        elif self.current_pos[1] < self.end_pos[1]:
            if ([self.current_pos, [self.current_pos[0], self.current_pos[1] + 1]] in self.lis and
                    [self.current_pos[0], self.current_pos[1] + 1] not in self.path):
                self.step_right()

            elif self.current_pos[0] != 0 and (
                    [[self.current_pos[0] - 1, self.current_pos[1]], self.current_pos] in self.lis and
                    [self.current_pos[0] - 1, self.current_pos[1]] not in self.path):
                self.step_down()

            elif self.current_pos[1] != 0 and (
                    [[self.current_pos[0], self.current_pos[1] - 1], self.current_pos] in self.lis and
                    [self.current_pos[0], self.current_pos[1] - 1] not in self.path):
                self.step_left()

            elif self.current_pos[0] != self.length - 1 and (
                    [self.current_pos, [self.current_pos[0] + 1, self.current_pos[1]]] in self.lis and
                    [self.current_pos[0] + 1, self.current_pos[1]] not in self.path):
                self.step_up()

            else:
                self.go_back()

        elif self.current_pos[1] > self.end_pos[1]:
            if ([[self.current_pos[0], self.current_pos[1] - 1], self.current_pos] in self.lis and
                    [self.current_pos[0], self.current_pos[1] - 1] not in self.path):
                self.step_left()

            elif self.current_pos[0] != 0 and (
                    [[self.current_pos[0] - 1, self.current_pos[1]], self.current_pos] in self.lis and
                    [self.current_pos[0] - 1, self.current_pos[1]] not in self.path):
                self.step_down()

            elif self.current_pos[1] != self.width - 1 and (
                    [self.current_pos, [self.current_pos[0], self.current_pos[1] + 1]] in self.lis and
                    [self.current_pos[0], self.current_pos[1] + 1] not in self.path):
                self.step_right()

            elif self.current_pos[0] != self.length - 1 and (
                    [self.current_pos, [self.current_pos[0] + 1, self.current_pos[1]]] in self.lis and
                    [self.current_pos[0] + 1, self.current_pos[1]] not in self.path):
                self.step_up()

            else:
                self.go_back()

        elif self.current_pos == self.end_pos:
            print(f"Program has finished it's work with success at the point {self.current_pos}")
            return False

        else:
            print('Wrong start or wrong code')
            return False

        return True



    def step_up(self):
        new_pos = [self.current_pos[0] + 1, self.current_pos[1]]
        self.check_environment(new_pos)
        self.path.append(new_pos)

    def step_down(self):
        new_pos = [self.current_pos[0] - 1, self.current_pos[1]]
        self.check_environment(new_pos)
        self.path.append(new_pos)

    def step_right(self):
        new_pos = [self.current_pos[0], self.current_pos[1] + 1]
        self.check_environment(new_pos)
        self.path.append(new_pos)

    def step_left(self):
        new_pos = [self.current_pos[0], self.current_pos[1] - 1]
        self.check_environment(new_pos)
        self.path.append(new_pos)

    def go_back(self):
        self.depth.pop(len(self.depth))
        new_pos = self.depth[len(self.depth)]
        self.path.append(new_pos)
        self.current_pos = new_pos

pos1 = pos.copy()

NameError: name 'pos' is not defined

In [7]:
'''Запуск роботи Агента та виконання всіх необхідних інструкцій для демонстрації його шляху проходження,
а також висвітлення на його ребрах швидкостей, з якими він пересувався по цим "ділянкам"(ребрам)'''

agent = Agent([4, 1], [1, 4])
while True:
    if not agent.next_move():
        break

print(f"Agent path: {agent.path}")

for i in range(len(agent.path)):
    node_colors[list(G.nodes).index(tuple(agent.path[i]))] = 'green'

for i in list(G.nodes):
    if node_colors[list(G.nodes).index(i)] == 'blue':
        node_colors.pop(list(G.nodes).index(i))
        G.remove_node(i)

pos = {node: node for node in G.nodes}
plt.figure(figsize=(8, 6))
nx.draw(G, pos, with_labels=True, node_color=node_colors)

edge_labels = dict()
for key, value in agent.new_edges_speed.items():
    if type(value) is int:
        edge_labels.update({key: value})

nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, font_size=10)
plt.title("Graph with Weighted Edges", fontsize=15)
plt.show()

NameError: name 'G' is not defined

In [None]:
'''Щоб подивитися як виглядає картинка швидкості для певного ребра оберіть номер проходження ребра у функції та запустіть її'''

def check_image(order):
    if (tuple(agent.path[order]), tuple(agent.path[order+1])) in agent.new_edges_speed_copy.keys():
        image, label = agent.new_edges_speed_copy[(tuple(agent.path[order]), tuple(agent.path[order+1]))]
    if (tuple(agent.path[order+1]), tuple(agent.path[order])) in agent.new_edges_speed_copy.keys():
        image, label = agent.new_edges_speed_copy[(tuple(agent.path[order+1]), tuple(agent.path[order]))]
    print(f"Image shape: {image.shape}")
    plt.imshow(image.squeeze())
    print((tuple(agent.path[order+1]), tuple(agent.path[order])))
    plt.title(label);

check_image(3)