### Import packages

In [1]:
!pip install pathfinding

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pathfinding
  Downloading pathfinding-1.0.1-py3-none-any.whl (19 kB)
Installing collected packages: pathfinding
Successfully installed pathfinding-1.0.1


In [56]:
import requests
import zipfile
import io
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import re
from typing import List
import pickle
import time
from IPython.display import display
from pathfinding.core.diagonal_movement import DiagonalMovement
from pathfinding.core.grid import Grid
from pathfinding.finder.a_star import AStarFinder

### Data mining

In [3]:
url_string = 'https://www.movingai.com/benchmarks/street/street-map.zip'
r = requests.get(url_string, stream=True)
z = zipfile.ZipFile(io.BytesIO(r.content))
z.extractall('maps')

In [4]:
url_string = 'https://www.movingai.com/benchmarks/street/street-scen.zip'
r = requests.get(url_string, stream=True)
z = zipfile.ZipFile(io.BytesIO(r.content))
z.extractall('scenarios')

In [5]:
url_string = 'https://www.movingai.com/benchmarks/street/street-png.zip'
r = requests.get(url_string, stream=True)
z = zipfile.ZipFile(io.BytesIO(r.content))
z.extractall('images')

### Routines

#### Work with files

In [41]:
!mkdir results

In [6]:
def readMap(fileName: str) -> np.array:
    """
    Read map file, process it and return map object.
    While preprocessing maps passable terrain to 
    True and other to False.
    :param fileName: name of file .map
    :type fileName: str
    :return: shaped preprocessed array with map
    :rtype: np.array
    """
    map_path = os.path.join('maps/', fileName)
    with open(map_path, 'r') as map_file:
        data = map_file.read().split('\n')
        map_height = int(data[1].split()[1])
        map_width = int(data[2].split()[1]) 
        data = "".join(data[4:])
        #map = [True if x == '.' else False for x in data]
        map = [1 if x == '.' else 0 for x in data]
        map = np.array(map).reshape(map_width, map_height)
    return map

In [7]:
def readScenario(fileName: str) -> pd.DataFrame:
    """
    Read scenario file, and return scenario object.
    :param fileName: name of file .scen
    :type fileName: str
    :return: data frame with scenario
    :rtype: pd.DataFrame
    """
    col_names = ['i', 'Map name', 'Width', 'Height', 'start_x', 'start_y', 'goal_x', 'goal_y', 'Path length']
    df = pd.DataFrame(columns=col_names)
    scenario_path = os.path.join('scenarios/', fileName)
    with open(scenario_path, 'r') as scenario_file:
        data = scenario_file.read().split("\n")[1:]
        for line in data:
            try:
                row = pd.DataFrame([line.split()], columns=col_names)
                df = df.append(row, ignore_index=True)
            except:
                pass
    return df

#### A* algorithm

In [71]:
def plot_astar(map: str, map_size: int, fileName: str, show: bool = False):
    """
    Visualize results of A* algorithm.
    :param map: outputted map with path
    :type map: str
    :param map_size: size of the output map
    :type map_size: int
    :param fileName: desired name of saved image
    :type filename: str
    :param show: whether to show the plot
    :type show: bool
    """
    map = map.split('\n')
    matrix = []
    mapper = {'|': 0, '+': 0, '-': 0, '#': 0, ' ': 1, 's': 2, 'x': 3, 'e': 4}
    for row in map:
        matrix.append([mapper[x] for x in list(row)])
    matrix = np.array(matrix).reshape(map_size, map_size)
    plt.ioff()
    fig, axs = plt.subplots(figsize=(25, 25))
    axs.matshow(matrix, cmap='gist_earth')
    plt.savefig('results/' + fileName + '.png')
    if show:
        plt.show()
    else:
        plt.close(fig)

In [72]:
def run_astar(map: np.array, start_x: int, start_y: int, goal_x: int, goal_y: int, fileName: str, show: bool = False):
    """
    Run A* algorithm on setted environment.
    :param map: map for searching path
    :type map: np.array
    :param start_x: x coordinape of starting point
    :type start_x: int
    :param start_y: y coordinape of starting point
    :type start_y: int
    :param goal_x: x coordinape of ending point
    :type goal_x: int
    :param goal_y: y coordinape of ending point
    :type goal_y: int
    :param fileName: name of the map & scen used for output
    :type fileName: str
    :param show: whether to show the plot
    :type show: bool
    """
    grid = Grid(matrix=map)
    start = grid.node(start_x, start_y)
    end = grid.node(goal_x, goal_y)
    finder = AStarFinder(diagonal_movement=DiagonalMovement.always)
    t1 = time.perf_counter()
    path, runs = finder.find_path(start, end, grid)
    t2 = time.perf_counter()
    map_with_path = grid.grid_str(path=path, start=start, end=end)

    runName = '_' + str(start_x) + '_' + str(start_y) + '_' + str(goal_x) + '_' + str(goal_y) + '_' + 'Astar'
    text_file = open('results/' + fileName + runName + '.txt', 'w')
    text_file.write(map_with_path)
    text_file.close()

    plot_astar(map_with_path, map.shape[0] + 2, fileName + runName)

    return len(path), runs, t2 - t1

#### JPS algorithm

#### Running all

In [78]:
def run(map_size: List = [512], scen_num: int = 1, algoritms: List = ['A*', 'JPS']):
    """
    Run all algorithms on selected maps and scenarios.
    While selecting scenario check desired `map_size`
    and choose top `scen_num` from descending path
    length list.
    :param map_size: desired sizes of map
    :type map_size: List[int]
    :param scen_num: desired amount of scenarios per map
    :type scen_num: int
    :param algoritms: desired algorithms to run
    :type algoritms: List[str]
    """
    col_names = ['Map name', 'Width', 'Height', 'start_x', 'start_y', 'goal_x', 'goal_y', 'Path length', 'Operations', 'Time', 'Algorithm']
    res_df = pd.DataFrame(columns=col_names)
    
    for path in os.listdir('scenarios/'):
        name = re.split('_|\.', path)
        if name[1] == '0' and int(name[2]) in map_size:
            fileName = name[0] + '_' + name[1] + '_' + name[2]
            map = readMap(fileName + '.' + name[3])
            scen_df = readScenario(path).iloc[-scen_num:]
            for i in range(scen_num):
                row = scen_df.iloc[i]
                start_x = int(row['start_x'])
                start_y = int(row['start_y'])
                goal_x = int(row['goal_x'])
                goal_y = int(row['goal_y'])
                for algorithm in algoritms:
                    if algorithm == 'A*':
                        path_len, runs, time = run_astar(map, start_x, start_y, goal_x, goal_y, fileName)
                        result = [fileName, map.shape[0], map.shape[1], start_x, start_y, goal_x, goal_y, path_len, runs, time, 'A*']
                        row = pd.DataFrame([result], columns=col_names)
                        res_df = res_df.append(row, ignore_index=True)
                    elif algorithm == 'JPS':
                        pass
                    else:
                        pass
    res_df.to_pickle('results/results.pkl')

### Results

In [79]:
run()

In [81]:
unpickled_df = pd.read_pickle("results/results.pkl")  
unpickled_df.head(100)

Unnamed: 0,Map name,Width,Height,start_x,start_y,goal_x,goal_y,Path length,Operations,Time,Algorithm
0,Sydney_0_512,512,512,63,510,509,10,545,31189,7.922137,A*
1,Berlin_0_512,512,512,487,504,14,42,612,35844,9.524996,A*
2,Paris_0_512,512,512,509,48,12,495,551,39925,11.558375,A*
3,Milan_0_512,512,512,506,341,110,3,571,134604,37.299698,A*
4,Denver_0_512,512,512,445,1,8,511,571,41641,10.443288,A*
5,Boston_0_512,512,512,24,458,263,9,571,127135,32.294683,A*
6,Shanghai_0_512,512,512,466,491,33,1,526,28465,9.811463,A*
7,Moscow_0_512,512,512,67,499,502,6,552,16188,3.482897,A*
8,London_0_512,512,512,214,26,51,199,675,68016,10.164441,A*
9,NewYork_0_512,512,512,452,486,14,7,545,41103,13.309922,A*
