# Поиск кратчайшего пути на реальных картах

In [None]:
!pip install -r requirements.txt

# Подготовка карты

Для задания загрузим карты 3 размеров:

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
from src.osmnx_utils import PlaceGraph

In [None]:
small_graph = PlaceGraph("Naro-Fominsk, Russia", road_mode="bike")
print(len(small_graph.G.vs))
small_graph.plot_graph()

In [None]:
medium_graph = PlaceGraph("Sochi, Russia", road_mode="bike")
print(len(medium_graph.G.vs))
medium_graph.plot_graph()

In [None]:
large_graph = PlaceGraph("Saint Petersburg, Russia", road_mode="bike")
print(len(medium_graph.G.vs))
large_graph.plot_graph()

In [None]:
def run_test(solver, graph, start=0, target=315, visualize=False):

    cost, path, steps = solver.find_path(graph, start, target)
    if visualize:
        display(graph.plot_route_animated(path, targets=[start, target], nodes_are_fast=True, online_mode=False))
    return cost, path, steps

# Implement generic pathfinding

Для начала реализуем обобщенный алгоритм поиска пути. 

Шаги работы алгоритма:
1. *GenericPathfinder.__init__(node_sorting_func, heuristic_func=None)* - инициализируем параметры:
   * *node_sorting_func(node)* - функция сортировки вершин в очереди
   * *heuristic_func(node, target_node)* - функция оценки длины оставшегося пути
2. запустим цикл *find_path(G, starting_node, target_nodes)*:
    1. *prepare_for_pathfinding(G, starting_node)* - инициализирует очередь и уставливает начальные значения атрибутов поиска (i.e. длина кратчайшего пути и вершина-источник)
    2. *_queue_iteration(G, target_node)* - основной цикл поиска:
       * get_next() берем следующую в очереди вершину node
       * Проверяем node["visited"]
       * Если не посещали, то для каждого соседа:
         * Вычисляем стоимость пути (реальную + оценку) с учетом перехода от node
         * Сравниваем эту стоимость с ранее посчитанной
         * Если стоимость меньше, то обновляем поля соседской вершины *set_pathfinding_attrs*
         * Если достигли целевой вершины target_node, то возвращаем стоимость пути и сам путь
         * Если не достигли, то добавим соседа в очередь

Внизу дана заготовка. Нужно дополнить блоки с меткой "TO IMPLEMENT".

In [263]:
from sortedcontainers import SortedList

from typing import List, Tuple, Union
from tqdm.auto import tqdm
import numpy as np

def default_heuristic(node_a, node_b):
    return 0

class GenericPathfinder:
    def __init__(self, node_sorting_func, heuristic_func=None, weight='travel_time'):
        self.node_sorting_func=node_sorting_func
        self.w='travel_time'
        self.heuristic_func=heuristic_func
        if self.heuristic_func is None:
            self.heuristic_func = default_heuristic
    
    def _init_queue(self, start=None):
        self.queue = SortedList(key=self.node_sorting_func)
        if start is not None:
            self.queue.add(start)

    def enqueue(self, node):
        self.queue.add(node)

    def get_next(self):
        return self.queue.pop(0)

    @staticmethod
    def set_pathfinding_attrs(
        node, sp_source=None, sp_len=None, sp_est=None, visited=None
    ):
        if sp_source is not None:
            node["shortest_path_source"]=sp_source
        if sp_len is not None:
            node["shortest_path_len"]=sp_len
        if sp_est is not None:
            node["heuristic"]=sp_est
        if visited is not None:
            node["visited"]=visited
        if all(map(lambda x: x is None, [sp_source, sp_len, sp_est, visited])):
            node["shortest_path_source"]=None
            node["shortest_path_len"]=np.inf
            node["heuristic"]=np.inf
            node["visited"]=False
    
    def recover_shortest_path(self, node):
        """
        Recursive function that recovers the sequence of ["shortest_path_source"] of node and its parents
        
        TO IMPLEMENT
        """
        pass

        
    def _queue_iteration(self, G, target_node) -> Tuple[bool, Union[None, Tuple[int, List[int]]]]:
        """
        TO IMPLEMENT
        - use methods get_next, enqueue, set_pathfinding_attrs, recover_shortest_path
        - follow typing instructions
        - write pathfinding attributes with set_pathfinding_attrs
        - dont forget to clean Node attributes at 
        - if last queue node has been already visited return False, None
        - if reached target_node return True, shortest path length and path in form of List[Node.index]
        - otherwise return True, None
        """
        ###############################################################
        return (True, None)
        ###############################################################

    def prepare_for_pathfinding(self, G, starting_node):
        """
        TO IMPLEMENT
        - resets queue and enqueues starting_node
        - sets initial pathfinding attributes for starting_node
        - use set_pathfinding_attrs and _init_queue methods
        """
        pass
    
    def find_path(self, graph, start, target):
        G=graph.G
        assert start in set(range(len(G.vs)))
        assert target in set(range(len(G.vs)))

        # graph.set_online_mode(landmark_size=0.8, traffic_factor=3.0)
        # graph.start_traffic()
        # graph.get_online_update()

        target_node=G.vs[target]
        
        self.prepare_for_pathfinding(G, G.vs[start])
        
        with tqdm(total=len(G.vs), miniters=1, desc='Looking for path...') as tracker:
            while(self.queue):
                updated, found = self._queue_iteration(G, target_node)
                if found is not None:
                    tracker.set_description(f"Found path with cost: {found[0]}")
                    steps=tracker.n
                    return (*found, steps)
                if updated:
                    tracker.update()
        return (np.inf, None, len(G.vs))

## BFS test

In [None]:
def bfs_cost(x):
    ### TO IMPLEMENT ###
    pass

bfs=GenericPathfinder(bfs_cost)

cost, path, steps = run_test(bfs, small_graph, visualize=True)

## Dijkstra variant

In [None]:
def dijkstra_cost(x):
    ### TO IMPLEMENT ###
    pass

dijkstra = GenericPathfinder(dijkstra_cost)

cost, path, steps = run_test(dijkstra, small_graph, visualize=True)

## Best-first pathfinding

Доступная информация по вершинам:

In [None]:
print(small_graph.G.vs[0])

In [None]:
for e in graph.G.es:
    print(e)
    break

Используя имеющиеся поля вершины придумайте эвристику кратчайшего пути

In [None]:
from sklearn.metrics import pairwise_distances

def simple_heuristic(node_a, node_b):
    ### TO IMPLEMENT ###
    pass

def best_first_cost(x):
    ### TO IMPLEMENT ###
    pass

best_first = GenericPathfinder(best_first_cost, heuristic_func=simple_heuristic)


cost, path, steps = run_test(best_first, small_graph, visualize=True)

## A-star

In [None]:
def a_star_cost(x):
    ### TO IMPLEMENT ###
    pass

a_star = GenericPathfinder(a_star_cost, heuristic_func=simple_heuristic)

cost, path, steps = run_test(a_star, small_graph, visualize=True)

# Advanced heuristics

Тут понадобятся методы
* graph.G - доступ к графу карты
* graph.G.vs - доступ к вершинам Node графа
* graph.G.es - доступ к вершинам Edge графа
* Node["ATTRIBUTЕ_NAME"] = X - добавление нового признака "ATTRIBUTЕ_NAME" к вершинам
* Edge["ATTRIBUTЕ_NAME"] = X - добавление нового признака "ATTRIBUTЕ_NAME" к ребрам

## ALT

Из всех вершин выбирается небольшое количество *landmarks*: $\lambda$. Для каждой вершины предварительно рассчитываются стоимости до каждого $\lambda$.

Поиск в ALT осуществляется как в A*, но оценка оставшегося пути делается на основе рассчитанных стоимостей. Пусть мы рассматриваем ребро $(u,v)$ на пути к целевой вершине $t$.\
Для каждой $\lambda$ в соответствии с неравенством треугольника мы имеем оценку оставшейся части пути (через $\lambda$): 
$$dist(\lambda, t) − dist(\lambda, v) \leq dist(v, t),$$ 
$$dist(v, \lambda) − dist(t, \lambda) \leq dist(v, t).$$ 
Минимум для всех $\lambda$ и даст искомую оценку.

Часто в качестве *landmarks* выбирают вершины, через которые проходят 3 и более кратчайших маршрутов.

**Замечание:** считать все расстояния долго и дорого, поэтому обычно вершины бьют на регионы (кластеры), например, по географическим координатам, и считают расстояния\
от их центров до *landmarks*, а затем при поиске используют эвристики для этих центроидов, применяя точный расчет A-star только для концов маршрута при пересечении границ их регионов.

In [None]:
### TO IMPLEMENT ###
####################

## Contraction hierarchies

Следующий логичный шаг - воспользоваться фактом, что некоторые кратчайшие пути имеют общие последовательности ребер $u,...,v$.\
А значит эти последовательности $u,...,v$ можно заранее рассчитать и соединить их концы $(u,v)$ новым ребром $shortcut(u,v)=dist(u,v)$.

Пары $(u,v)$ определяют по следующему алгоритму:
1. Найти всех соседей $w\in Neighbor(v)$
2. Для каждого $w\in Neighbor(v)$ рассчитать $shortest\_path(u,w)$
3. Если $|\{w:v\in shortest\_path(u,w)\}| > k$, то добавляем в граф ребро $shortcut(u,v)=dist(u,v)$

Обычно порядок перебора вершин такой: *landmarks* -> вершины с наименьшим числом инцидентных ребер (альтернатив) -> случайные вершины

In [None]:
### TO IMPLEMENT ###
####################