In [None]:
import warnings
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

from synthesize.synthesize import placeRandomRooms, synthesize_train_set, synthetic_sensor
from utils.planning import  get_path, plot_path
from utils.processing import create_classification_problem
from utils.plotting import plot_path_with_lines

from sklearn import preprocessing
from sklearn.ensemble import AdaBoostClassifier, RandomForestClassifier
import random
from copy import deepcopy
import math
from shapely.geometry import LineString
from astar.gridmap import OccupancyGridMap
from shapely.geometry import MultiLineString, LineString
from copy import deepcopy

%matplotlib inline
warnings.filterwarnings('ignore')

In [None]:
class Args:
    '''
    Simple class that controls the hyperparameters of our system. 
    
    @params
    robot_type-- Use 'omni' for an omnidirectional robot and 'ddr' for a differential drive robot
    map_size-- Currently needs to be fixed at (50,50). Will update for variable sized map soon. 
    num_train_maps-- How many random maps to train on
    num_train_paths-- How many paths to train on
    use_polar-- Flag to switch from L1 distance to polar distance to the goal
    num_sensor_readings-- How many samples to be returned by the synthetic sensor. 
                          These samples will be evenly space around the robot.
    movement-- Use '4N' for 4-point connectivity and '8N' for 8-point connectivity. 
               Note: Current implementation only supports ML path planning for '4N'
    dirs-- The mapping between class label and directional movement. 
    inv_dirs-- Inverse of dirs mapping used for test path generation.
    steer-- Encoding directional output realtive to odom. Only used when robot_type = 'ddr'
    img_root-- Folder to save gif of output
    '''
    robot_type = 'ddr'
    map_size = (50,50)
    num_train_maps = 10
    num_train_paths = 100
    use_polar = True 
    num_sensor_readings = 8
    movement = '4N'
    dirs = {0:(1.0, 0.0), 1:(0.0, 1.0), 2:(-1.0, 0.0), 3:(0.0, -1.0)}
    inv_dirs = {d: label for label, d in dirs.items()}
    enc = preprocessing.LabelEncoder()
    steer = {0:0, 1:np.pi/2., -1:-np.pi/2, 2:np.pi, -2:np.pi, 3:-np.pi/2, -3:np.pi/2}
    img_root = 'gifs'
    
args=Args()

In [None]:
def test_model(MAP, map_np, start, goal, model, args):
    '''
    Test model performance on a new map. 
    
    @params
    MAP-- Shapely map of test environment
    map_np-- Numpy array of map environment. Obstacle cells = 1, free space = 0
    start-- Starting position in path
    goal-- Goal position. 
    model-- Trained sklearn model. 
    args-- System hyperparameter class. Must be of type Args
    '''
    
    # Get ground truth path for comparison. 
    groud_truth = get_path(start, goal, map_np, plot=False)
    pred_path = [start]
    cur_dir = (0, 0)
    cur = start
    directions = []
    while cur != goal:
        # Get sensor data at current node. 
        laser_scan, lines = synthetic_sensor(MAP, (cur[0]+0.5, cur[1]+0.5), direction=cur_dir, args = args)
        # offset is the rotation angle in radians from starting position. 
        offset = 0
        if cur_dir != (0, 0): # Start node 
            offset = args.inv_dirs[cur_dir]
        rot = np.pi/2 * offset 
        # Get goal in map frame 
        goal_loc = (goal[0]-cur[0], goal[1]-cur[1])  
        # Get goal in odom frame 
        goal_orn = (goal_loc[0]*np.cos(rot) + goal_loc[1]*np.sin(rot), goal_loc[0]*-np.sin(rot) + goal_loc[1]*np.cos(rot))
        if args.robot_type == 'omni':
            laser_scan.append(goal_loc[0])
            laser_scan.append(goal_loc[1])
        elif args.robot_type == 'ddr':
            if args.use_polar:
                polar_distance = np.linalg.norm(np.array([goal_loc[0],goal_loc[1]]))
                polar_rotation = math.atan2(goal_orn[1], goal_orn[0])
                laser_scan.append(polar_distance)
                laser_scan.append(polar_rotation)
            else:
                laser_scan.append(goal_orn[0])
                laser_scan.append(goal_orn[1])

        # Create model input
        inpX = np.array(laser_scan)
        inds = model.predict_proba(inpX.reshape(1,-1))[0]
        # Extract and sort best cells to travel to next
        best = list(np.argsort(inds))  
        best.reverse()

        # Get all possible next cells ordered by RF prediction probability. 
        possible_dirs = [eval(args.enc.inverse_transform(best)[ii]) for ii in range(len(best))]
        if args.robot_type == 'omni':
            possible_next_states = [(cur[0] + d[0], cur[1] + d[1]) for d in possible_dirs]
        elif args.robot_type == 'ddr':
            possible_steers = [(np.cos(rot + args.steer[ind]), np.sin(rot + args.steer[ind])) for ind in possible_dirs]
            possible_next_states = [(cur[0] + steers[0], cur[1] + steers[1]) for steers in possible_steers]

        # Remove cells that we have already visited. 
        temp_states = deepcopy(possible_next_states)
        for state in possible_next_states:
            if (state in pred_path):
                temp_states.remove(state) 

        # Save directions and current state
        directions.append(cur_dir)
        cur_dir = (temp_states[0][0] - cur[0], temp_states[0][1] - cur[1])
        cur = temp_states[0]
        assert cur not in pred_path
        pred_path.append(cur)
        # Once goal is found, make sure the path doesnt 
        # intersect with any obstacles. If it does, we failed. 
        if cur == goal:
            temp = [(p[0]+0.5, p[1]+0.5) for p in pred_path]
            if LineString(temp).intersects(MAP):
                success = 0
            else:
                success = 1
            break
 
    return pred_path, directions, success, groud_truth

In [None]:
# Get training maps
MAPS = []
for i in range(args.num_train_maps):
    train_arr, train_MAP = placeRandomRooms(args.map_size, minRoomSize=3, maxRoomSize=15, roomStep = 1, margin = 4, attempts = 100)
    MAPS.append((train_MAP,train_arr))

# Synthesize dataset
df_ = synthesize_train_set(MAPS, args)
# Process data
df = create_classification_problem(df_.copy(), args)

# Fit model using all data.
clf = RandomForestClassifier(n_estimators=200)
clf.fit(df.drop(['out'], axis=1).values, df['out'].values)

In [None]:
pred_len, astar_len = [], []
astar_len = []
successes = 0
n_test = 0

print("Begin path planning")
while True:
    # Get random path
    min_node, max_node = 1, args.map_size[1]-1
    start = (random.randint(min_node, max_node), random.randint(min_node, max_node))
    goal = (random.randint(min_node, max_node), random.randint(min_node, max_node))
    # Get random map
    arr, test_MAP = placeRandomRooms(args.map_size, minRoomSize=3, maxRoomSize=15, roomStep = 1, margin = 4, attempts = 100)
    # Code in try block fails when no path is avaiable on given map.
    try:
        # Perform ML-based path planning
        pred_path, directions, success, ground_truth = test_model(test_MAP, arr, start, goal, clf, args)
        break
    except:
        plt.close()
        print("No path exists... trying again")
        continue

print("\nPath Found")     
if success == 1:
    print("Safe path found!")
else:
    print("Invalid path computed :(")
print("Plotting A* Path")
get_path(start, goal, arr, plot=True) 
print("Plotting Predicted Path")
OccupancyGridMap(arr, 1).plot()
plot_path(pred_path)
            

In [None]:
from IPython.display import Image
plot_path_with_lines(pred_path[:-1], test_MAP, directions, args)
Image(args.img_root+'/out.gif')