In [4]:
%pip install -q torch numpy matplotlib pillow torchvision torchsummary torchsummaryX plotly gtsam shapely tqdm

Note: you may need to restart the kernel to use updated packages.


In [5]:
import os
import json
import time
import torch
import torch.nn as nn
import torchvision.datasets as datasets
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.autograd import Variable
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import torch.nn.functional as F
# %pip install torchsummaryX --quiet
from torchsummaryX import summary as summaryX
from torchsummary import summary
import random
from shapely.geometry import Point
from shapely.geometry.polygon import Polygon

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [52]:
# importing necessary modules for factor graph optimization and visualization
#UNCOMM
%pip install -q plotly gtsam shapely
import gtsam
import numpy as np
import plotly.graph_objects as go
from tqdm import trange
import torch
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import os
import math
# import anki_vector
# from anki_vector.util import Pose, degrees
#curl https://hackgtstoragebucket.s3.amazonaws.com/utils.py > utils.py


from utils import *

########################## load dataset ##########################
test_dataset = datasets.KMNIST(root='dataset/', train=False, transform=transforms.Compose([transforms.Pad(2), transforms.ToTensor()]), download=True)

########################## environment helpers ##########################

def rect(x_m, y_m, xsize_m, ysize_m, rgba=None):
    return dict(
        type='rect',
        x0=x_m, y0=y_m,
        x1=x_m + xsize_m, y1=y_m + ysize_m,
        fillcolor=f"rgba({rgba[0]}, {rgba[1]}, {rgba[2]}, {rgba[3]})" if rgba else "rgb(255, 255, 255)",
        line_width=0,
        layer="below"
    )


class Wall:
    def __init__(self, start_point, end_point, color):
        self.ps = start_point
        self.pe = end_point
        self.color = color

    def get_rect(self):
        return rect(self.ps[0], self.ps[1], self.pe[0]-self.ps[0], self.pe[1]-self.ps[1], self.color)


def env_walls():
    '''
    Returns a list of obstacles in the map. 
    Each Wall class represents a rectangular obstacle in the map. 
    '''
    white = [255, 255, 255, 255]
    gray = [150, 150, 150, 255]
    wall_list = [Wall([0,-0.5], [16, 0], white),
                 Wall([0, 9], [16, 9.5], white),
                 Wall([-0.5, -0.5], [0, 9.5], white),
                 Wall([16, -0.5], [16.5, 9.5], white),
                 Wall([0, 4.25], [3, 4.75], gray),
                 Wall([5, 4.25], [11, 4.75], gray),
                 Wall([13, 4.25], [16, 4.75], gray),
                 Wall([7.75, 1.75], [8.25, 7.25], gray),
                 Wall([0, 0], [3, 1.75], gray),
                 Wall([13, 0], [16, 1.75], gray),
                 Wall([0, 7.25], [3, 9], gray),
                 Wall([13, 7.25], [16, 9], gray),
                 Wall([5, 7.25], [11, 7.75], gray),
                 Wall([5, 1.25], [11, 1.75], gray)]
    return wall_list



Note: you may need to restart the kernel to use updated packages.


In [51]:
class Environment:
    """ Simulation environment class.
    Initializes the room size, images, and obstacles. 
    """
    def __init__(self):
        self.room_size = (16, 9)
        self.wall_list = env_walls()
        self.image_list=[]
env = Environment()

visualize(env)

In [8]:
class Node:
    def __init__(self, point, parent=None):
        super(Node, self).__init__()
        self.point = point
        self.parent = None

    @property
    def x(self):
       return self.point[0]

    @property
    def y(self):
       return self.point[1]


def dist(node1, node2):
    return np.linalg.norm(node2.point - node1.point) 


def inObstacle(node, env):
    x = node.point[0]
    y = node.point[1]
    for i in range(0, len(env.wall_list)): # WARINING POLYGON AND MATH WERE IMPORTED ABOVE
        LB = env.wall_list[i].ps
        RU = env.wall_list[i].pe
        LU = np.array([env.wall_list[i].ps[0], env.wall_list[i].pe[1]])
        RB = np.array([env.wall_list[i].pe[0], env.wall_list[i].ps[1]])
        # print(type(LB[0]))
        rect = Polygon([LB, LU, RU, RB])
        if rect.contains(Point(x,y)):
            return True
    return False
        
        # if LB[0]-0.5 <= x and x <= RU[0]+0.5:
        #     if LB[1]-0.5 <= y and y <= RB[0]+0.5:
        #         return True
        
    return False

def randomSample(goal_pos, env):
    chance = random.uniform(0.0, 10.0)
    if chance <= 1.0:
        return Node(goal_pos)

    rand_point = Node(np.array([random.uniform(0, 16.0), random.uniform(0, 9.0)]))
    leash = 1000
    in_obs = inObstacle(rand_point, env)
    while in_obs:
        rand_point = Node(np.array([random.uniform(0, 16.0), random.uniform(0, 9.0)]))
        in_obs = inObstacle(rand_point, env)
        if leash == 1:
            return Node(np.array([0.0,0.0]))
        leash-=1
        
    return rand_point


def getNearest(rrt_tree, random_point):
    min = math.inf
    min_idx=-1
    for i in range(0, len(rrt_tree)-1):
        curr_dist = dist(rrt_tree[i], random_point)
        if curr_dist < min:
            min = curr_dist
            min_idx=i

    return rrt_tree[min_idx]

def stepTo( nearest_point,random_point,step=1.):
    
    if dist(nearest_point,random_point) < step:
        return random_point
    angle = math.atan2(random_point.point[1]-nearest_point.point[1], random_point.point[0]-nearest_point.point[0])

    x_point = math.cos(angle) * step
    y_point = math.sin(angle) * step  
    x_point= round(x_point,3)
    y_point= round(y_point,3)    
    vec= np.array([x_point+nearest_point.point[0], y_point+nearest_point.point[1]])
    #debug = "angle: " + str(math.radians(angle)) + " x_point: " + str(x_point) +" y_point: "+ str(y_point)
    #return (Node(vec),debug)
    return Node(vec)


def isCollision(line, env):       
    for i in range(0, len(env.wall_list)):
        LB = env.wall_list[i].ps
        RU = env.wall_list[i].pe
        LU = np.array([env.wall_list[i].ps[0], env.wall_list[i].pe[1]])
        RB = np.array([env.wall_list[i].pe[0], env.wall_list[i].ps[1]])

        if line_intersection(LU, RU, line[0].point, line[1].point):
            return True   
        if line_intersection(RU, RB, line[0].point, line[1].point):
            return True
        if line_intersection(LB, RB, line[0].point, line[1].point):
            return True
        if line_intersection(LU,LB, line[0].point, line[1].point):
            return True

    return False
  

def withinTolerance(point, goal, tolerance=1.5):
    distance = dist(point, goal)
    if tolerance >= abs(distance):
        return True
    return False 


In [9]:
res= inObstacle(Node(np.array([4,3])), env)
print(res)

False


In [10]:
def RRT(start_pos, goal_pos, env):
    
    root = Node(start_pos)
    goal_node = Node(goal_pos)
    rrt_tree = [root]
    soln_node = None
    path=None
    path=[]
    dummy=0
    hasReached = False
    while hasReached == False:
        randy = randomSample(goal_pos, env)
        nearest = getNearest(rrt_tree, randy)
        node = stepTo(nearest, randy)
        if isCollision((nearest, node), env):
            continue
        if inObstacle(node, env):
            continue
        node.parent= nearest
        rrt_tree.append(node)

        hasReached = withinTolerance(node, goal_node)
        if hasReached == True:
            soln_node = node

    dummy = soln_node
    while dummy != None:
        path.append(dummy)
        dummy = dummy.parent
    path.reverse()

    return path, rrt_tree

In [11]:
def smooth_path(path, env):
    new_path = [path[0]]
    for i in range(1, len(path)-1):
        if isCollision((path[i-1], path[i+1]), env):
            new_path.append(path[i])
    new_path.append(path[-1])
    return new_path

In [12]:
start_pos = np.array([4,1])
goal_pos = np.array([13,6])
path, rrt_tree = RRT(start_pos, goal_pos, env)
# path = smooth_path(path, env)


invalid value encountered in intersection



In [13]:

start_pos = np.array([4,1])
goal_pos = np.array([13,6])
path, rrt_tree = RRT(start_pos, goal_pos, env)
# path = smooth_path(path, env)
visualize_tree(env, rrt_tree)



In [14]:
visualize_path(env,path)


In [15]:
# def smoothPath(path, env, iters=10):
#     return path

In [16]:
def ddr_ik(v_x, omega, L=0.5, r=0.1):
    return (v_x - (L/2)*omega)/r, (v_x + (L/2)*omega)/r


def rotate(init_pose, point, omega=2):

    angle = np.arctan2(point[1]- init_pose.y(), point[0]- init_pose.x())
    if angle < 0:
        angle +=  2*np.pi
    v_L, v_R = ddr_ik(0, omega)
    t = abs(angle /omega)

    return v_L,v_R, t

  
def rotateRobot(init_pose, v_L, v_R, t, L=0.5, r=0.1):
    init_theta = init_pose.theta()
    angle_turned = 0
    poses = []
    N = 1
    for _ in range(N):
        angle_turned += (v_R-v_L)*t*r/(L*N) % (2*math.pi)
        poses.append(gtsam.Pose2(init_pose.x(),init_pose.y(), angle_turned))

    return poses

def forward(init_pose, point, V=1):
    distance = np.linalg.norm([point[0]- init_pose.x(), point[1]- init_pose.y()])
    v_L,v_R = ddr_ik(V, 0)
    translation = np.array([init_pose.x()-point[0], init_pose.y()-point[1]])
    t = distance / V
    return v_L, v_R, t


def forwardRobot(init_pose, v_L, v_R, t, L=0.5, r=0.1):
    '''
    Generates intermediate robot poses based on wheel velocities, useful for visualization
    
    v_L (float): Left wheel velocity
    v_R (float): Right wheel velocity
    t (float): time duration for which the robot should use these velocities
    L (float): Distance between robot wheels
    r (float): Wheel radius of robot
    '''
    x = init_pose.x()
    y = init_pose.y()
    poses = []
    N = 5

    for i in range(N):
        x += v_L*math.cos(init_pose.theta())*t*r/N
        y += v_L*math.sin(init_pose.theta())*t*r/N

        poses.append(gtsam.Pose2(x,y,init_pose.theta()))

    return poses
    

def moveRobot(init_pose: gtsam.Pose2, path):
    '''
    move robot along given path
    init_pose (gtsam.Pose2): Starting pose (x,y,theta) of the robot
    path: path found from RRT

    return: list of poses along the given path
    '''
    poses = [init_pose]
    
    for i in range(1, len(path)):
        curr_pose = poses[-1]
        next_x, next_y = path[i].point
        v_L, v_R, t = rotate(curr_pose, [next_x, next_y])
        poses += rotateRobot(curr_pose, v_L, v_R, t)
        v_L,v_R, t = forward(poses[-1], [next_x, next_y])
        poses+= forwardRobot(poses[-1], v_L,v_R, t)

    return poses
       

In [17]:
poses=moveRobot(gtsam.Pose2(4, 1, np.pi/2), path)
# print(poses[0])
data =[]
for i in range(0,len(poses)):
    data.append((poses[i].x(),poses[i].y(),poses[i].theta()))


print(len(data))

with open('data.json', 'w') as outfile:
    json.dump(data, outfile)


115


Test robot to move to a designated goal position. 

In [18]:
# def move_robot(robot, length):
#     amt_loops = (length* 254) // 200
#     for i in range(0, amt_loops):
#         robot.behavior.drive_straight(distance_mm(200), speed_mmps(100))
#     robot.behavior.drive_straight(distance_mm((length* 254) % 200), speed_mmps(100))
    
    
    

In [19]:

# saved_pos = dict([(0, [7.75,3]), (1, [0,6]), (2, [7.75,6]), (3, [4,9]), 
#                 (4, [12,9]), (5, [8.25,6]), (6, [16,6]), (7, [8.25,3]), 
#                 (8, [16,3]), (9, [4,1])])
# poses=moveRobot(gtsam.Pose2(4, 1, np.pi/2), path)
# # starting_pose = gtsam.Pose2(4, 1, np.pi/2)
# goal_node = np.array([0,3])

# args = anki_vector.util.parse_command_args()
# with anki_vector.Robot(args.serial) as robot:
#     # curr_pose = starting_pose
#     for i in range(0,len(poses)-1):
        
#         robot.behavior.turn_in_place((poses[i][2]))
#         move_robot(robot, abs(dist(Node(np.array([poses[i][0],poses[i][1]])) ,Node(np.array([poses[i+1][0],poses[i+1][1]])) )))

        

In [20]:
# args = anki_vector.util.parse_command_args()
# with anki_vector.Robot(args.serial) as robot:
#     robot.behavior.say_text("hello")

In [21]:
# ### Comment for autograder - do not delete!
# '''
# Code to run RRT multiple times based on the coordinates obtained from the detected images. 

# Given
# -----
# saved_pos: (dict) Contains image labels as keys and the coordinates of the next image position as values.
# starting_pose: (gtsam.Pose2) Robot initial pose in coordinate [4,1]
# goal_node: (np.array) coordinate of the initial goal position of image 0.

# TODO
# ----
# 1. Navigate to the goal position using RRT
# 2. When the robot is close enough to the image, (i.e. final node in the found path) 
# sample the image and use the image classifier to get the predicted label. 
# 3. Get the new goal position from saved_pos using the predicted label. 
# 4. Repeat 1-3 until the robot moves though all images. 
# '''
# # Dictionary of image labels as keys and the next image coordinates as values
# saved_pos = dict([(0, [7.75,3]), (1, [0,6]), (2, [7.75,6]), (3, [4,9]), 
#                   (4, [12,9]), (5, [8.25,6]), (6, [16,6]), (7, [8.25,3]), 
#                   (8, [16,3]), (9, [4,1])])

# # saved_pos = dict([(0, [7.75,3]), (1, [0,6])])

# starting_pose = gtsam.Pose2(4, 1, np.pi/2)
# goal_node = np.array([0,3])
# # TODO 14
# ###############################################################################
# #                             START OF YOUR CODE                              #
# ###############################################################################
# poses_list=[]
# temp = None
# for i in range(0, len(saved_pos)+1):
#     path, rrt_tree = RRT(np.array([starting_pose.x(), starting_pose.y()]), goal_node, env)
#     poses_list.extend(moveRobot(starting_pose, path))
#     for img in env.image_list:
#         if img.location == list(goal_node):
#             print("image ",img.image_label)
#             temp = img
#     label = image_classifier.classify_image(temp.sample())
#     goal_node = np.array(saved_pos[label])
#     print("label ",label)
#     starting_pose = poses_list[-1]


# poses = poses_list
# ###############################################################################
# #                              END OF YOUR CODE                               #
# ###############################################################################
# ### Comment for autograder end - do not delete!