References: https://www.geeksforgeeks.org/a-search-algorithm/

and the AI textbook by Russell & Norvig ("Artificial Intelligence: A Modern Approach").

In [41]:
import numpy as np
import heapq

import os, subprocess, copy
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import matplotlib.patches as patches

In [101]:
BOARD_SIZE = (15, 15)  # shape of the cost table; also the exclusive upper bound for cell indices
# Unit steps as (dx, dy) offsets added to an (x, y) cell.
# NOTE: cells index the cost table as F[x, y] and are drawn at plot position (y, x),
# so on the heatmap these appear as right, left, up, down
# (assuming row 0 is drawn at the top, seaborn's default -- confirm).
DIRECTION_MOVES = [(0,1), (0,-1), (-1,0), (1, 0)]
MAX_ITER = 100  # the search loop stops once its iteration counter exceeds this value

XY_INITIAL = (0, 0)  # source coordinate
XY_DEST = (3, 4)  # destination coordinate
XY_WALLS = [(3, 3), (2, 4), (3, 5), (4, 5), (4, 2)]  # impassable cells (stored as np.inf in the cost table)

# F = init_cost_table(BOARD_SIZE, XY_WALLS)
# sns.heatmap(F)

In [102]:
def is_move_valid(xy, board_size, excluded_moves=None):
    """Return whether ``xy`` lies on the board and is not an excluded cell.

    Args:
        xy: Two-item iterable ``(x, y)`` of cell indices (tuple, list or a
            NumPy row all work).
        board_size: Pair whose items are the exclusive upper bounds for
            ``x`` and ``y`` respectively.
        excluded_moves: Optional collection of ``(x, y)`` cells that must not
            be entered. ``None`` (the default) excludes nothing.

    Returns:
        bool: ``True`` when ``0 <= x < board_size[0]``,
        ``0 <= y < board_size[1]`` and the cell is not excluded.
    """
    x, y = xy
    if not (0 <= x < board_size[0] and 0 <= y < board_size[1]):
        return False
    if excluded_moves is None:
        return True
    # Only exact ``set`` instances used to be honoured; any other collection
    # (list, frozenset, array of rows) was silently ignored. Normalise those
    # to a set of tuples so the membership test means the same for all of them.
    if not isinstance(excluded_moves, (set, frozenset)):
        excluded_moves = {tuple(move) for move in excluded_moves}
    return tuple(xy) not in excluded_moves

def init_cost_table(board_size, xy_walls=None):
    """Create the cost table: zeros everywhere, ``np.inf`` on wall cells.

    Args:
        board_size: Shape passed to ``np.zeros``.
        xy_walls: Optional iterable of ``(x, y)`` wall cells. ``None`` (the
            default) means the board has no walls.

    Returns:
        numpy.ndarray: Float array of shape ``board_size``.
    """
    F = np.zeros(board_size)
    # The signature advertises ``None`` as the default, so it must not be
    # iterated; treat it as "no walls".
    if xy_walls is not None:
        for x, y in xy_walls:
            F[x, y] = np.inf
    return F

def get_neighbors(xy, direction_moves, excluded_moves, board_size=None):
    """Return the cells reachable from ``xy`` in one step.

    Args:
        xy: Current cell as an ``(x, y)`` pair.
        direction_moves: Sequence of ``(dx, dy)`` offsets to try.
        excluded_moves: Collection of cells that may not be entered; forwarded
            to ``is_move_valid``.
        board_size: Optional board extent to validate against. Defaults to the
            module-level ``BOARD_SIZE`` so existing three-argument calls
            behave as before.

    Returns:
        numpy.ndarray: The rows of ``xy + direction_moves`` that passed
        ``is_move_valid`` (one ``(x, y)`` row per accepted neighbour).
    """
    if board_size is None:
        # Resolved at call time so a later change to BOARD_SIZE is picked up.
        board_size = BOARD_SIZE
    candidates = np.add(xy, direction_moves)
    keep = np.array(
        [is_move_valid(cell, board_size, excluded_moves) for cell in candidates],
        dtype=bool,
    )
    return candidates[keep]

def update_cost_table(F, xys, xy_current, xy_dest, g_cost=1):
    """Score candidate cells and return them as a min-heap.

    Each non-wall cell in ``xys`` gets ``F[x, y] = g_cost + h``, where ``h``
    is the Manhattan distance from the cell to ``xy_dest``. ``F`` is modified
    in place.

    Args:
        F: Cost table; entries equal to ``np.inf`` are walls and are skipped.
        xys: Iterable of candidate ``(x, y)`` cells.
        xy_current: Cell being expanded. Not used by the computation; kept so
            existing callers keep working.
        xy_dest: Destination ``(x, y)`` used by the heuristic.
        g_cost: Cost-so-far term added to the heuristic. The default of 1
            reproduces the previously hard-coded value; with a constant term
            the ordering is decided by the heuristic alone, so pass the
            accumulated path cost if a true ``g + h`` priority is wanted.

    Returns:
        list: ``heapq``-ordered list of ``(F[x, y], (x, y))`` entries for the
        cells that were scored.
    """
    actual_neighbors = []
    for xy in xys:
        x, y = xy
        if F[x, y] == np.inf:  # walls keep their infinite cost and are never queued
            continue
        h = abs(x - xy_dest[0]) + abs(y - xy_dest[1])
        F[x, y] = g_cost + h
        heapq.heappush(actual_neighbors, (F[x, y], tuple(xy)))
    return actual_neighbors

In [103]:
# Set up the folders that receive the video frames.
# os.makedirs also creates the missing "./tmp" parent (os.mkdir would raise
# FileNotFoundError there) and exist_ok=True makes re-running the cell harmless.
dir1 = "./tmp/A-star_searching"
os.makedirs(dir1, exist_ok=True)

dir2 = "./tmp/A-star_path_reconstructing"
os.makedirs(dir2, exist_ok=True)

In [104]:
# Best-first search over the grid; one heatmap frame is saved per expanded cell.
open_list = []      # min-heap of (cost, (x, y)) entries waiting to be expanded
came_from = {}      # cell -> the cell it was first reached from
closed_set = set()  # cells that have already been expanded
F = init_cost_table(BOARD_SIZE, XY_WALLS)

iteration = 0
heapq.heappush(open_list, (0.0, XY_INITIAL))
xy = XY_INITIAL
while len(open_list) > 0:
    # xy is the cell expanded on the previous pass: stop once that was the
    # destination, or once the iteration budget is used up.
    if xy == XY_DEST or iteration > MAX_ITER:
        break
    _, xy = heapq.heappop(open_list)
    neighbors = get_neighbors(xy, DIRECTION_MOVES, closed_set)
    closed_set.add(xy)
    actual_neighbors = update_cost_table(F, neighbors, xy, XY_DEST)
    for neighbor in actual_neighbors:
        _, neighbor_xy = neighbor
        # Queue each cell only the first time it is discovered. Re-pushing a
        # cell that is already waiting leaves duplicates in the heap, which
        # later causes closed cells to be expanded again (duplicate frames,
        # iterations wasted against MAX_ITER) and overwrites the recorded
        # predecessor.
        if neighbor_xy in came_from:
            continue
        came_from[neighbor_xy] = xy  # Set the predecessor of neighbor_xy as xy
        heapq.heappush(open_list, neighbor)  # Add the neighbor to the open list

    # just for plotting
    # Cells are (x, y) = (row, column) of F, while the plot wants (column, row);
    # +0.5 centres the marker inside the heatmap cell.
    sns.heatmap(F, annot=True)
    # `fill` expects a bool (the colour comes from `color`).
    agent = patches.Circle((xy[1]+0.5, xy[0]+0.5), radius=0.2, color='red', fill=True, alpha=0.6)
    dst = patches.Circle((XY_DEST[1]+0.5, XY_DEST[0]+0.5), radius=0.2, color='blue', fill=True, alpha=0.6)
    plt.gca().add_patch(agent)
    plt.gca().add_patch(dst)

    plt.savefig(f"{dir1}/frame_{iteration:04d}.png")
    plt.close()  # Close the plot to free memory
    iteration += 1

In [105]:
# Encode the saved search frames into a video with ffmpeg.
out_dir = 'path_finding_animation.mp4'
# Delete any previous render with the same name before encoding.
if os.path.exists(out_dir):
    os.remove(out_dir)

subprocess.run(
    # input: the numbered PNG frames, read at 10 frames per second
    ['ffmpeg', '-framerate', '10', '-i', f'{dir1}/frame_%04d.png']
    # video codec, profile and quality setting
    + ['-c:v', 'libx264', '-profile:v', 'high', '-crf', '20']
    # output pixel format
    + ['-pix_fmt', 'yuv420p']
    + [out_dir]
)

CompletedProcess(args=['ffmpeg', '-framerate', '10', '-i', './tmp/A-star_searching/frame_%04d.png', '-c:v', 'libx264', '-profile:v', 'high', '-crf', '20', '-pix_fmt', 'yuv420p', 'path_finding_animation.mp4'], returncode=0)

In [106]:
def reconstruct_path(came_from, start, destination, *, verbose=True):
    """Rebuild the start-to-destination path from a predecessor map.

    Args:
        came_from: Mapping from a cell to the cell it was reached from.
        start: Cell at which the walk back stops.
        destination: Cell from which the walk back begins.
        verbose: When true (the default, matching the previous behaviour),
            print every cell as it is visited while walking backwards.

    Returns:
        list: Cells from ``start`` to ``destination`` inclusive, or ``[]``
        when the chain of predecessors never reaches ``start`` (a missing
        predecessor, or a cycle in ``came_from``).
    """
    current = destination
    path = []
    seen = set()
    while current != start:
        # A missing predecessor means there is no path; a repeated cell means
        # the map is cyclic and the walk would otherwise never terminate.
        if current is None or current in seen:
            return []
        seen.add(current)
        path.append(current)
        if verbose:
            print(current)
        current = came_from.get(current)
    path.append(start)  # include the start in the path
    path.reverse()  # the path was collected destination-first
    return path

# Walk the predecessor map recorded by the search back from the destination to the source cell.
path = reconstruct_path(came_from, XY_INITIAL, XY_DEST)

(3, 4)
(4, 4)
(5, 4)
(5, 5)
(5, 6)
(4, 6)
(3, 6)
(2, 6)
(2, 5)
(1, 5)
(1, 4)
(0, 4)
(0, 3)
(0, 2)
(0, 1)


In [107]:
# Render one frame per step of the reconstructed path.
# Cells are (x, y) = (row, column) of F, while the plot wants (column, row);
# +0.5 centres markers and the trail inside the heatmap cells.
if path:
    x_coords, y_coords = zip(*[(y+0.5, x+0.5) for x, y in path])  # Note the adjustment for heatmap indexing
else:
    # No path: unpacking zip(*[]) would raise, and there is nothing to draw.
    x_coords, y_coords = (), ()

for iteration, (x, y) in enumerate(path):
    sns.heatmap(F, annot=True)
    # `fill` expects a bool (the colour comes from `color`).
    agent = patches.Circle((y+0.5, x+0.5), radius=0.2, color='red', fill=True, alpha=0.6)
    dst = patches.Circle((XY_DEST[1]+0.5, XY_DEST[0]+0.5), radius=0.2, color='blue', fill=True, alpha=0.6)
    plt.gca().add_patch(agent)
    plt.gca().add_patch(dst)

    # Include the current cell so the trail reaches the agent; slicing to
    # `iteration` left it one step behind and the final frame never touched
    # the destination.
    plt.plot(x_coords[:iteration + 1], y_coords[:iteration + 1], color="yellow", linewidth=2)
    plt.savefig(f"{dir2}/frame_{iteration:04d}.png")
    plt.close()  # Close the plot to free memory

In [108]:
# Encode the saved path-reconstruction frames into a video with ffmpeg.
out_dir = 'path_reconstruction_animation.mp4'
# Delete any previous render with the same name before encoding.
if os.path.exists(out_dir):
    os.remove(out_dir)

subprocess.run(
    # input: the numbered PNG frames, read at 10 frames per second
    ['ffmpeg', '-framerate', '10', '-i', f'{dir2}/frame_%04d.png']
    # video codec, profile and quality setting
    + ['-c:v', 'libx264', '-profile:v', 'high', '-crf', '20']
    # output pixel format
    + ['-pix_fmt', 'yuv420p']
    + [out_dir]
)

CompletedProcess(args=['ffmpeg', '-framerate', '10', '-i', './tmp/A-star_path_reconstructing/frame_%04d.png', '-c:v', 'libx264', '-profile:v', 'high', '-crf', '20', '-pix_fmt', 'yuv420p', 'path_reconstruction_animation.mp4'], returncode=0)