# Mobile Robotics: Project

## 0. Imports

In [1]:
import cv2
import time
import numpy as np
import math
import os
import sys
import serial

%matplotlib notebook 
import matplotlib.pyplot as plt
import matplotlib.image as img
from matplotlib import colors
%matplotlib notebook

# Adding the src folder in the current directory as it contains the script
# with the Thymio class
sys.path.insert(0, os.path.join(os.getcwd(), 'src'))

from Thymio import Thymio

In [2]:
th = Thymio.serial(port="/dev/cu.usbmodem14101", refreshing_rate=0.1)

In [3]:
th.set_var_array("leds.top", [255, 0, 0])
time.sleep(0.5)
th.set_var_array("leds.top", [0, 255, 0])
time.sleep(0.5)
th.set_var_array("leds.top", [0, 0, 255])
time.sleep(0.5)
th.set_var_array("leds.top", [0, 0, 0])

## 1. Detection
The aim here is to find the position of the two Thymio robots from an image taken of the global map with obstacles.

### 1.1 Global Map
We take a picture of the global map from above and import it as well as a black and white version.

In [4]:
# Get color and grayscale images
map_img = cv2.imread('images/MapReal2.jpeg', cv2.IMREAD_COLOR)
map_bw = cv2.imread('images/MapReal2.jpeg', cv2.IMREAD_GRAYSCALE)
equ = cv2.equalizeHist(map_bw)
clahe = cv2.createCLAHE(clipLimit=0.5, tileGridSize=(8,8))
equ = clahe.apply(map_bw)

plt.figure()
plt.imshow(map_img)
plt.title("Global Map")

<IPython.core.display.Javascript object>

Text(0.5, 1.0, 'Global Map')

### 1.2 Filtering
We pre-process the black and white image through filtering and thresholding. We then apply morphological operators to the image to remove impurities and the two lines. The filtered image will later be used for Thymio detection.

In [5]:
# Pre-processing the image through filtering and thresholding
bilateral = cv2.bilateralFilter(equ,9,25,25)
thresh = cv2.threshold(bilateral, 120, 255, cv2.THRESH_BINARY_INV)[1]

# Applying morphological operators to the image
eroded = cv2.erode(thresh, None, iterations=7)
dilated = cv2.dilate(eroded, None, iterations=7)

# Plot Images
titles = ['Bilateral Filtered Map', 'Threshold Filtered Map', 'Eroded Map', 'Dilated Map']
images = [bilateral, thresh, eroded, dilated]

plt.figure(figsize=(8,8))
for i in range(len(images)):
    plt.subplot(2,2,i+1),plt.imshow(images[i])
    plt.title(titles[i])
    plt.xticks([]),plt.yticks([])
plt.show()

<IPython.core.display.Javascript object>

### 2.1 Thymio Detection
We use a blob detector to find all obstacles and the robots. We then filter out the obstacles by only keeping the blobs in the outer square where the robots are found.

In [6]:
# Setup SimpleBlobDetector parameters.
params = cv2.SimpleBlobDetector_Params()
 
# Change thresholds
params.minThreshold = 75;
params.maxThreshold = 200;
 
# Filters not used
params.filterByArea = False
params.filterByCircularity = False
params.filterByConvexity = False

# Filter by Color
params.filterByColor = True
params.blobColor = 255;

# Detect Blobs
detector = cv2.SimpleBlobDetector_create(params)

# Generate and show all keypoints
keypoints = detector.detect(dilated)
all_blobs = cv2.drawKeypoints(dilated, keypoints, np.array([]), (255,0,255), cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS)

# Get the position of the keypoints and filter out the obstacles
pts = cv2.KeyPoint_convert(keypoints)
robotInd = [None]*2
i = 0
j = 0
for p in pts:
    if (p[0] < 406) | (p[0] > 1625):
        if j != 1:
            robotInd[0] = i
            j = 1
        elif j == 1:
            robotInd[1] = i
                
    i+=1

# Define Thymio keypoints and save starting position
newKeypoints = [keypoints[robotInd[0]], keypoints[robotInd[1]]]
ThymioStartPos1 = pts[robotInd[1]]
ThymioStartPos2 = pts[robotInd[0]]

# Plot Images
map_with_thymios = cv2.drawKeypoints(map_img, newKeypoints, np.array([]), (255,255,0), cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS)
titles = ['Blob Detection', 'Global Map with Thymio Detection']
images = [all_blobs, map_with_thymios]

plt.figure(figsize=(8,4))
for i in range(len(images)):
    plt.subplot(1,2,i+1),plt.imshow(images[i])
    plt.title(titles[i])
    plt.xticks([]),plt.yticks([])
plt.show()



<IPython.core.display.Javascript object>

# 2. Path Planning
The aim here is to find the fastest path from one Thymio to the other. We first need to apply dilate the filtered image to take the robots size into consideration to avoid the obstacles.

## 2.1 Dilate filtered image
Here we remove the robots from the image. We then dilate the filtered image by a bit more than half of the size of the Thymio Robot.

In [7]:
# Removing the Robots
obstacles = np.empty((len(dilated),len(dilated)))

for i in range(len(dilated)):
    for j in range(len(dilated)):
        if (dilated[i][j]>12):
            if (j > 406) & (j < 1625):
                obstacles[i][j] = 255
            else:
                obstacles[i][j] = 0
        else:
            obstacles[i][j] = 0

# Applying morphological operators to the image
morph = cv2.dilate(obstacles, None, iterations=140)

# Plot Images
titles = ['Filtered Map', 'Obstacle Map', 'Morphed Obstacle Map']
images = [dilated, obstacles, morph]

plt.figure(figsize=(9,3))
for i in range(len(images)):
    plt.subplot(1,3,i+1),plt.imshow(images[i])
    plt.title(titles[i])
    plt.xticks([]),plt.yticks([])
plt.show()

<IPython.core.display.Javascript object>

## 2.2 Creating grid
Here we great a grid of the obstacles for path planning.

In [8]:
# Size of the map
ratio = 32
grid_size = int(len(map_bw)/ratio)

def create_empty_plot(grid_size=grid_size):
    """
    Helper function to create a figure of the desired dimensions & grid
    
    :param grid_size: dimension of the map along the x and y dimensions
    :return: the fig and ax objects.
    """
    fig, ax = plt.subplots(figsize=(7,7))
    
    major_ticks = np.arange(0, grid_size+1, 5)
    minor_ticks = np.arange(0, grid_size+1, 1)
    ax.set_xticks(major_ticks)
    ax.set_xticks(minor_ticks, minor=True)
    ax.set_yticks(major_ticks)
    ax.set_yticks(minor_ticks, minor=True)
    ax.grid(which='minor', alpha=0.2)
    ax.grid(which='major', alpha=0.5)
    ax.set_ylim([-1,grid_size])
    ax.set_xlim([-1,grid_size])
    ax.grid(True)
    
    return fig, ax

In [9]:
# Morphed Obstacle Data
dataMorph = np.empty((grid_size,grid_size))
m = 0
n = 0
for i in range(len(morph)):
    if (i%ratio == 0):
        for j in range(len(morph)):
            if (j%ratio == 0):
                if (morph[i][j]>12):
                    dataMorph[m][n] = 255
                else:
                    dataMorph[m][n] = 0
                n += 1
            if (n == grid_size):
                n = 0
        m += 1
        
limit = 12 
occupancy_grid = dataMorph.copy()
occupancy_grid[dataMorph>limit] = 1
occupancy_grid[dataMorph<=limit] = 0
  
# Original Obstacle Data
dataObstacles = np.empty((grid_size,grid_size))
m = 0
n = 0
for i in range(len(obstacles)):
    if (i%ratio == 0):
        for j in range(len(obstacles)):
            if (j%ratio == 0):
                if (obstacles[i][j]>12):
                    dataObstacles[m][n] = 255
                else:
                    dataObstacles[m][n] = 0
                n += 1
            if (n == grid_size):
                n = 0
        m += 1

limit = 12 
obstacle_grid = dataObstacles.copy()
obstacle_grid[dataObstacles>limit] = 1
obstacle_grid[dataObstacles<=limit] = 0

fig, ax = create_empty_plot()
cmap = colors.ListedColormap(['white', 'slategray']) # Select the colors with which to display obstacles and free cells

# Update Thymio position for new grid
newThymioStartPos1 = ThymioStartPos1/ratio
newThymioStartPos2 = ThymioStartPos2/ratio
newThymioStartPos1[0] = int(round(newThymioStartPos1[0]))
newThymioStartPos1[1] = int(round(newThymioStartPos1[1]))
newThymioStartPos2[0] = int(round(newThymioStartPos2[0]))
newThymioStartPos2[1] = int(round(newThymioStartPos2[1]))

# Define the start and end goal
start = (int(newThymioStartPos1[1]),int(newThymioStartPos1[0]))
goal = (int(newThymioStartPos2[1]),int(newThymioStartPos2[0]))

# Displaying the map
ax.imshow(occupancy_grid.transpose(), cmap=cmap)
ax.scatter(start[0], start[1], marker="o", color = 'aquamarine', s=2300);
ax.scatter(goal[0], goal[1], marker="o", color = 'pink', s=2300);

<IPython.core.display.Javascript object>

## 2.2 A* Algorithm
We implement the A* algorithm to find the shortest path between the two robots. We use the morphed obstacle map to assure no obstacles will be hit during our global navigation

In [10]:
def _get_movements():
    """
    Get all possible 8-connectivity movements. Equivalent to get_movements_in_radius(1).
    :return: list of movements with cost [(dx, dy, movement_cost)]
    """
    s2 = math.sqrt(2)
    return [(1, 0, 1.0),
            (0, 1, 1.0),
            (-1, 0, 1.0),
            (0, -1, 1.0),
            (1, 1, s2),
            (-1, 1, s2),
            (-1, -1, s2),
            (1, -1, s2)]

def reconstruct_path(cameFrom, current):
    """
    Recurrently reconstructs the path from start node to the current node
    :param cameFrom: map (dictionary) containing for each node n the node immediately 
                     preceding it on the cheapest path from start to n 
                     currently known.
    :param current: current node (x, y)
    :return: list of nodes from start to current node
    """
    total_path = [current]
    while current in cameFrom.keys():
        # Add where the current node came from to the start of the list
        total_path.insert(0, cameFrom[current]) 
        current=cameFrom[current]
    return total_path

def A_Star(start, goal, h, coords, occupancy_grid, grid_size=grid_size):
    """
    A* for 2D occupancy grid. Finds a path from start to goal.
    h is the heuristic function. h(n) estimates the cost to reach goal from node n.
    :param start: start node (x, y)
    :param goal_m: goal node (x, y)
    :param occupancy_grid: the grid map
    :return: a tuple that contains: (the resulting path in meters, the resulting path in data array indices)
    """
    for point in [start, goal]:
        for coord in point:
            assert coord>=0 and coord<grid_size, "start or end goal not contained in the map"
            
    if occupancy_grid[start[0], start[1]]:
        raise Exception('Start node is not traversable')
    if occupancy_grid[goal[0], goal[1]]:
        raise Exception('Goal node is not traversable')
        
    movements = _get_movements()
    openSet = [start]
    closedSet = []
    cameFrom = dict()
    gScore = dict(zip(coords, [np.inf for x in range(len(coords))]))
    gScore[start] = 0
    fScore = dict(zip(coords, [np.inf for x in range(len(coords))]))
    fScore[start] = h[start]

    while openSet != []:      
        fScore_openSet = {key:val for (key,val) in fScore.items() if key in openSet}
        current = min(fScore_openSet, key=fScore_openSet.get)
        del fScore_openSet
        
        if current == goal:
            return reconstruct_path(cameFrom, current), closedSet

        openSet.remove(current)
        closedSet.append(current)
        
        for dx, dy, deltacost in movements:       
            neighbor = (current[0]+dx, current[1]+dy)
            if (neighbor[0] >= occupancy_grid.shape[0]) or (neighbor[1] >= occupancy_grid.shape[1]) or (neighbor[0] < 0) or (neighbor[1] < 0):
                continue
            if (occupancy_grid[neighbor[0], neighbor[1]]) or (neighbor in closedSet): 
                continue            
            tentative_gScore = gScore[current] + deltacost
            if neighbor not in openSet:
                openSet.append(neighbor)
            if tentative_gScore < gScore[neighbor]:
                cameFrom[neighbor] = current
                gScore[neighbor] = tentative_gScore
                fScore[neighbor] = gScore[neighbor] + h[neighbor]
                
    print("No path found to goal")
    return [], closedSet


In [11]:
# List of all coordinates in the grid
x,y = np.mgrid[0:grid_size:1, 0:grid_size:1]
pos = np.empty(x.shape + (2,))
pos[:, :, 0] = x; pos[:, :, 1] = y
pos = np.reshape(pos, (x.shape[0]*x.shape[1], 2))
coords = list([(int(x[0]), int(x[1])) for x in pos])

# Define the heuristic, here = distance to goal ignoring obstacles
h = np.linalg.norm(pos - goal, axis=-1)
h = dict(zip(coords, h))

# Run the A* algorithm
pathCoords, visitedNodes = A_Star(start, goal, h, coords, occupancy_grid)
path = np.array(pathCoords).reshape(-1, 2).transpose()
visitedNodes = np.array(visitedNodes).reshape(-1, 2).transpose()

# Displaying the map
fig_astar, ax_astar = create_empty_plot()
ax_astar.imshow(obstacle_grid.transpose(), cmap=cmap)

# Plot the best path found and the list of visited nodes
ax_astar.plot(path[0], path[1], marker="o", color = 'steelblue');
ax_astar.scatter(start[0], start[1], marker="o", color = 'aquamarine', s=2300);
ax_astar.scatter(goal[0], goal[1], marker="o", color = 'pink', s=2300);

<IPython.core.display.Javascript object>

# 3 Global Navigation

## 3.1 Translating Path to Movement

### 3.1.1 Functions

In [46]:
# GLOBAL VARIABLES
SPEEDR = 200
SPEEDL = 203
TIME_UNIT_SQUARE = 0.3
TIME_UNIT_TURN = 0.569

def stop():
    """
    Stops the robot motors
    """
    th.set_var_array("leds.top", [255, 0, 0])
    th.set_var_array("leds.circle", [0, 0, 0, 0, 0, 0, 0, 0])
    th.set_var("motor.right.target", 0)
    th.set_var("motor.left.target", 0)

def goForwards():
    """
    Starts the robot motors to go straight
    """
    th.set_var_array("leds.circle", [255, 0, 0, 0, 0, 0, 0, 0])
    th.set_var_array("leds.top", [0, 255, 0])
    th.set_var("motor.right.target", SPEEDR)
    th.set_var("motor.left.target", SPEEDL)
    
def turnRight():
    """
    Rotates the robot 45 degrees to the right and sets the motors to go straight
    """
    th.set_var_array("leds.circle", [0, 255, 0, 0, 0, 0, 0, 0])
    th.set_var_array("leds.top", [0, 255, 0])
    th.set_var("motor.right.target", 2**16-SPEEDR)
    th.set_var("motor.left.target", SPEEDL)
    time.sleep(TIME_UNIT_TURN)
    th.set_var("motor.right.target", 0)
    th.set_var("motor.left.target", 0)
        
def turnLeft():
    """
    Rotates the robot 45 degrees to the left and sets the motors to go straight
    """
    th.set_var_array("leds.circle", [0, 0, 0, 0, 0, 0, 0, 255])
    th.set_var_array("leds.top", [0, 255, 0])
    th.set_var("motor.right.target", SPEEDR)
    th.set_var("motor.left.target", 2**16-SPEEDL)
    time.sleep(TIME_UNIT_TURN)
    th.set_var("motor.right.target", 0)
    th.set_var("motor.left.target", 0)

def pathToMovement(path):
    """
    Takes a list of path coordinates and converts it into an array of movements
    :path:   List[(x,y), ... (x,y)]
    :return: Array[ [dx dy] ... [dx dy] ]
    """
    movArray = np.empty((len(path)-1,2))
    for i in range(len(pathCoords)):
        if (i != len(pathCoords)-1):
            movArray[i][0] = path[i+1][0] - path[i][0]
            movArray[i][1] = path[i+1][1] - path[i][1]         
    return movArray

def movementToDirection(mov):
    """
    Takes an array of movements and converts it to an array of directions:
        - 0: go straight
        - 1: turn right
        - 2: turn left
    :mov:    Array[ [dx dy] ...  [dx dy] ]
    :return: Array[ d1 d2 ... dn ]
    """
    dirArray = np.empty(len(mov))
    for i in range(len(mov)):
        if (i == 0):
            dirArray[i] = 0
        else:
            if ((mov[i][0] == mov[i-1][0]) and (mov[i][1] == mov[i-1][1])):
                dirArray[i] = 0
            elif ((mov[i][0] != mov[i-1][0]) and (mov[i][1] == mov[i-1][1])):
                if ((mov[i][0]-mov[i-1][0]) == 1):
                    dirArray[i] = 1
                elif ((mov[i][0]-mov[i-1][0]) == -1):
                    dirArray[i] = 2
            elif ((mov[i][0] == mov[i-1][0]) and (mov[i][1] != mov[i-1][1])):
                if ((mov[i][1]-mov[i-1][1]) == 1):
                    dirArray[i] = 1
                elif ((mov[i][1]-mov[i-1][1]) == -1):
                    dirArray[i] = 2                  
    return dirArray

def globalNavigation(movement, direction):
    """
    Takes in a movement and direction and applies the specific motor instruction:
    :movement:  [dx dy]
    :direction: [d]
    """
    if direction == 0:
        goForwards()
    elif direction == 1:
        turnRight()
        goForwards()
    elif direction == 2:
        turnLeft()
        goForwards()
    time.sleep(math.sqrt(movement[0]**2+movement[1]**2)*TIME_UNIT_SQUARE)
    
def updateCoords(currCoords, prevCoords, movement):
    """
    Takes in a movement, the robots current position coordinates and an array of its previous coordinates.
    It then updates the current coordinates according to the movement, and updates the previous ones:
    :currCoords:  [x y]
    :prevCoords: [ [x y] ... [x y]]
    :movement: [dx dy]
    """
    newPrevCoords = np.empty((len(prevCoords)+1,2))
    newCurrCoords = np.empty(2)
    for i in range(len(prevCoords)+1):
        if i == (len(prevCoords)):
            newPrevCoords[i] = currCoords
        else:
            newPrevCoords[i] = prevCoords[i]
    newCurrCoords[0] = currCoords[0]+movement[0]
    newCurrCoords[1] = currCoords[1]+movement[1]
    return newCurrCoords, newPrevCoords
    

In [47]:
movementArray = pathToMovement(pathCoords)
directionArray = movementToDirection(movementArray)
currentCoords = np.array(pathCoords[0])
previousCoords = np.empty(0)
count = 0
state = 0

# Face the right Way
if movementArray[0][0] == -1 and movementArray[0][1] == 1:
    turnLeft()
elif movementArray[0][0] == 1 and movementArray[0][1] == 1:
    turnRight()
elif movementArray[0][0] == -1 and movementArray[0][1] == 0:
    turnLeft()
    turnLeft()
elif movementArray[0][0] == 1 and movementArray[0][1] == 0:
    turnRight()
    turnRight()  

while True:
    # CHECK IF NOT FALLING OFF EDGE OR LIFTED UP
    if (th["prox.ground.reflected"][0] > 50) or (th["prox.ground.reflected"][0] > 50):
        
        # CHECK PROX VALUES FOR LOCAL OBSTACLE TO CHANGE STATE
        if (th["prox.horizontal"][0] > 2500) or (th["prox.horizontal"][2] > 2500) or (th["prox.horizontal"][4] > 2500):
            state = 1
        else:
            state = 0

        # STATE MACHINE
        if (state == 0):
            if (count < len(directionArray)):
                globalNavigation(movementArray[count], directionArray[count])
                currentCoords, previousCoords = updateCoords(currentCoords, previousCoords, movementArray[count])
                count += 1
            else:
                break
                
        if (state == 1):
            stop()
    else:
        stop()
    
    # REFRESH RATE OF 10ms
    time.sleep(0.01)
    
stop()


# 4 Local Navigation

In [35]:
stop()