## Задача 5-1. A\* поиск в задаче о кратчайших путях.

В этой задаче Вам предлагается реализовать поиск кратчайших путей в графе с помощью A\*-поиска с использованием эвристической функции («потенциала»), основанном на landmarks. Теоретические основы можно посмотреть [здесь](http://logic.pdmi.ras.ru/midas/sites/default/files/midas-werneck.pdf), слайды 20—36.

Вам предлагается скачать [отсюда](http://www.diag.uniroma1.it/challenge9/download.shtml) файлы “Travel time graph” и “Coordinates” для штата Флорида. Для Вашего удобства они также размещены в архиве `florida.7z` в настоящем репозитории на GitHub.

Функции `read_node_coords` и `read_arcs` возвращают соответственно координаты вершин графа (отнормированные; координаты нужны только для обеспечения возможности выбора landmarks “по периметру графа”) и структура дуг графа.

In [1]:
import math
from typing import List, Dict, Tuple
from collections import namedtuple, defaultdict
Coords = namedtuple('Coords', ['x', 'y'])

def read_node_coords(filename='USA-road-d.FLA.co') -> List[int]:
    node_coords = []
    
    with open(filename, 'r') as coord_file:
        for line in coord_file:
            if line.startswith('v '):
                node_number, x, y = map(int, line.split()[1:])
                node_coords.append(Coords(x, y))
    
    minx = min(c.x for c in node_coords)
    miny = min(c.y for c in node_coords)
    for i, c in enumerate(node_coords):
        node_coords[i] = Coords(c.x-minx, c.y-miny)
    
    return node_coords


def read_arcs(filename='USA-road-t.FLA.gr') -> Dict[int, Dict[int, float]]:
    adjacency_lists = defaultdict(dict)
    
    with open(filename, 'r') as coord_file:
        for line in coord_file:
            if line.startswith('a '):
                node_from, node_to, weight = map(int, line.split()[1:])
                adjacency_lists[node_from-1][node_to-1] = weight
                
    return adjacency_lists

Реализуйте процедуру `good_old_dijkstra`, которая для пары номеров вершин графа ищет кратчайший путь между ними и возвращает список номеров вершин, образующих оптимальный путь и его длину.

In [10]:

import queue

def good_old_dijkstra(adjacency_lists, node_from, node_to):    
    
    q = queue.PriorityQueue()
    parent = {}
    dist = {}
    
    for v in adjacency_lists:
        parent[v] = -1
        dist[v] = math.inf    
    
    dist[node_from] = 0
    q.put((0, node_from))
    while not q.empty():
        cur_dist, cur_v = q.get()
        for v in adjacency_lists[cur_v]:
            if cur_dist + adjacency_lists[cur_v][v] < dist[v]:
                dist[v] = cur_dist + adjacency_lists[cur_v][v]
                parent[v] = cur_v
                q.put((cur_dist + adjacency_lists[cur_v][v], v))

    cur = node_to
    to_return = [] # восстановление пути
    while cur != -1:
        to_return.append(cur)
        cur = parent[cur]
    to_return.reverse()
    return (to_return, dist[node_to])




In [3]:
%%time
adj = read_arcs()
coords = read_node_coords()

Wall time: 52.4 s


Реализуйте тройку процедур `choose_landmarks`, `precalculate_landmark_distances` и `a_star_with_landmarks`. Процедура `choose_landmarks` выбирает нужное количество специальных вершин графа — этот выбор делается равномерным выбором по периметру графа (см. слайд 30 в [презентации](http://logic.pdmi.ras.ru/midas/sites/default/files/midas-werneck.pdf)). Процедура `precalculate_landmark_distances` для каждой вершины из заданного набора с помощью обычного алгоритма Дейкстры вычисляет расстояния до всех вершин графа. Эта информация затем используется в `a_star_with_landmarks` для ускорения поиска кратчайшего пути.

In [22]:
def dist(v1, v2):
        return math.sqrt((v1.x - v2.x)*(v1.x - v2.x) + (v1.y - v2.y)*(v1.y - v2.y))

def angle(v, center):
    alpha = math.atan2(v.x - center[0], v.y - center[1])
    if alpha < 0:
        alpha += 2 * math.pi

from numpy import arctan2    
    
def choose_landmarks(node_coords, num_landmarks=15) -> List[int]:
    
    center = [sum([v.x for v in node_coords]) / len(node_coords),
            sum([v.y for v in node_coords]) / len(node_coords)]
    
    sectors = [[-1, 1] for i in range(num_landmarks)]
    
    landmarks = []
    for i in range(len(node_coords)): # распределим по секторам
        sector_num = int(angle(node_coords[i], center) * num_landmarks / (2 * math.pi))
        sectors[sector_num].append((v, i))
        
    for sector in sectors: # ищем в секторе лучшую
        best = sector[0][0]
        for v, num in sector:
            if dist(v, Coords(center[0], center[1])) < dist(best, Coords(center[0], center[1])):
                best = v
        landmarks.append(num)
    
    return landmarks

    

In [23]:

def precalculate_landmark_distances(adjacency_lists: Dict[int, Dict[int, float]], landmarks: List[int]) -> Dict[int, Dict[int, float]]:
    ddist = {}
    for i in range(len(landmarks)):
        node_from = landmarks[i]
        q = queue.PriorityQueue()
        parent = {}
        dist = {}
    
        for v in adjacency_lists:
            parent[v] = -1
            dist[v] = math.inf    

        dist[node_from] = 0
        q.put((0, node_from))
        while not q.empty():
            cur_dist, cur_v = q.get()
            for v in adjacency_lists[cur_v]:
                if cur_dist + adjacency_lists[cur_v][v] < dist[v]:
                    dist[v] = cur_dist + adjacency_lists[cur_v][v]
                    parent[v] = cur_v
                    q.put((cur_dist + adjacency_lists[cur_v][v], v))

        ddist[i] = dist
    return ddist
        


In [24]:
def a_star_with_landmarks(adjacency_lists: Dict[int, Dict[int, float]], node_from, node_to, landmark_distances: Dict[int, Dict[int, float]]) -> Tuple[List[int], float]:
    
    q = queue.PriorityQueue()
    parent = {}
    dist = {}
    real_dist = {}
    
    for v in adjacency_lists:
        parent[v] = -1
        dist[v] = math.inf    
    
    dist[node_from] = 0
    real_dist[node_from] = 0
    q.put((0, node_from))
    while not q.empty():
        cur_dist, cur_v = q.get()
        if cur_v == node_to:
            break   
        for v in adjacency_lists[cur_v]:
            best = 0
            for ld in landmark_distances:
                best = max(res, abs(landmark_distances[ld][v] - landmark_distances[ld][node_to]))
            if real_dist[cur_v] + adjacency_lists[cur_v][v] + best < dist[v]:
                real_dist[v] = real_dist[cur_v] + adjacency_lists[cur_v][v] + best
                dist[v] = real_dist[cur_v] + adjacency_lists[cur_v][v]
                parent[v] = cur_v
                q.put((cur_dist + adjacency_lists[cur_v][v], v))

    cur = node_to
    to_return = [] # восстановление пути
    while cur != -1:
        to_return.append(cur)
        cur = parent[cur]
    to_return.reverse()
    return (to_return, real_dist[node_to])

In [25]:
import time
from random import randrange

def run_all():
    node_coords = read_node_coords()
    adjacency_lists = read_arcs()
    num_nodes = len(node_coords)
    
    time_start = time.monotonic()
    landmark_distances = precalculate_landmark_distances(choose_landmarks(node_coords))
    print('Precalculation done in {:.2} seconds.'.format(time.monotonic() - time_start))
    
    time_dijkstra = 0
    time_a_star = 0
    
    num_tests = 100
    for _ in range(num_tests):
        node_from, node_to = randrange(num_nodes), randrange(num_nodes)
        time_start = time.monotonic()
        good_old_dijkstra(adjacency_lists, node_from, node_to)
        time_dijkstra += time.monotonic()-time_start
        time_start = time.monotonic()
        a_star_with_landmarks(adjacency_lists, node_from, node_to, landmark_distances)
        time_a_star += time.monotonic()-time_start
    
    print('Time elapsed in {} test: {:.2} second for A* vs. {:.2} seconds for Dijkstra.'.format(num_tests, time_a_star, time_dijkstra))

In [None]:
run_all()