# Dijkstra计算有向加权图最短路径
# 1: 三个表 邻接权重表 各点权重表 标记最优表 父节点表(回溯路径用)
# 2: 利用堆实现优先队列 弹出最小权重点 标记为最优 后续无需再判断它 
# 3: 查找该最优点的邻接点 (已标记最优点跳过) 计算当前权重 判断是否更新

In [1]:
# Dijkstra.狄杰斯特拉 加权有向图 没有负权重边
# https://www.bilibili.com/video/BV1zz4y1m7Nq?spm_id_from=333.1007.top_right_bar_window_history.content.click
import heapq
import math


# 初始化记 录start到其他节点距离，初始为无穷大
def init_distance(graph, s):
    distance = {s: 0}
    for vertex in graph:
        if vertex != s:
            distance[vertex] = math.inf
    return distance


def dijkstra(graph, s):
    pqueue = []
    heapq.heappush(pqueue, (0, s))  # 实现优先队列
    seen = set()  # 记录可以标记最优路径的节点
    parent = {s: None}  # 记录每个节点父亲节点
    distance = init_distance(graph, s)  # 记录start到其他节点距离，初始为无穷大

    while len(pqueue) > 0:
        #1 弹出最小的节点并标记为最优路径的节点
        pair = heapq.heappop(pqueue)
        dist = pair[0]  # 当前距离
        vertex = pair[1]  # 当前节点
        if vertex not in seen:  # 可有可无 根据题意 可以保证最优点只访问一次
            # if vertex==target:
            #     return dist
            # 每次从队列里面拿出来的都是 distFromStart 值最小的，
            # 所以当你第一次从队列中拿出终点 end 时，
            # 此时的 distFromStart 对应的值就是从 start 到 end 的最短距离。
            seen.add(s)  # 标记最优
            neis = graph[vertex].keys()  # 邻接点
            #2 寻找当前节点的邻接点(排除已经标记的最优节点)，
            # 当前从start的距离+当前节点到邻接点距离 与 已记录距离比较以更新
            # 更新时将 新距离和节点入队 并更新邻接点的父节点，已经覆盖已记录的start到该点的距离
            for nei in neis:
                if nei not in seen:  # 可有可无 最优点肯定过不了下一行判断
                    if dist + graph[vertex][nei] < distance[nei]:
                        heapq.heappush(pqueue, (dist + graph[vertex][nei], nei))
                        parent[nei] = vertex
                        distance[nei] = dist + graph[vertex][nei]
    return parent, distance


if __name__ == '__main__':
    graph_dict = {
        "A": {
            "B": 5,
            "C": 1
        },
        "B": {
            "A": 5,
            "C": 2,
            "D": 1
        },
        "C": {
            "A": 1,
            "B": 2,
            "D": 4,
            "E": 8
        },
        "D": {
            "B": 1,
            "C": 4,
            "E": 3,
            "F": 6
        },
        "E": {
            "C": 8,
            "D": 3
        },
        "F": {
            "D": 6
        },
    }

    parent_dict, distance_dict = dijkstra(graph_dict, "A")
    print(parent_dict)
    print(distance_dict)


{'A': None, 'B': 'C', 'C': 'A', 'D': 'B', 'E': 'D', 'F': 'D'}
{'A': 0, 'B': 3, 'C': 1, 'D': 4, 'E': 7, 'F': 10}


https://www.redblobgames.com/pathfinding/a-star/implementation.html

In [None]:
# BFS
from implementation import *

def breadth_first_search(graph: Graph, start: Location, goal: Location):
    frontier = Queue()
    frontier.put(start)
    came_from: Dict[Location, Optional[Location]] = {}
    came_from[start] = None
    
    while not frontier.empty():
        current: Location = frontier.get()
        
        if current == goal:
            break
        
        for next in graph.neighbors(current):
            if next not in came_from:
                frontier.put(next)
                came_from[next] = current
    
    return came_from

g = SquareGrid(30, 15)
g.walls = DIAGRAM1_WALLS

start = (8, 7)
goal = (17, 2)
parents = breadth_first_search(g, start, goal)
draw_grid(g, point_to=parents, start=start, goal=goal)

In [None]:
# dijkstra
def dijkstra_search(graph: WeightedGraph, start: Location, goal: Location):
    frontier = PriorityQueue()
    frontier.put(start, 0)
    came_from: Dict[Location, Optional[Location]] = {}
    cost_so_far: Dict[Location, float] = {}
    came_from[start] = None
    cost_so_far[start] = 0
    
    while not frontier.empty():
        current: Location = frontier.get()
        
        if current == goal:
            break
        
        for next in graph.neighbors(current):
            # 当节点间步长为1即graph.cost(current, next)=1 则不需要new_cost < cost_so_far[next] 因为肯定成立
            new_cost = cost_so_far[current] + graph.cost(current, next)
            if next not in cost_so_far or new_cost < cost_so_far[next]:
                cost_so_far[next] = new_cost
                priority = new_cost
                frontier.put(next, priority)
                came_from[next] = current
    
    return came_from, cost_so_far

def reconstruct_path(came_from: Dict[Location, Location],
                     start: Location, goal: Location) -> List[Location]:

    current: Location = goal
    path: List[Location] = []
    while current != start: # note: this will fail if no path found
        path.append(current)
        current = came_from[current]
    path.append(start) # optional
    path.reverse() # optional
    return path

In [None]:
# A* 必须有提前退出
# https://godorz.info/2009/11/heuristics-2/
# 曼哈顿距离（Manhattan Distance）：D*(abs(x1 - x2) + abs(y1 - y2))
# 一般只能在四个方向上移动时用（右、左、上、下）

# 对角线距离（Diagonal Distance）：
# 当我们只允许向八个方向移动时用（国际象棋中的王移动方式那种）

# 欧几里得距离（Euclidean Distance）：D*(((x1-x2)**2+(y1-y2)**2)**.5)
# 不受限制，允许向任何方向移动时。 八个方向欧几里得就会高估 采用切比雪夫或对角线

# 切比雪夫距离（Chebyshev Distance）：D*(max(abs(x1-x2),abs(y1-y2)))
# 可参考LeetCode 1266，https://leetcode-cn.com/problems/minimum-time-visiting-all-points/solution/fang-wen-suo-you-dian-de-zui-xiao-shi-jian-by-le-2/
def heuristic(a: GridLocation, b: GridLocation) -> float:
    (x1, y1) = a
    (x2, y2) = b
    return abs(x1 - x2) + abs(y1 - y2)

def a_star_search(graph: WeightedGraph, start: Location, goal: Location):
    frontier = PriorityQueue()
    frontier.put(start, 0)
    came_from: Dict[Location, Optional[Location]] = {}
    cost_so_far: Dict[Location, float] = {}
    came_from[start] = None
    cost_so_far[start] = 0
    
    while not frontier.empty():
        current: Location = frontier.get()
        
        if current == goal:
            break
        
        for next in graph.neighbors(current):
            # 当节点间步长为1即graph.cost(current, next)=1 则不需要new_cost < cost_so_far[next] 因为肯定成立
            new_cost = cost_so_far[current] + graph.cost(current, next) 
            if next not in cost_so_far or new_cost < cost_so_far[next]:
                cost_so_far[next] = new_cost
                priority = new_cost + heuristic(next, goal)
                frontier.put(next, priority)
                came_from[next] = current
    
    return came_from, cost_so_far

# 关于对角线距离
如果对角线移动一个栅格的代价并不是D,而是类似于D2=sqrt(2)*D,那么上面的启发函数对你而言又是错误的了.你可以使用一个更为复杂的启发函数代替它.

h_diagonal(n) = min(abs(n.x-goal.x), abs(n.y-goal.y))
h_straight(n) = (abs(n.x-goal.x) + abs(n.y-goal.y))
h(n) = D2 * h_diagonal(n) + D * (h_straight(n) - 2*h_diagonal(n)))
这里我们计算 h_diagonal(n) = 沿着对角线移动的步数, h_straight(n) = 曼哈顿距离,你可以通过考虑所有的对角线移动的步数(每步耗散D2)以及剩下的直线移动的步数(每步耗散D)将这两者结合在一起.

In [1]:
class Graph:

    def __init__(self):
        self.edges = {}
        self.weights = {}

    def neighbors(self, id):
        return self.edges.get(id, [])

    def cost(self, a, b):
        return self.weights.get((a, b), 1)

    def add(self, a, b, w):
        self.edges.setdefault(a, []).append(b)
        self.weights[(a, b)] = w


class PriorityQueue:

    def __init__(self):
        self.elements = {}

    def empty(self):
        return len(self.elements) == 0

    def put(self, item, priority):
        if item in self.elements:
            print("Reprioritizing", item, "from", self.elements[item], "to", priority)
        else:
            print("Inserting", item, "with priority", priority)
        self.elements[item] = priority

    def get(self):
        best_item, best_priority = None, None
        for item, priority in self.elements.items():
            if best_priority is None or priority < best_priority:
                best_item, best_priority = item, priority

        del self.elements[best_item]
        return best_item


def search(graph, start, stop):
    frontier = PriorityQueue()
    frontier.put(start, 0)
    came_from = {}
    cost_so_far = {}
    came_from[start] = None
    cost_so_far[start] = 0

    while not frontier.empty():
        current = frontier.get()
        if current == stop: break

        for next in graph.neighbors(current):
            new_cost = cost_so_far[current] + graph.cost(current, next)
            if next not in cost_so_far or new_cost < cost_so_far[next]:
                cost_so_far[next] = new_cost
                frontier.put(next, new_cost)
                came_from[next] = current

    return came_from


g = Graph()
g.add('A', 'B', 1)
g.add('B', 'C', 1)
g.add('C', 'D', 1)
g.add('D', 'E', 1)
g.add('B', 'E', 4)
for key, value in search(g, 'A', 'E').items():
    print(value, '->', key)

Inserting A with priority 0
Inserting B with priority 1
Inserting C with priority 2
Inserting E with priority 5
Inserting D with priority 3
Reprioritizing E from 5 to 4
None -> A
A -> B
B -> C
D -> E
C -> D
