# SLAM Unit H
## Path planning
This unit covers a slightly different topic than the previous units: approaches for planning the path of a robot, given a start point and an end point.

## Introduction and the Algorithm of Dijkstra
This very famous algorithm is named after the Dutch computer scientist [Edsger Dijkstra](https://en.wikipedia.org/wiki/Edsger_W._Dijkstra) \[ˈdɛikstra\] who, besides making many other important contributions to computer science, conceived it in 1956.

In [3]:
# If you don't see a video below, run this cell.
# YouTube = True  # Uncomment to get YouTube videos instead of TIB AV.
from IPython.display import IFrame
IFrame("https://www.youtube.com/embed/_Bmnxjj9lJw" if "YouTube" in globals() else "//av.tib.eu/player/49039",
       width=560, height=315)

### A first quiz: Cost adjustment (2 Points).
Consider our first implementation of the Dijkstra algorithm. A node has been seen for the first time already. Then (in the for loop over all neighbors), its cost is adjusted.

In a graph of 100 nodes, how many times can a node cost be adjusted (at most)?

In [4]:
how_many_times_adjusted = 98
# YOUR CODE HERE
#raise NotImplementedError()

In [5]:
from hashlib import shake_128
def public_string_test(the_answer_string, reference):
    m = shake_128()
    m.update(the_answer_string.encode())
    return m.hexdigest(4) == reference
assert public_string_test(("adj=%d" % how_many_times_adjusted), 'cddd7485'), "Oh no, your answer is wrong!"

## The Algorithm of Dijkstra: the set of visited nodes.

In [6]:
# If you don't see a video below, run this cell.
from IPython.display import IFrame
IFrame("https://www.youtube.com/embed/8qXG_k1bXEs" if "YouTube" in globals() else "//av.tib.eu/player/49040",
       width=560, height=315)

Note that the comments in the video regarding the required installation of additional libraries do not apply to this Jupyter notebook version of the course. Also, `world_extents`, `world_obstacles`, `visited_nodes` and `optimal_path` are not handled using global variables, callback functions are not visible to you, and there is no "main function". While this "decoration" is different, the core functions (e.g., `dijkstra()`) are the same as in the videos.

### Programming assignment: Implement the Algorithm of Dijkstra (20 Points).
As a first step, we compute the visited nodes, but not yet the actual path from start to goal.
In your implementation, try to work along the detailed comments in the code below. Do not attempt to return a shortest path yet (we will do this later). Note that start and goal are (x,y) tuples of the start and goal position. For `obstacles`, our convention is that the value is 255 if a cell is occupied and 0 if it is free.
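
The conventions above can be illustrated with a small sketch. The grid size, the wall and the helper `is_free()` are made up for this illustration and are not part of the assignment; the sketch only assumes that the array is indexed as `obstacles[x, y]`, so that an (x,y) tuple can be used directly as an index.

```python
import numpy as np

# Hypothetical 5x4 grid, indexed as obstacles[x, y].
obstacles = np.zeros((5, 4), dtype=np.uint8)   # 0 = free
obstacles[2, 0:3] = 255                        # 255 = occupied: a short wall at x = 2

start = (0, 0)   # (x, y) tuples can index the array directly
goal = (4, 0)

def is_free(cell, obstacles):
    """Hypothetical helper: True if cell is inside the grid and not an obstacle."""
    x, y = cell
    inside = 0 <= x < obstacles.shape[0] and 0 <= y < obstacles.shape[1]
    return bool(inside and obstacles[cell] != 255)

print(is_free(start, obstacles))    # the start cell is free
print(is_free((2, 1), obstacles))   # part of the wall
print(is_free((5, 0), obstacles))   # outside the grid
```

Checking the bounds before indexing matters here: a negative index would not raise an error in NumPy but silently wrap around to the other side of the grid.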

In [46]:
# pp_01_a_dijkstra
import numpy as np
# Allowed movements and costs on the grid.
# Each tuple is: (movement_x, movement_y, cost).
s2 = np.sqrt(2)  # Length of a diagonal step.
movements = [ # Direct neighbors (4N). Each tuple is (x, y, distance).
              (1,0, 1.), (0,1, 1.), (-1,0, 1.), (0,-1, 1.),
              # Diagonal neighbors.
              # Comment this out to play with 4N only (faster).
              (1,1, s2), (-1,1, s2), (-1,-1, s2), (1,-1, s2),
            ]

def dijkstra(start, goal, obstacles, movements):
    """Dijkstra's algorithm. First version does not use a heap."""
    # In the beginning, the start is the only element in our front.
    # The first element is the cost of the path from the start to the point.
    # The second element is the position (cell) of the point.
    front = [ (0.0, start) ]

    # In the beginning, no cell has been visited.
    extents = obstacles.shape
    visited = np.zeros(extents, dtype=np.float32)  # Each entry takes 32 bits (4 bytes).

    # While there are elements to investigate in our front.
    # YOUR CODE HERE
    
   
    
    #raise NotImplementedError()

    while front:
        # Get smallest item and remove it from front.
        # CHANGE 01_a:
        # - Get smallest element from 'front'. Hint: min() may be useful.
        # - Remove this element from 'front'. Hint: 'front' is a list.
        element_min = min(front)
        front.remove(element_min)
        # Check if this has been visited already.
        cost, pos = element_min  # Change if you named 'element' differently.
        # CHANGE 01_a: Skip the rest of the loop body if visited[pos] is > 0.
        if visited[pos] > 0:
            continue
        # Now it is visited. Mark with 1.
        # CHANGE 01_a: Set visited[pos] to 1.
        visited[pos] = 1
        # Check if the goal has been reached.
        if pos == goal:
            break  # Finished!

        # Check all neighbors.
        for dx, dy, deltacost in movements:
            # Determine new position and check bounds.
            # CHANGE 01_a:
            # - Compute new_x and new_y from old position 'pos' and dx, dy.
            # - Check that new_x is >= 0 and < extents[0], similarly for new_y.
            # - If not, skip the remaining part of this loop.
            new_x = pos[0] + dx
            new_y = pos[1] + dy
            if not (0 <= new_x < extents[0] and 0 <= new_y < extents[1]):
                continue
            # Add to front if: not visited before and no obstacle.
            new_pos = (new_x, new_y)
            # CHANGE 01_a:
            # If visited is 0 and obstacles is not 255 (both at new_pos), then:
            # append the tuple (cost + deltacost, new_pos) to the front.
            if visited[new_pos] == 0 and obstacles[new_pos] != 255:
                front.append((cost + deltacost, new_pos))
    return ([], visited)

First, the test.

In [47]:
import numpy as np
from scipy.ndimage import minimum_filter
from numpy.random import default_rng
rng = default_rng()

def dt(obstacles, start):
    acc = np.full_like(obstacles, np.inf, dtype=float)
    acc[start] = 1
    m4 = np.array([[0,1,0],[1,0,1],[0,1,0]]).astype(bool)
    m4d = np.array([[1,0,1],[0,1,0],[1,0,1]]).astype(bool)
    ps2 = 2**(np.sqrt(2))
    obsm = obstacles > 0
    acc[obsm] = np.inf
    accp = np.zeros_like(acc)
    while (accp != acc).any():
        accp = acc
        acc = np.minimum(acc, 2*minimum_filter(acc, footprint=m4))
        acc[obsm] = np.inf
        acc = np.minimum(acc, ps2*minimum_filter(acc, footprint=m4d))
        acc[obsm] = np.inf
    return np.log2(acc)

def test(the_function):
    s2 = np.sqrt(2)
    movements = [ (1,0, 1.), (0,1, 1.), (-1,0, 1.), (0,-1, 1.),
                  (1,1, s2), (-1,1, s2), (-1,-1, s2), (1,-1, s2) ]
    n = 8
    for experiment in range(100):
        # Random start and goal.
        sg = rng.integers(0,n,size=(2,2))
        start, goal = tuple(sg[0]), tuple(sg[1])
        # Random number of obstacles in random places.
        obs_idx = rng.integers(0, n, size=(rng.integers(0, n*n), 2))
        obstacles = np.zeros((n,n), dtype=np.uint8)
        obstacles[obs_idx[:,0], obs_idx[:,1]] = 255
        obstacles[start] = 0  # Never on start or goal.
        obstacles[goal] = 0

        # Call user function.    
        path, visited = the_function(start, goal, obstacles, movements)
        if path != []:
            print("Returned path should be empty.")
            return False

        # Compute df.
        eps = 1e-3
        d = dt(obstacles, start)
        dg = d[goal]
        if dg < np.inf and visited[goal] != 1:
            print("Goal not in visited")
            return False
        inside = d < dg-eps
        outside = (d==np.inf) if dg==np.inf else (d > dg+eps)
        if not ((visited[inside]==1).all() and (visited[outside]==0).all()):
            print("Error: for start, goal:", start, goal, "and obstacles\n",
                  obstacles, "\nvisited was\n", visited,
                  "\nand distances were\n", np.round(d,1))
            return False
    return True
assert test(dijkstra), "Oh no, it's wrong!"

### Try your algorithm in the interactive viewer!
Here is a short manual:
- Hold the left mouse button and drag the mouse to set obstacles
- Hold the middle or right mouse button and drag the mouse to clear obstacles
- Hold shift and click the left mouse button to set the start point
- Hold shift and click the middle or right mouse button to set the goal point
- Press the Clear button below the canvas to remove all obstacles.

When you release the mouse buttons, your `dijkstra()` implementation from above will be called and the visited cells it returns will be drawn in green. This may take a few seconds (depending on the distance between your start and goal), as our implementation is not yet efficient.

In [9]:
from ipy_ppviewer import PPViewer
class MyPPViewer(PPViewer):
    def compute(self, x1, y1, t1, x2, y2, t2, obstacles, potential):
        return dijkstra((x1,y1), (x2,y2), obstacles, movements)
MyPPViewer(200, 150).run()


## The Algorithm of Dijkstra: distance from the start node

Next, instead of just marking which nodes are visited, we will record the distance of each visited node from the start node. We will use the same array `visited` for this, so the major change in comparison to the previous code will be to store the "cost from start" instead of "1" when we visit a node. As explained in the video, we will also start our search with a very small positive cost (instead of 0.0), to keep the semantics of "any node with cost greater than zero has been visited".
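
The role of the small offset can be seen in a minimal sketch. The 3x3 array and the cell coordinates are made up for illustration; only the value 0.001 is taken from the assignment below.

```python
import numpy as np

visited = np.zeros((3, 3), dtype=np.float32)
start = (1, 1)

# With a start cost of exactly 0.0, the start cell would remain
# indistinguishable from a cell that was never visited.
visited[start] = 0.0
print(visited[start] > 0)   # the start looks unvisited

# A small positive offset keeps the "cost > 0 means visited" rule intact.
offset = 0.001
visited[start] = offset
print(visited[start] > 0)   # now the start counts as visited

# The true distance of a cell is its stored cost minus the offset.
visited[(2, 2)] = offset + np.sqrt(2)   # one diagonal step from the start
true_distance = visited[(2, 2)] - visited[start]
print(true_distance)
```

Since every stored cost carries the same offset, differences between costs are unaffected by it.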

In [48]:
# If you don't see a video below, run this cell.
from IPython.display import IFrame
IFrame("https://www.youtube.com/embed/n_xxEIwVO44" if "YouTube" in globals() else "//av.tib.eu/player/49041",
       width=560, height=315)

### Programming assignment: modify your previous code so that the distances are returned (5 Points).

In [49]:
# pp_01_b_dijkstra_visited
def dijkstra_2(start, goal, obstacles, movements):
    """Dijkstra's algorithm, second version records cost in 'visited' array."""
    # In the beginning, the start is the only element in our front.
    # The first element is the cost of the path from the start to the point.
    # The second element is the position (cell) of the point.
    front = [ (0.001, start) ]  # CHANGE 01_b: set the cost to e.g. 0.001

    # In the beginning, no cell has been visited.
    extents = obstacles.shape
    visited = np.zeros(extents, dtype=np.float32)

    # While there are elements to investigate in our front.
    # YOUR CODE HERE
    #raise NotImplementedError()

    while front:
        # Get smallest item and remove it from front.
        element_min = min(front)
        front.remove(element_min)
        # Check if this has been visited already.
        cost, pos = element_min
        if visited[pos] > 0:
            continue
        # Now it is visited. Mark with cost.
        # CHANGE 01_b: set visited[pos] to cost instead of 1.
        
        visited[pos] = cost
        # Check if the goal has been reached.
        if pos == goal:
            break  # Finished!

        # Check all neighbors.
        for dx, dy, deltacost in movements:
            # Determine new position and check bounds.
            new_x = pos[0] + dx
            new_y = pos[1] + dy
            if not (0 <= new_x < extents[0] and 0 <= new_y < extents[1]):
                continue
            # Add to front if: not visited before and no obstacle.
            new_pos = (new_x, new_y)
            if visited[new_pos] == 0 and obstacles[new_pos] != 255:
                front.append((cost + deltacost, new_pos))
    return ([], visited)

First, the test. It is very similar to the previous one, except that it checks for the correct distances instead of just checking whether a node has been visited.

In [50]:
import numpy as np
from scipy.ndimage import minimum_filter
from numpy.random import default_rng
rng = default_rng()

def dt(obstacles, start):
    acc = np.full_like(obstacles, np.inf, dtype=float)
    acc[start] = 1
    m4 = np.array([[0,1,0],[1,0,1],[0,1,0]]).astype(bool)
    m4d = np.array([[1,0,1],[0,1,0],[1,0,1]]).astype(bool)
    ps2 = 2**(np.sqrt(2))
    obsm = obstacles > 0
    acc[obsm] = np.inf
    accp = np.zeros_like(acc)
    while (accp != acc).any():
        accp = acc
        acc = np.minimum(acc, 2*minimum_filter(acc, footprint=m4))
        acc[obsm] = np.inf
        acc = np.minimum(acc, ps2*minimum_filter(acc, footprint=m4d))
        acc[obsm] = np.inf
    return np.log2(acc)

def test(the_function):
    s2 = np.sqrt(2)
    movements = [ (1,0, 1.), (0,1, 1.), (-1,0, 1.), (0,-1, 1.),
                  (1,1, s2), (-1,1, s2), (-1,-1, s2), (1,-1, s2) ]
    n = 8
    for experiment in range(100):
        # Random start and goal.
        sg = rng.integers(0,n,size=(2,2))
        start, goal = tuple(sg[0]), tuple(sg[1])
        # Random number of obstacles in random places.
        obs_idx = rng.integers(0, n, size=(rng.integers(0, n*n), 2))
        obstacles = np.zeros((n,n), dtype=np.uint8)
        obstacles[obs_idx[:,0], obs_idx[:,1]] = 255
        obstacles[start] = 0  # Never on start or goal.
        obstacles[goal] = 0

        # Call user function.    
        path, visited = the_function(start, goal, obstacles, movements)
        if path != []:
            print("Returned path should be empty.")
            return False
        offset = visited[start]
        if offset <= 0 or offset >= 1:
            print("You should start the search with a small positive offset.")
            return False

        # Compute df.
        eps = 1e-3
        d = dt(obstacles, start) + offset
        dg = d[goal]
        if dg < np.inf and not (dg-eps < visited[goal] < dg+eps):
            print("Goal has wrong cost.")
            return False
        inside = d < dg-eps
        outside = (d==np.inf) if dg==np.inf else (d > dg+eps)
        if not (np.abs(visited[inside] - d[inside]) < eps).all() or\
           not (visited[outside]==0).all():
            print("Error: for start, goal:", start, goal, "and obstacles\n",
                  obstacles, "\nvisited was\n", np.round(visited,1),
                  "\nand distances were\n", np.round(d,1))
            return False
    return True
assert test(dijkstra_2), "Oh no, it's wrong!"

### Then, have a look!
Now after setting start and goal (remember: use shift-left click and shift-middle/right click to set start and goal, respectively), you will see a "green gradient" instead of a "flat green" for all visited nodes. Also play with setting and removing obstacles!

In [51]:
from ipy_ppviewer import PPViewer
class MyPPViewer(PPViewer):
    def compute(self, x1, y1, t1, x2, y2, t2, obstacles, potential):
        return dijkstra_2((x1,y1), (x2,y2), obstacles, movements)
MyPPViewer(200, 150).run()


## Speeding up the algorithm of Dijkstra by adding a heap.

In this modification, we will keep the functionality, but improve the speed considerably. In terms of *asymptotic runtime* (see [*Big O notation*](https://en.wikipedia.org/wiki/Big_O_notation)), our modification will bring us from an $O(n^2)$ algorithm to an $O(n\cdot \log n)$ algorithm, or in words, from a quadratic to a quasilinear algorithm.
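
The difference between the two ways of managing the front can be sketched in isolation. The entries below are made-up `(cost, position)` tuples; the sketch only shows that a list with `min()` and `remove()` and a heap with `heappush()` and `heappop()` deliver the elements in the same order, while the heap avoids scanning the whole container for every element.

```python
from heapq import heappush, heappop

# Hypothetical front entries: (cost, position).
entries = [(2.0, (3, 1)), (0.5, (0, 2)), (1.4, (1, 1)), (1.0, (0, 1))]

# List-based front: min() scans every element, and remove() scans again.
front_list = list(entries)
order_list = []
while front_list:
    smallest = min(front_list)
    front_list.remove(smallest)
    order_list.append(smallest)

# Heap-based front: each push and each pop takes O(log n).
front_heap = []
for entry in entries:
    heappush(front_heap, entry)
order_heap = []
while front_heap:
    order_heap.append(heappop(front_heap))

print(order_list == order_heap)
```

Tuples are compared element by element, so the cost in the first position decides the order, and the position is only used to break ties.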

In [52]:
# If you don't see a video below, run this cell.
from IPython.display import IFrame
IFrame("https://www.youtube.com/embed/FEdid3OXTcw" if "YouTube" in globals() else "//av.tib.eu/player/49042",
       width=560, height=315)

### Programming assignment: modify your previous code to use a heap (5 Points).

In [53]:
# pp_01_c_dijkstra_heap
from heapq import heappush, heappop
def dijkstra_3(start, goal, obstacles, movements):
    """Dijkstra's algorithm. Third version introduces a heap and is faster."""
    # In the beginning, the start is the only element in our front.
    # The first element is the cost of the path from the start to the point.
    # The second element is the position (cell) of the point.
    front = [ (0.001, start) ]

    # In the beginning, no cell has been visited.
    extents = obstacles.shape
    visited = np.zeros(extents, dtype=np.float32)

    # While there are elements to investigate in our front.
    # YOUR CODE HERE
    #raise NotImplementedError()
    while front:
        # Get smallest item and remove from front.
        # CHANGE 01_c: replace the minimum search and removal of element
        #   by a single call to heappop()

        element_min = heappop(front)
        
        cost, pos = element_min
        
        # Check if this has been visited already.
        if visited[pos] > 0:
            continue
    

        # Now it is visited. Mark with cost.
        visited[pos] = cost
        # Check if the goal has been reached.
        if pos == goal:
            break  # Finished!

        # Check all neighbors.
        for dx, dy, deltacost in movements:
            # Determine new position and check bounds.
            new_x = pos[0] + dx
            new_y = pos[1] + dy
            if not (0 <= new_x < extents[0] and 0 <= new_y < extents[1]):
                continue
            # Add to front if: not visited before and no obstacle.
            new_pos = (new_x, new_y)
            # CHANGE 01_c: instead of calling append() on the list, use
            #   heappush(). This will move the new tuple to the correct
            #   location in the heap.
            if visited[new_pos] == 0 and obstacles[new_pos] != 255:
                heappush(front, (cost+deltacost, new_pos))
    return ([], visited)

First, the test.

In [54]:
import numpy as np
from scipy.ndimage import minimum_filter
from numpy.random import default_rng
rng = default_rng()

def dt(obstacles, start):
    acc = np.full_like(obstacles, np.inf, dtype=float)
    acc[start] = 1
    m4 = np.array([[0,1,0],[1,0,1],[0,1,0]]).astype(bool)
    m4d = np.array([[1,0,1],[0,1,0],[1,0,1]]).astype(bool)
    ps2 = 2**(np.sqrt(2))
    obsm = obstacles > 0
    acc[obsm] = np.inf
    accp = np.zeros_like(acc)
    while (accp != acc).any():
        accp = acc
        acc = np.minimum(acc, 2*minimum_filter(acc, footprint=m4))
        acc[obsm] = np.inf
        acc = np.minimum(acc, ps2*minimum_filter(acc, footprint=m4d))
        acc[obsm] = np.inf
    return np.log2(acc)

def test(the_function):
    s2 = np.sqrt(2)
    movements = [ (1,0, 1.), (0,1, 1.), (-1,0, 1.), (0,-1, 1.),
                  (1,1, s2), (-1,1, s2), (-1,-1, s2), (1,-1, s2) ]
    n = 8
    for experiment in range(100):
        # Random start and goal.
        sg = rng.integers(0,n,size=(2,2))
        start, goal = tuple(sg[0]), tuple(sg[1])
        # Random number of obstacles in random places.
        obs_idx = rng.integers(0, n, size=(rng.integers(0, n*n), 2))
        obstacles = np.zeros((n,n), dtype=np.uint8)
        obstacles[obs_idx[:,0], obs_idx[:,1]] = 255
        obstacles[start] = 0  # Never on start or goal.
        obstacles[goal] = 0

        # Call user function.    
        path, visited = the_function(start, goal, obstacles, movements)
        if path != []:
            print("Returned path should be empty.")
            return False
        offset = visited[start]
        if offset <= 0 or offset >= 1:
            print("You should start the search with a small positive offset.")
            return False

        # Compute df.
        eps = 1e-3
        d = dt(obstacles, start) + offset
        dg = d[goal]
        if dg < np.inf and not (dg-eps < visited[goal] < dg+eps):
            print("Goal has wrong cost.")
            return False
        inside = d < dg-eps
        outside = (d==np.inf) if dg==np.inf else (d > dg+eps)
        if not (np.abs(visited[inside] - d[inside]) < eps).all() or\
           not (visited[outside]==0).all():
            print("Error: for start, goal:", start, goal, "and obstacles\n",
                  obstacles, "\nvisited was\n", np.round(visited,1),
                  "\nand distances were\n", np.round(d,1))
            return False
    return True
assert test(dijkstra_3), "Oh no, it's wrong!"

### Try it out interactively!
You should notice that the solution now runs considerably faster.

In [55]:
from ipy_ppviewer import PPViewer
class MyPPViewer(PPViewer):
    def compute(self, x1, y1, t1, x2, y2, t2, obstacles, potential):
        return dijkstra_3((x1,y1), (x2,y2), obstacles, movements)
MyPPViewer(200, 150).run()


## Reconstructing the path
So far, we have found the shortest distance to the goal, but we have not figured out *how to get from start to goal*. As we will see, this requires only minor modifications.
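
The idea can be sketched with a small predecessor table. The dictionary contents and the helper name `unwind()` are made up for illustration; the sketch assumes that every visited cell stores the cell it was reached from, and that the start stores `None`.

```python
# Hypothetical predecessor table: each visited cell maps to the cell it was
# reached from. The start has no predecessor and maps to None.
came_from = {
    (0, 0): None,
    (1, 1): (0, 0),
    (2, 1): (1, 1),
    (3, 2): (2, 1),
}

def unwind(came_from, goal):
    """Follow the predecessors from goal back to the start, then reverse."""
    path = []
    pos = goal
    while pos is not None:
        path.append(pos)
        pos = came_from[pos]
    path.reverse()  # So that the path runs from start to goal.
    return path

path = unwind(came_from, (3, 2))
print(path)
```

Storing one predecessor per cell is sufficient because each cell is marked as visited only once, at the moment its final cost is known.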

In [56]:
# If you don't see a video below, run this cell.
from IPython.display import IFrame
IFrame("https://www.youtube.com/embed/sVSV2lOq6UM" if "YouTube" in globals() else "//av.tib.eu/player/49043",
       width=560, height=315)

### Programming assignment: reconstruct the path (5 Points).

In [57]:
# pp_01_d_dijkstra_path
def dijkstra_4(start, goal, obstacles, movements):
    """Dijkstra's algorithm. Fourth version also returns optimal path."""
    # In the beginning, the start is the only element in our front.
    # The first element is the cost of the path from the start to the point.
    # The second element is the position (cell) of the point.
    # The third component is the position we came from when entering the tuple
    #   to the front.
    front = [ (0.001, start,None) ]  # CHANGE 01_d: Add None to this tuple.

    # In the beginning, no cell has been visited.
    extents = obstacles.shape
    visited = np.zeros(extents, dtype=np.float32)

    # Also, we use a dictionary to remember where we came from.
    came_from = {}  # CHANGE

    # While there are elements to investigate in our front.
    # YOUR CODE HERE
    #raise NotImplementedError()

    while front:
        # Get smallest item and remove from front.
        element_min = heappop(front)
        # Check if this has been visited already.
        cost, pos, previous = element_min  # CHANGE 01_d: add 'previous' (as shown).
        if visited[pos] > 0:
            continue
        # Now it is visited. Mark with cost.
        visited[pos] = cost
        # Also remember that we came from previous when we marked pos.
        # CHANGE 01_d: enter 'previous' (value) into the 'came_from' dictionary
        #   at index (key) 'pos'.
        came_from[pos] = previous 
        # Check if the goal has been reached.
        if pos == goal:
            break  # Finished!

        # Check all neighbors.
        for dx, dy, deltacost in movements:
            # Determine new position and check bounds.
            new_x = pos[0] + dx
            new_y = pos[1] + dy
            if not (0 <= new_x < extents[0] and 0 <= new_y < extents[1]):
                continue
            # Add to front if: not visited before and no obstacle.
            new_pos = (new_x, new_y)
            # CHANGE 01_d: When push'ing the new tuple to the heap, make
            #   sure it is a 3-tuple, with the last component of the tuple
            #   being the position 'we came from' (which is 'pos').
            if visited[new_pos] == 0 and obstacles[new_pos] != 255:
                heappush(front, (cost+deltacost, new_pos, pos))
    # CHANGE 01_d: Make sure to include the following code, which 'unwinds'
    #   the path from goal to start, using the came_from dictionary.
    #   Make sure to include the (modified!) return statement, otherwise
    #   the path will not show up in the visualization.

    # Reconstruct path, starting from goal.
    path = []
    if pos == goal:  # If we reached the goal, unwind backwards.
        while pos:
            path.append(pos)
            pos = came_from[pos]
        path.reverse()  # Reverse so that path is from start to goal.

    return (path, visited)

As usual, first the test...

In [58]:
import numpy as np
from scipy.ndimage import minimum_filter
from numpy.random import default_rng
rng = default_rng()

def dt(obstacles, start):
    acc = np.full_like(obstacles, np.inf, dtype=float)
    acc[start] = 1
    m4 = np.array([[0,1,0],[1,0,1],[0,1,0]]).astype(bool)
    m4d = np.array([[1,0,1],[0,1,0],[1,0,1]]).astype(bool)
    ps2 = 2**(np.sqrt(2))
    obsm = obstacles > 0
    acc[obsm] = np.inf
    accp = np.zeros_like(acc)
    while (accp != acc).any():
        accp = acc
        acc = np.minimum(acc, 2*minimum_filter(acc, footprint=m4))
        acc[obsm] = np.inf
        acc = np.minimum(acc, ps2*minimum_filter(acc, footprint=m4d))
        acc[obsm] = np.inf
    return np.log2(acc)

def test(the_function):
    s2 = np.sqrt(2)
    movements = [ (1,0, 1.), (0,1, 1.), (-1,0, 1.), (0,-1, 1.),
                  (1,1, s2), (-1,1, s2), (-1,-1, s2), (1,-1, s2) ]
    n = 8
    for experiment in range(100):
        # Random start and goal.
        sg = rng.integers(0,n,size=(2,2))
        start, goal = tuple(sg[0]), tuple(sg[1])
        # Random number of obstacles in random places.
        obs_idx = rng.integers(0, n, size=(rng.integers(0, n*n), 2))
        obstacles = np.zeros((n,n), dtype=np.uint8)
        obstacles[obs_idx[:,0], obs_idx[:,1]] = 255
        obstacles[start] = 0  # Never on start or goal.
        obstacles[goal] = 0

        # Call user function.    
        path, visited = the_function(start, goal, obstacles, movements)
        offset = visited[start]
        if offset <= 0 or offset >= 1:
            print("You should start the search with a small positive offset.")
            return False

        # Compute df.
        eps = 1e-3
        d = dt(obstacles, start) + offset
        dg = d[goal]
        if dg < np.inf and not (dg-eps < visited[goal] < dg+eps):
            print("Goal has wrong cost.")
            return False
        inside = d < dg-eps
        outside = (d==np.inf) if dg==np.inf else (d > dg+eps)
        if not (np.abs(visited[inside] - d[inside]) < eps).all() or\
           not (visited[outside]==0).all():
            print("Error: for start, goal:", start, goal, "and obstacles\n",
                  obstacles, "\nvisited was\n", np.round(visited,1),
                  "\nand distances were\n", np.round(d,1))
            return False

        # Some path checks.
        if dg < np.inf:
            if tuple(path[0]) != start or\
               tuple(path[-1]) != goal:
                print("Path start or goal is wrong.")
                return False
            # Check that path has monotonically increasing cost.
            costs = [d[p] for p in path]
            if costs != sorted(costs):
                print("Path is wrong - should have increasing cost.")
                return False
            # Check that path proceeds in steps of one.
            if len(path) > 1:
                steps = [ (p[0],p[1],q[0],q[1]) for p, q in zip(path, path[1:])]
                if max(abs(t[0]-t[2]) for t in steps) > 1 or\
                   max(abs(t[1]-t[3]) for t in steps) > 1:
                    print("Path should have only +/-1 steps in x and y.")
                    return False
    return True
assert test(dijkstra_4), "Oh no, it's wrong!"

### Try it out... play with adding obstacles and watch how the path changes!

In [59]:
from ipy_ppviewer import PPViewer
class MyPPViewer(PPViewer):
    def compute(self, x1, y1, t1, x2, y2, t2, obstacles, potential):
        return dijkstra_4((x1,y1), (x2,y2), obstacles, movements)
MyPPViewer(200, 150).run()


## The cost function, and a greedy path search algorithm

In [60]:
# If you don't see a video below, run this cell.
from IPython.display import IFrame
IFrame("https://www.youtube.com/embed/Lq6XC9rWRDs" if "YouTube" in globals() else "//av.tib.eu/player/49044",
       width=560, height=315)

### Quiz: Does the *greedy* algorithm, described in the video, provide optimal results? (2 Points)
- A: Its speed is optimal.
- B: It may not find the path to the goal.
- C: There may be a shorter path.
- D: There is always a shorter path.

In [23]:
the_greedy_fulfills = "C"
# YOUR CODE HERE
#raise NotImplementedError()

In [24]:
from hashlib import shake_128
def public_string_test(the_answer_string, reference):
    m = shake_128()
    m.update(the_answer_string.encode())
    return m.hexdigest(4) == reference
assert public_string_test("greedy="+the_greedy_fulfills.strip().lower(), 'dcef8344'), "Oh no, your answer is wrong!"

## The (famous) A* algorithm
The A* algorithm (pronounced "A-star") is widely used, since it is typically faster than Dijkstra's original algorithm. It reduces the number of unnecessarily visited nodes by "walking towards the goal", or "walking less in the wrong direction". To achieve this, it needs to integrate some "information about the goal" into the algorithm. As we will see, all there is to do is to cleverly modify the cost function of our previous algorithm, so that it contains not only the cost from the start to the node, but also an estimate of the remaining cost to the goal. That is, the A* algorithm is nothing other than Dijkstra's algorithm with a modified cost function.
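
The effect of the modified cost function can be sketched with two made-up candidate cells. The helper name `heuristic()` and the coordinates are assumptions for this illustration; the estimate used here is the straight-line distance to the goal.

```python
import math

def heuristic(p, q):
    """Straight-line (Euclidean) distance, used as the estimate to the goal."""
    return math.hypot(p[0] - q[0], p[1] - q[1])

goal = (10, 0)
# Two hypothetical candidates with the same cost from the start (5.0),
# one lying towards the goal and one lying away from it.
candidates = [(5.0, (5, 0)), (5.0, (-5, 0))]

for cost, pos in candidates:
    total_cost = cost + heuristic(pos, goal)
    print(pos, "cost so far:", cost, "total:", total_cost)

totals = [cost + heuristic(pos, goal) for cost, pos in candidates]
```

With Dijkstra's cost alone, both candidates would be equally attractive. With the estimate added, the candidate lying towards the goal gets the smaller total cost and is taken from the front first.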

In [61]:
# If you don't see a video below, run this cell.
from IPython.display import IFrame
IFrame("https://www.youtube.com/embed/oMR-wIu38_s" if "YouTube" in globals() else "//av.tib.eu/player/49045",
       width=560, height=315)

### Programming assignment: modify the algorithm so that it performs A* search (10 Points).

In [62]:
# pp_01_e_astar
def distance(p, q):
    """Helper function to compute distance between two points."""
    return np.sqrt((p[0]-q[0])**2 + (p[1]-q[1])**2)

def astar(start, goal, obstacles, movements):
    """A* algorithm."""
    # In the beginning, the start is the only element in our front.
    # NOW, the first element is the total cost through the point, which is
    #   the cost from start to point plus the estimated cost to the goal.
    # The second element is the cost of the path from the start to the point.
    # The third element is the position (cell) of the point.
    # The fourth component is the position we came from when entering the tuple
    #   to the front.
    # CHANGE 01_e: add the total cost of the start node as FIRST ELEMENT to
    #   the tuple.
    front = [ (distance(start, goal) + 0.001, 0.001, start, None) ]

    # In the beginning, no cell has been visited.
    extents = obstacles.shape
    visited = np.zeros(extents, dtype=np.float32)

    # Also, we use a dictionary to remember where we came from.
    came_from = {}

    # While there are elements to investigate in our front.
    # YOUR CODE HERE
    #raise NotImplementedError()
    while front:
        # Get smallest item and remove from front.
        element_min = heappop(front)
        # Check if this has been visited already.
        # CHANGE 01_e: use the following line as shown.
        total_cost, cost, pos, previous = element_min
        if visited[pos] > 0:
            continue
        # Now it has been visited. Mark with cost.
        visited[pos] = cost
        # Also remember that we came from previous when we marked pos.
        came_from[pos] = previous 
        # Check if the goal has been reached.
        if pos == goal:
            break  # Finished!

        # Check all neighbors.
        for dx, dy, deltacost in movements:
            # Determine new position and check bounds.
            new_x = pos[0] + dx
            new_y = pos[1] + dy
            if not (0 <= new_x < extents[0] and 0 <= new_y < extents[1]):
                continue
            # Add to front if: not visited before and no obstacle.
            new_pos = (new_x, new_y)
            # CHANGE 01_e: When push'ing the new tuple to the heap,
            #   this now has to be a 4-tuple:
            #   (new_total_cost, new_cost, new_pos, pos)
            #   where new_cost is the cost from start to new_pos,
            #   and new_total_cost is new_cost plus the estimated cost
            #   from new_pos to goal.
            if visited[new_pos] == 0 and obstacles[new_pos] != 255:
                new_cost = cost+deltacost
                new_total_cost = new_cost+distance(new_pos, goal)
                heappush(front, (new_total_cost,new_cost, new_pos, pos))
    # Reconstruct path, starting from goal.
    path = []
    if pos == goal:  # If we reached the goal, unwind backwards.
        while pos:
            path.append(pos)
            pos = came_from[pos]
        path.reverse()  # Reverse so that path is from start to goal.

    return (path, visited)

First the test...

In [63]:
import numpy as np
from scipy.ndimage import minimum_filter
from numpy.random import default_rng
rng = default_rng()

def dt(obstacles, start):
    acc = np.full_like(obstacles, np.inf, dtype=float)
    acc[start] = 1
    m4 = np.array([[0,1,0],[1,0,1],[0,1,0]]).astype(bool)
    m4d = np.array([[1,0,1],[0,1,0],[1,0,1]]).astype(bool)
    ps2 = 2**(np.sqrt(2))
    obsm = obstacles > 0
    acc[obsm] = np.inf
    accp = np.zeros_like(acc)
    while (accp != acc).any():
        accp = acc
        acc = np.minimum(acc, 2*minimum_filter(acc, footprint=m4))
        acc[obsm] = np.inf
        acc = np.minimum(acc, ps2*minimum_filter(acc, footprint=m4d))
        acc[obsm] = np.inf
    return np.log2(acc)

def test(the_function):
    s2 = np.sqrt(2)
    movements = [ (1,0, 1.), (0,1, 1.), (-1,0, 1.), (0,-1, 1.),
                  (1,1, s2), (-1,1, s2), (-1,-1, s2), (1,-1, s2) ]
    n = 8
    for experiment in range(100):
        # Random start and goal.
        sg = rng.integers(0,n,size=(2,2))
        start, goal = tuple(sg[0]), tuple(sg[1])
        # Random number of obstacles in random places.
        obs_idx = rng.integers(0, n, size=(rng.integers(0, n*n), 2))
        obstacles = np.zeros((n,n), dtype=np.uint8)
        obstacles[obs_idx[:,0], obs_idx[:,1]] = 255
        obstacles[start] = 0  # Never on start or goal.
        obstacles[goal] = 0

        # Call user function.    
        path, visited = the_function(start, goal, obstacles, movements)
        offset = visited[start]
        if offset <= 0 or offset >= 1:
            print("You should start the search with a small positive offset.")
            return False

        # Compute the reference distances using dt().
        eps = 1e-3
        d = dt(obstacles, start) + offset
        dg = d[goal]
        if dg < np.inf and not (dg-eps < visited[goal] < dg+eps):
            print("Goal has wrong cost.")
            return False
        xg, yg = np.meshgrid(np.arange(n)-goal[1], np.arange(n)-goal[0])
        d2 = d + np.sqrt(xg*xg+yg*yg)
        inside = d2 < dg-eps
        outside = (d2==np.inf) if dg==np.inf else (d2 > dg+eps)
        if not (np.abs(visited[inside] - d[inside]) < eps).all() or\
           not (visited[outside]==0).all():
            print("Error: for start, goal:", start, goal, "and obstacles\n",
                  obstacles, "\nvisited was\n", np.round(visited,1),
                  "\nand distances were\n", np.round(d,1))
            return False

        # Some path checks.
        if dg < np.inf:
            if tuple(path[0]) != start or\
               tuple(path[-1]) != goal:
                print("Path start or goal is wrong.")
                return False
            # Check that path has monotonically increasing cost.
            costs = [d[p] for p in path]
            if costs != sorted(costs):
                print("Path is wrong - should have increasing cost.")
                return False
            # Check that path proceeds in steps of one.
            if len(path) > 1:
                steps = [ (p[0],p[1],q[0],q[1]) for p, q in zip(path, path[1:])]
                if max(abs(t[0]-t[2]) for t in steps) > 1 or\
                   max(abs(t[1]-t[3]) for t in steps) > 1:
                    print("Path should have only +/-1 steps in x and y.")
                    return False
    return True
assert test(astar), "Oh no, it's wrong!"

### Try out the A* algorithm!
You should notice:
- It visits far fewer nodes; in particular, it omits the nodes that are far away from the goal.
- It is much faster.
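
To make the first observation concrete, here is a minimal, self-contained sketch (it is not the course's `astar()` implementation). It runs the same search twice on an empty 40x40 grid: once with a zero estimate, which is Dijkstra, and once with the straight-line distance to the goal, which is A*. The function name `grid_search`, the grid size, and the start and goal cells are assumptions chosen only for illustration.

```python
import heapq
import math

def grid_search(start, goal, size, use_heuristic):
    """Search an empty size x size grid; returns (goal_cost, cells_visited)."""
    s2 = math.sqrt(2)
    moves = [(1, 0, 1.0), (0, 1, 1.0), (-1, 0, 1.0), (0, -1, 1.0),
             (1, 1, s2), (-1, 1, s2), (-1, -1, s2), (1, -1, s2)]

    def estimate(p):
        # Straight-line distance to the goal; a zero estimate gives Dijkstra.
        return math.dist(p, goal) if use_heuristic else 0.0

    # Heap entries: (total_cost, cost, position).
    front = [(estimate(start), 0.0, start)]
    visited = {}
    while front:
        total_cost, cost, pos = heapq.heappop(front)
        if pos in visited:
            continue
        visited[pos] = cost
        if pos == goal:
            break
        for dx, dy, delta in moves:
            new_pos = (pos[0] + dx, pos[1] + dy)
            if not (0 <= new_pos[0] < size and 0 <= new_pos[1] < size):
                continue
            if new_pos not in visited:
                new_cost = cost + delta
                heapq.heappush(
                    front, (new_cost + estimate(new_pos), new_cost, new_pos))
    return visited.get(goal), len(visited)

dijkstra_cost, dijkstra_visited = grid_search((5, 5), (25, 20), 40, False)
astar_cost, astar_visited = grid_search((5, 5), (25, 20), 40, True)
print("Dijkstra:", round(dijkstra_cost, 3), "cost,", dijkstra_visited, "cells")
print("A*:      ", round(astar_cost, 3), "cost,", astar_visited, "cells")
```

Both runs should report the same goal cost, while the A* run marks noticeably fewer cells as visited.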

In [64]:
from ipy_ppviewer import PPViewer
class MyPPViewer(PPViewer):
    def compute(self, x1, y1, t1, x2, y2, t2, obstacles, potential):
        return astar((x1,y1), (x2,y2), obstacles, movements)
MyPPViewer(200, 150).run()


## A* with an added potential function.
This is our first attempt to get a more "realistic" path. In our previous solution, the path often runs very close to the obstacles. This is correct, given that the cost function minimizes only the path cost and does not take "safety envelopes" around obstacles into account. In reality, however, we would like a trade-off between the shortest path and the distance to obstacles, as explained in the following video.
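
The trade-off can be illustrated with a small, self-contained sketch. It assumes a single wall along `y == 0` and a potential that decays linearly with the distance to that wall (slope 16, the same decay the test cell of this assignment uses; the interactive viewer may compute its field differently). The obstacle cost per entered cell is the field value divided by 64, as in the assignment. The names `field_value` and `path_costs` and both example paths are hypothetical.

```python
import math

def field_value(y):
    """Obstacle/potential value of a cell at distance y from a wall at y == 0."""
    if y == 0:
        return 255                   # The wall itself: not traversable.
    return max(255 - 16 * y, 0)      # Decays linearly with the distance.

def path_costs(path, divider=64):
    """Returns (geometric length, total cost including the obstacle cost)."""
    length = cost = 0.0
    for (x0, y0), (x1, y1) in zip(path, path[1:]):
        step = math.hypot(x1 - x0, y1 - y0)
        length += step
        # Each step pays its length plus the obstacle cost of the entered cell.
        cost += step + field_value(y1) / divider
    return length, cost

# Path A hugs the wall at distance 1.
path_a = [(x, 1) for x in range(21)]
# Path B moves away to distance 6, drives along the wall, and returns.
path_b = ([(x, 1 + x) for x in range(6)] +
          [(x, 6) for x in range(6, 16)] +
          [(x, 21 - x) for x in range(16, 21)])

length_a, cost_a = path_costs(path_a)
length_b, cost_b = path_costs(path_b)
print("A: length %.2f, cost %.2f" % (length_a, cost_a))
print("B: length %.2f, cost %.2f" % (length_b, cost_b))
```

Path B is geometrically longer but has the lower total cost, so a search that includes the obstacle cost would prefer it. A larger divider weakens this effect, and a smaller one strengthens it.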

In [65]:
# If you don't see a video below, run this cell.
from IPython.display import IFrame
IFrame("https://www.youtube.com/embed/nQtmUH3UCi4" if "YouTube" in globals() else "//av.tib.eu/player/49046",
       width=560, height=315)

### Programming assignment: adding the potential function (5 Points).

In [43]:
# pp_01_f_astar_potential_function
def astar_2(start, goal, obstacles, movements):
    """A* algorithm."""
    # In the beginning, the start is the only element in our front.
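    # The first element is the total cost: the cost of the path from the
    #   start to the point plus the estimated cost from the point to the goal.
    # The second element is the cost of the path from the start to the point.
    # The third element is the position (cell) of the point.
    # The fourth element is the position we came from.
    front = [ (distance(start, goal), 0.001, start, None) ]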
    # In the beginning, no cell has been visited.
    extents = obstacles.shape
    visited = np.zeros(extents, dtype=np.float32)

    # Also, we use a dictionary to remember where we came from.
    came_from = {}

    # While there are elements to investigate in our front.
    # YOUR CODE HERE
    #raise NotImplementedError()
    while front:
        # Get smallest item and remove from front.
        element_min = heappop(front)
        # Check if this has been visited already.
        total_cost, cost, pos, previous = element_min
        if visited[pos] > 0:
            continue
        # Now it has been visited. Mark with cost.
        visited[pos] = cost
        # Also remember that we came from previous when we marked pos.
        came_from[pos] = previous 
        # Check if the goal has been reached.
        if pos == goal:
            break  # Finished!

        # Check all neighbors.
        for dx, dy, deltacost in movements:
            # Determine new position and check bounds.
            new_x = pos[0] + dx
            new_y = pos[1] + dy
            if not (0 <= new_x < extents[0] and 0 <= new_y < extents[1]):
                continue
            # Add to front if: not visited before and no obstacle.
            new_pos = (new_x, new_y)
            # CHANGE 01_f: add the 'obstacle cost' to new_cost AND
            #   new_total_cost. As obstacle cost, use:
            #   obstacles[new_pos] / 64.
            #   The divider 64 determines the tradeoff between 'avoiding
            #   obstacles' and 'driving longer distances'. You may experiment
            #   with other values, but make sure you set it back to 64 for
            #   the grader.
            # Please check again that you do not enter a tuple into
            # the heap if it has been visited already or its obstacles[]
            # value is 255 (check for '==255', not for '> 0').
            if visited[new_pos] == 0 and obstacles[new_pos] != 255:
                new_cost = cost+deltacost + obstacles[new_pos]/64
                new_total_cost = new_cost+distance(new_pos, goal) 
                heappush(front, (new_total_cost, new_cost, new_pos, pos))
                
    # Reconstruct path, starting from goal.
    path = []
    if pos == goal:  # If we reached the goal, unwind backwards.
        while pos:
            path.append(pos)
            pos = came_from[pos]
        path.reverse()  # Reverse so that path is from start to goal.

    return (path, visited)

First, the test...

In [66]:
import numpy as np
from scipy.ndimage import minimum_filter
from scipy.ndimage import distance_transform_edt
from numpy.random import default_rng
rng = default_rng()

def dt(obstacles, potential, start):
    acc = np.full_like(obstacles, np.inf, dtype=float)
    acc[start] = 1
    m4 = np.array([[0,1,0],[1,0,1],[0,1,0]]).astype(bool)
    m4d = np.array([[1,0,1],[0,1,0],[1,0,1]]).astype(bool)
    ps2, pot2 = 2**(np.sqrt(2)), 2**(potential/64)
    obsm = obstacles > 0
    acc[obsm] = np.inf
    accp = np.zeros_like(acc)
    while (accp != acc).any():
        accp = acc
        acc = np.minimum(acc, 2*minimum_filter(acc, footprint=m4)*pot2)
        acc[obsm] = np.inf
        acc = np.minimum(acc, ps2*minimum_filter(acc, footprint=m4d)*pot2)
        acc[obsm] = np.inf
    return np.log2(acc)

def test(the_function):
    s2 = np.sqrt(2)
    movements = [ (1,0, 1.), (0,1, 1.), (-1,0, 1.), (0,-1, 1.),
                  (1,1, s2), (-1,1, s2), (-1,-1, s2), (1,-1, s2) ]
    n = 16
    for experiment in range(100):
        # Random start and goal.
        sg = rng.integers(0,n,size=(2,2))
        start, goal = tuple(sg[0]), tuple(sg[1])
        # Random number of obstacles in random places.
        obs_idx = rng.integers(0, n, size=(rng.integers(0, n), 2))
        obstacles = np.zeros((n,n), dtype=np.uint8)
        obstacles[obs_idx[:,0], obs_idx[:,1]] = 255
        obstacles[start] = 0  # Never on start or goal.
        obstacles[goal] = 0
        # Add potential.
        dt_edt = distance_transform_edt(255-obstacles)
        potential = np.maximum(255.0 - dt_edt*16, 0).astype(np.uint8)\
                  - obstacles
        # Call user function.    
        path, visited = the_function(start, goal, obstacles+potential,
                                     movements)
        offset = visited[start]
        if offset <= 0 or offset >= 1:
            print("You should start the search with a small positive offset.")
            return False
        # Compute the reference distances using dt(). This time, include potential.
        eps = 1e-3
        d = dt(obstacles, potential, start) + offset
        dg = d[goal]
        if dg < np.inf and not (dg-eps < visited[goal] < dg+eps):
            print("Goal has wrong cost.")
            return False
        xg, yg = np.meshgrid(np.arange(n)-goal[1], np.arange(n)-goal[0])
        d2 = d + np.sqrt(xg*xg+yg*yg)
        inside = d2 < dg-eps
        outside = (d2==np.inf) if dg==np.inf else (d2 > dg+eps)
        if not (np.abs(visited[inside] - d[inside]) < eps).all() or\
           not (visited[outside]==0).all():
            print("Error: for start, goal:", start, goal, "and obstacles\n",
                  obstacles, "\nvisited was\n", np.round(visited,1),
                  "\nand distances were\n", np.round(d,1))
            return False

        # Some path checks.
        if dg < np.inf:
            if tuple(path[0]) != start or\
               tuple(path[-1]) != goal:
                print("Path start or goal is wrong.")
                return False
            # Check that path has monotonically increasing cost.
            costs = [d[p] for p in path]
            if costs != sorted(costs):
                print("Path is wrong - should have increasing cost.")
                return False
            # Check that path proceeds in steps of one.
            if len(path) > 1:
                steps = [ (p[0],p[1],q[0],q[1]) for p, q in zip(path, path[1:])]
                if max(abs(t[0]-t[2]) for t in steps) > 1 or\
                   max(abs(t[1]-t[3]) for t in steps) > 1:
                    print("Path should have only +/-1 steps in x and y.")
                    return False
    return True
assert test(astar_2), "Oh no, it's wrong!"

### Then, again, try it out interactively!
Try to invent some "parking lot" examples on your own!

In [67]:
from ipy_ppviewer import PPViewer
class MyPPViewer(PPViewer):
    def compute(self, x1, y1, t1, x2, y2, t2, obstacles, potential):
        return astar_2((x1,y1), (x2,y2), obstacles+potential, movements)
MyPPViewer(200, 150, enable_potential_field=True).run()


# Search in kinematic space
So far, we have searched in a grid of cells. This can be useful for robots (like the one we had in the earlier units) which are able to turn on the spot. However, imagine a normal car. It can only move straight or along a circular arc whose radius is set by the steering wheel. This radius cannot be smaller than a given minimum radius, which is determined by the car's construction (and specified in its user manual).

In this last part of the unit, we will explore an algorithm that searches in *kinematic space*, so that the generated path would be suitable to control the steering wheel of a car.
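
As a small worked example of such a movement, the following sketch computes the pose a car reaches after driving a given length either straight ahead or along a circular arc. It follows the same geometry as the `CurveSegment.end_pose()` helper further down in this notebook, but it is written with the standard library only. The name `arc_end_pose` and the minimum radius of 10 are assumptions for illustration.

```python
import math

def arc_end_pose(pose, curvature, length):
    """End pose after driving `length` along a line or a circular arc."""
    x, y, theta = pose
    if curvature == 0.0:
        # Straight movement: the heading does not change.
        return (x + length * math.cos(theta),
                y + length * math.sin(theta), theta)
    radius = 1.0 / curvature
    # The circle center lies at distance `radius` to the left of the heading
    # (to the right if the curvature, and hence the radius, is negative).
    xc, yc = x - radius * math.sin(theta), y + radius * math.cos(theta)
    new_theta = theta + length / radius
    nx = xc + radius * math.sin(new_theta)
    ny = yc - radius * math.cos(new_theta)
    # Normalize the heading to the interval [-pi, pi).
    new_theta = (new_theta + math.pi) % (2 * math.pi) - math.pi
    return (nx, ny, new_theta)

min_radius = 10.0
quarter_circle = min_radius * math.pi / 2
# A quarter turn to the left, starting at the origin and heading along +x.
print(arc_end_pose((0.0, 0.0, 0.0), 1.0 / min_radius, quarter_circle))
# The same length driven straight ahead.
print(arc_end_pose((0.0, 0.0, 0.0), 0.0, quarter_circle))
```

After the quarter turn, the car is at approximately (10, 10) with a heading of 90 degrees. The car cannot reach a point such as (0, 5) with a single arc, because that would require a radius smaller than the minimum.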

In [68]:
# If you don't see a video below, run this cell.
from IPython.display import IFrame
IFrame("https://www.youtube.com/embed/XNZkiDOVsNk" if "YouTube" in globals() else "//av.tib.eu/player/49047",
       width=560, height=315)

### Quiz: How can we make our algorithm faster? (2 Points)
Our algorithm is slow when the distance between the start and goal is large, especially for some orientations of the goal pose. What would be the proper solution if we want to apply our algorithm to (much) larger datasets?
- A: We need a faster processor.
- B: We need a computer with more memory.
- C: We need a faster hard disk.
- D: None of the above solutions will help.

In [69]:
what_we_should_do = "D"
# YOUR CODE HERE
#raise NotImplementedError()

In [70]:
from hashlib import shake_128
def public_string_test(the_answer_string, reference):
    m = shake_128()
    m.update(the_answer_string.encode())
    return m.hexdigest(4) == reference
assert public_string_test("do"+what_we_should_do.strip().lower(), '464329be'), "Oh no, your answer is wrong!"

### Run the following cells to play a bit with the "kinematic statespace routing".
Note that this program is provided. It is not a programming assignment, so you can start playing right away!

In [71]:
# pp_02_a_car_statespace
# --------------------------------------------------------------------------
# Helper class for curved segments.
# --------------------------------------------------------------------------
class CurveSegment:
    @staticmethod
    def end_pose(start_pose, curvature, length):
        """Returns end pose, given start pose, curvature and length."""
        x, y, theta = start_pose
        if curvature == 0.0:
            # Linear movement.
            x, y = x + length * np.cos(theta), y + length * np.sin(theta)
            return (x, y, theta)
        else:
            # Curve segment of radius 1/curvature.
            tx, ty, radius = np.cos(theta), np.sin(theta), 1.0/curvature
            xc, yc = x - radius * ty, y + radius * tx  # Center of circle.
            angle = length / radius
            cosa, sina = np.cos(angle), np.sin(angle)
            nx = xc + radius * (cosa * ty + sina * tx)
            ny = yc + radius * (sina * ty - cosa * tx)
            ntheta = (theta + angle + np.pi) % (2*np.pi) - np.pi
            return (nx, ny, ntheta)

    @staticmethod
    def segment_points(start_pose, curvature, length, delta_length):
        """Return points of segment, at delta_length intervals."""
        l = 0.0
        delta_length = np.copysign(delta_length, length)
        points = []
        while abs(l) < abs(length):
            points.append(CurveSegment.end_pose(start_pose, curvature, l)[0:2])
            l += delta_length
        return points

# --------------------------------------------------------------------------
# Exploration of car's kinematic state space.
# --------------------------------------------------------------------------

# Allowed movements. These are given as tuples: (curvature, length).
kinematic_movements = [(1.0/10, 5.0), (0.0, 5.0), (-1.0/10, 5.0)]

# Helper functions.
def states_close(p, q):
    """Checks if two poses (x, y, heading) are the same within a tolerance."""
    d_angle = abs((p[2]-q[2]+np.pi) % (2*np.pi) - np.pi)
    # For the sake of simplicity, tolerances are hardcoded here:
    # 15 degrees for the heading angle, 2.0 for the position.
    return d_angle < np.radians(15.) and distance(p, q) <= 2.0

def explore_statespace(start_pose, goal_pose, obstacles, movements):
    """Try to find a sequence of curved segments from start to goal."""

    # Init front: the only element is the start pose.
    # Each tuple contains:
    # (total_cost, cost, pose, previous_pose, move_index).
    front = [ (distance(start_pose, goal_pose), 0.0001, start_pose,
               None, None) ]

    # In the beginning, no cell has been visited.
    extents = obstacles.shape
    visited_cells = np.zeros(extents, dtype=np.float32)

    # Also, no states have been generated.
    generated_states = {}

    while front:
        # Stop search if the front gets too large.
        if len(front) > 500000:
            print("Timeout.")
            break

        # Pop smallest item from heap.
        total_cost, cost, pose, previous_pose, move = heappop(front)

        # Mark visited_cell which encloses this pose.
        visited_cells[int(pose[0]), int(pose[1])] = cost

        # Enter into visited states, also use this to remember where we
        # came from and which move we used.
        generated_states[pose] = (previous_pose, move)

        # Check if we have (approximately) reached the goal.
        if states_close(pose, goal_pose):
            break  # Finished!

        # Check all possible movements.
        for i in range(len(movements)):
            curvature, length = movements[i]

            # Determine new pose and check bounds.
            new_pose = CurveSegment.end_pose(pose, curvature, length)
            if not (0 <= new_pose[0] < extents[0] and \
                    0 <= new_pose[1] < extents[1]):
                continue

            # Add to front if there is no obstacle.
            if obstacles[(int(new_pose[0]), int(new_pose[1]))] != 255:
                new_cost = cost + abs(length)
                total_cost = new_cost + distance(new_pose, goal_pose)
                heappush(front, (total_cost, new_cost, new_pose, pose, i))

    # Reconstruct path, starting from goal.
    if states_close(pose, goal_pose):
        path = []
        path.append(pose[0:2])
        pose, move = generated_states[pose]
        while pose:
            points = CurveSegment.segment_points(pose,
                movements[move][0], movements[move][1], 2.0)
            path.extend(reversed(points))
            pose, move = generated_states[pose]
        path.reverse()
    else:
        path = []

    return path, visited_cells

### Run the interactive viewer!
As mentioned in the video, the user interface for setting the start and goal has been extended, because the algorithm now takes *poses* (position and heading), not just points. To set a pose, proceed exactly as before, but drag the mouse around to set the heading. That is, **to set the start pose, hold shift, then click and hold the left mouse button and drag the mouse around**. You will then see the small heading indicator following your mouse location. For the goal pose, do the same using the middle or right mouse button.

In [72]:
from ipy_ppviewer import PPViewer
class MyPPViewer(PPViewer):
    def compute(self, x1, y1, t1, x2, y2, t2, obstacles, potential):
        return explore_statespace((x1,y1,t1), (x2,y2,t2), obstacles, kinematic_movements)
MyPPViewer(100, 75, scale_factor=5, draw_start_goal_heading=True).run()


## A* in kinematic state space
In this final programming assignment, we again use the kinematic state space search. This time, however, we mark the states (poses) we have already visited, which prevents the algorithm from exploring them again. This is similar to what we did in the A* algorithm above. As a result, we get an algorithm of polynomial instead of exponential time complexity. Since there are infinitely many continuous poses, we discretize them in order to remember which ones have been visited. Discretization is done by the helper function `pose_index(pose)` below.
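
The effect of the discretization can be sketched in a few lines. The snippet below is only an illustration under stated assumptions: `pose_cell` is a hypothetical stand-in for the notebook's `pose_index()` with the same raster sizes (1.0 for the position, 10 degrees for the heading), the two example poses are made up, and the arena size of 100x75 is taken from the viewer call used in this unit.

```python
import math

def pose_cell(pose, pos_raster=1.0, heading_raster=math.radians(10.0)):
    """Maps a continuous pose (x, y, heading) to a discrete triple index."""
    return (math.floor(pose[0] / pos_raster),
            math.floor(pose[1] / pos_raster),
            math.floor(pose[2] / heading_raster))

# Two slightly different poses fall into the same discrete cell ...
a = pose_cell((12.3, 40.7, math.radians(23.0)))
b = pose_cell((12.9, 40.1, math.radians(28.0)))

# ... so only the first one that is popped from the heap gets expanded.
visited = set()
expanded = []
for pose_idx in (a, b):
    if pose_idx in visited:
        continue
    visited.add(pose_idx)
    expanded.append(pose_idx)
print(a, b, "expanded:", len(expanded))

# Without this check, the number of generated poses can triple with every
# step (three forward moves). With it, at most one pose per cell is expanded.
arena = (100, 75)
max_cells = arena[0] * arena[1] * 36    # 36 heading bins of 10 degrees each.
for depth in (5, 10, 15):
    print(depth, 3 ** depth, min(3 ** depth, max_cells))
```

The upper bound `max_cells` grows only with the size of the arena and the raster resolution, not with the number of moves in the path.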

In [73]:
# If you don't see a video below, run this cell.
from IPython.display import IFrame
IFrame("https://www.youtube.com/embed/rnf4t00uSmk" if "YouTube" in globals() else "//av.tib.eu/player/49048",
       width=560, height=315)

### Final programming assignment: mark the poses we have already explored (5 Points).

In [75]:
# pp_02_b_car_statespace_astar
# Allowed movements. These are given as tuples: (curvature, length).
# The list contains forward and backward movements.
# Use [0:3] for forward movements only.
kinematic_movements = [(1.0/10, 5.0), (0.0, 5.0), (-1.0/10, 5.0),
                       (1.0/10, -5.0), (0.0, -5.0), (-1.0/10, -5.0)]

# Helper functions.
def pose_index(pose):
    """Given a pose, returns a discrete version (a triple index)."""
    # Again, raster sizes are hardcoded here for simplicity.
    pos_raster = 1.0
    heading_raster = np.radians(10.)
    xi = int(np.floor(pose[0] / pos_raster))
    yi = int(np.floor(pose[1] / pos_raster))
    ti = int(np.floor(pose[2] / heading_raster))
    return (xi, yi, ti)

def astar_statespace(start_pose, goal_pose, obstacles, movements):
    """Try to find a sequence of curve segments from start to goal."""

    # Init front: the only element is the start pose.
    # Each tuple contains:
    # (total_cost, cost, pose, previous_pose, move_index).
    front = [ (distance(start_pose, goal_pose), 0.0001, start_pose,
               None, None) ]

    # In the beginning, no cell has been visited.
    extents = obstacles.shape
    visited_cells = np.zeros(extents, dtype=np.float32)

    # Also, no states have been generated.
    generated_states = {}

    # YOUR CODE HERE
    #raise NotImplementedError()
    while front:
        # Stop search if the front gets too large.
        if len(front) > 500000:
            print("Timeout.")
            break

        # Pop smallest item from heap.
        total_cost, cost, pose, previous_pose, move = heappop(front)

        # Compute discrete index.
        # CHANGE 02_b: compute a discrete pose index pose_idx from pose,
        #   using the function pose_index() above.
        pose_idx = pose_index(pose)
        # Check if this has been visited already.
        # CHANGE 02_b: check if pose_idx is in generated_states already,
        #   and if so, skip the rest of the loop. (This is the point where
        #   we prevent exponential growth.)
        if pose_idx in generated_states:
            continue
        # Mark visited_cell which encloses this pose.
        visited_cells[int(pose[0]), int(pose[1])] = cost

        # Enter into visited states, also use this to remember where we
        # came from and which move we used.
        # CHANGE 02_b: Change the following line so that the index is the
        #   discrete pose instead of the continuous pose.
        generated_states[pose_idx] = (previous_pose, move)

        # Check if we have (approximately) reached the goal.
        if states_close(pose, goal_pose):
            break  # Finished!

        # Check all possible movements.
        for i in range(len(movements)):
            curvature, length = movements[i]

            # Determine new pose and check bounds.
            new_pose = CurveSegment.end_pose(pose, curvature, length)
            if not (0 <= new_pose[0] < extents[0] and \
                    0 <= new_pose[1] < extents[1]):
                continue

            # Add to front if there is no obstacle.
            if obstacles[(int(new_pose[0]), int(new_pose[1]))] != 255:
                new_cost = cost + abs(length)
                total_cost = new_cost + distance(new_pose, goal_pose)
                heappush(front, (total_cost, new_cost, new_pose, pose, i))

    # Reconstruct path, starting from goal.
    # (Take this part of the code as is. Note it is different from the
    #  pp_02_a code.)
    if states_close(pose, goal_pose):
        path = []
        path.append(pose[0:2])
        pose, move = generated_states[pose_index(pose)]
        while pose:
            points = CurveSegment.segment_points(pose,
                movements[move][0], movements[move][1], 2.0)
            path.extend(reversed(points))
            pose, move = generated_states[pose_index(pose)]
        path.reverse()
    else:
        path = []

    return path, visited_cells

As usual, the test. If you get "Timeout." here, you did not successfully suppress the exploration of already visited nodes.

In [76]:
import numpy as np

def test(the_function):
    move = [(1.0/10, 5.0), (0.0, 5.0), (-1.0/10, 5.0)]

    arena = (100, 100)
    obstacles = np.zeros(arena, dtype=np.uint8)
    angles = [ i*np.pi/2 - np.pi for i in range(4) ]
    path_lengths = []
    for start_heading in angles:
        start = (arena[0]//4, arena[1]//4, start_heading)
        for goal_heading in angles:
            goal = ((arena[0]*3)//4, (arena[1]*3)//4, goal_heading)
            path, visited = the_function(start, goal, obstacles, move)
            path_lengths.append(len(path))
    return path_lengths == [70,61,55,58,61,70,58,55,55,55,46,46,55,55,46,46]

assert test(astar_statespace), "Oh no, it's wrong!"

### Finally: explore the kinematic state space routing interactively!
Try different start and goal headings, setting obstacles, and enabling the `Backward` button!

Note: If at some point the path seems "to go through an obstacle", then the obstacle is simply "too thin". In our state space exploration, we consider moves of length 5 and only check the end pose of each move for obstacles, so "walls" that are thinner than that can be crossed.
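
The following sketch reproduces this effect for a single straight move. `endpoint_check` mimics what the search above does (only the cell of the end pose is tested), while `sampled_check` is a hypothetical stricter variant that also tests points along the move. Both function names, the sampling step of 0.5, and the wall layout are assumptions for illustration; neither function is part of the course code.

```python
import math

def endpoint_check(start, end, wall_cells):
    """Accepts a move if the cell of its end point is free."""
    return (int(end[0]), int(end[1])) not in wall_cells

def sampled_check(start, end, wall_cells, step=0.5):
    """Accepts a straight move only if all sampled points on it are free."""
    length = math.dist(start, end)
    n = max(1, math.ceil(length / step))
    for k in range(n + 1):
        t = k / n
        x = start[0] + t * (end[0] - start[0])
        y = start[1] + t * (end[1] - start[1])
        if (int(x), int(y)) in wall_cells:
            return False
    return True

# A wall that is only one cell thick, located at x == 12.
wall = {(12, y) for y in range(0, 20)}
# One straight move of length 5 that starts left of the wall and ends right
# of it.
start, end = (10.0, 5.5), (15.0, 5.5)

print("end point only:", endpoint_check(start, end, wall))
print("sampled points:", sampled_check(start, end, wall))
```

The end-point check accepts the move, although it crosses the wall, whereas the sampled check rejects it. Drawing walls that are at least as thick as the move length avoids the problem without changing the search.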

In [78]:
from ipy_ppviewer import PPViewer
class MyPPViewer(PPViewer):
    def compute(self, x1, y1, t1, x2, y2, t2, obstacles, potential):
        move = kinematic_movements if self.option_button_state() else kinematic_movements[0:3]
        return astar_statespace((x1,y1,t1), (x2,y2,t2), obstacles, move)
MyPPViewer(100, 75, scale_factor=5, draw_start_goal_heading=True, option_button_name="Backward").run()


# Conclusions & congratulations!

In [42]:
# If you don't see a video below, run this cell.
from IPython.display import IFrame
IFrame("https://www.youtube.com/embed/JFtZnMu6PDQ" if "YouTube" in globals() else "//av.tib.eu/player/49049",
       width=560, height=315)