In [1]:
%load_ext autoreload
%autoreload 2

import numpy as np
import time
import pinocchio as pin
from pinocchio.visualize import MeshcatVisualizer
from meshcat.transformations import translation_matrix, rotation_matrix
from meshcat.geometry import Box, MeshPhongMaterial, Cylinder
from scipy.optimize import minimize

### Visualization

In [2]:
# specify a path the the urdf files and meshes
urdf_model_path = "diffdrive.urdf"
mesh_dir = ""

# load the robot using pinocchio
robot = pin.RobotWrapper.BuildFromURDF(urdf_model_path, mesh_dir)

# vizualize the robot using meshcat
viz = MeshcatVisualizer(robot.model, robot.collision_model, robot.visual_model)
viz.initViewer(loadModel=True)

def show_robot(x,y,theta):
    quat = pin.Quaternion(pin.utils.rotate('z', theta)).coeffs()
    pos = np.array([x,y,0.1])

    viz.display(np.append(pos,quat))

You can open the visualizer by visiting the following URL:
http://127.0.0.1:7000/static/


In [3]:
# Add a floor
# Add floor material
material_floor = MeshPhongMaterial()
material_floor.color = int(200) * 256**2 + int(200) * 256 + int(200)
# Add a floor
viz.viewer["/Floor"].set_object(
    Box([10, 10, 0.01]),
    material_floor
)
viz.viewer["/Floor"].set_transform(
    translation_matrix([0, 0, -0.005])
)

# Add obstacle material
material_obstacle = MeshPhongMaterial()
material_obstacle.color = int(100) * 256**2 + int(100) * 256 + int(100)

# Randomly generate 30 obstacle positions within a defined range
np.random.seed(6) #6
obstacle_positions = [
    np.array([np.random.uniform(-4.8, 4.8), np.random.uniform(-4.8, 4.8), 0.5])
    for _ in range(10)
]

# add cylinders for each obstacle
for i, pos in enumerate(obstacle_positions):
    viz.viewer[f"/Obstacle_{i}"].set_object(
        Cylinder(1, 0.2), material_obstacle
    )
    T_world_obs = translation_matrix(pos)
    T_world_obs[:3, :3] = pin.utils.rotate('x', np.pi / 2)
    viz.viewer[f"/Obstacle_{i}"].set_transform(
        T_world_obs
    )
for i, pos in enumerate(obstacle_positions[::2]):
    # Connect a wall (box) between the two obstacles
    wall_length = np.linalg.norm(obstacle_positions[2*i][:2] - obstacle_positions[2*i+1][:2])
    wall_width = 0.4
    wall_height = 1.0
    wall_material = MeshPhongMaterial()
    wall_material.color = int(100) * 256**2 + int(100) * 256 + int(100)

    wall_position = (obstacle_positions[2*i][:2] + obstacle_positions[2*i+1][:2]) / 2

    viz.viewer[f"/Wall_Obstacle_{i}"].set_object(
        Box([wall_length, wall_width, wall_height]), wall_material
    )
    # Set the wall rotation to align with the line between the two obstacles
    angle = np.arctan2(
        obstacle_positions[2*i+1][1] - obstacle_positions[2*i][1],
        obstacle_positions[2*i+1][0] - obstacle_positions[2*i][0]
    )
    viz.viewer[f"/Wall_Obstacle_{i}"].set_transform(
        translation_matrix([wall_position[0], wall_position[1], wall_height / 2]) @
        rotation_matrix(angle, [0, 0, 1])
    )

# Add walls around the floor
wall_thickness = 0.1
wall_height = 1.0

# Left wall
viz.viewer["/Wall_Left"].set_object(Box([wall_thickness, 10, wall_height]))
viz.viewer["/Wall_Left"].set_transform(
    translation_matrix([-5 - wall_thickness / 2, 0, wall_height / 2])
)

# Right wall
viz.viewer["/Wall_Right"].set_object(Box([wall_thickness, 10, wall_height]))
viz.viewer["/Wall_Right"].set_transform(
    translation_matrix([5 + wall_thickness / 2, 0, wall_height / 2])
)

# Front wall
viz.viewer["/Wall_Front"].set_object(Box([10, wall_thickness, wall_height]))
viz.viewer["/Wall_Front"].set_transform(
    translation_matrix([0, 5 + wall_thickness / 2, wall_height / 2])
)

# Back wall
viz.viewer["/Wall_Back"].set_object(Box([10, wall_thickness, wall_height]))
viz.viewer["/Wall_Back"].set_transform(
    translation_matrix([0, -5 - wall_thickness / 2, wall_height / 2])
)

# can you add a tower in each corner of the walls?
tower_height = 1.5
tower_radius = 0.3
tower_material = MeshPhongMaterial()
tower_material.color = int(100) * 256**2 + int(100) * 256 + int(100)

tower_positions = [
    np.array([-5 - wall_thickness / 2, 5 + wall_thickness / 2, tower_height / 2]),
    np.array([5 + wall_thickness / 2, 5 + wall_thickness / 2, tower_height / 2]),
    np.array([-5 - wall_thickness / 2, -5 - wall_thickness / 2, tower_height / 2]),
    np.array([5 + wall_thickness / 2, -5 - wall_thickness / 2, tower_height / 2])
]

for i, pos in enumerate(tower_positions):
    viz.viewer[f"/Tower_{i}"].set_object(
        Cylinder(tower_height, tower_radius), tower_material
    )
    T_world_tower = translation_matrix(pos)
    T_world_tower[:3, :3] = pin.utils.rotate('x', np.pi / 2)
    viz.viewer[f"/Tower_{i}"].set_transform(
        T_world_tower
    )

In [4]:
x,y,theta = 0,0,0
# grid is:
# x [-5, 5]
# y [-5, 5]
# theta [0, 2*pi]
show_robot(x,y,theta)

### Simulation

In [5]:
# Define robot dynamics
def discrete_dynamics(state, control_input, dt, model_mismatch=False):
    """
    Update the robot's state based on its dynamics.

    Parameters:
    - state: Current state [x, y, theta]
    - control_input: Control input [linear_velocity, angular_velocity]
    - dt: Time step

    Returns:
    - Updated state [x, y, theta]
    """
    x, y, theta = state
    linear_velocity, angular_velocity = control_input
    if model_mismatch:
        # Introduce model mismatch by adding noise to the control input
        linear_velocity += np.random.normal(0, 1.0)
        angular_velocity += np.random.normal(0, 1.0)

    # Update state using differential drive kinematics
    x += linear_velocity * np.cos(theta) * dt
    y += linear_velocity * np.sin(theta) * dt
    theta += angular_velocity * dt

    # Normalize theta to keep it within [-pi, pi]
    theta = np.arctan2(np.sin(theta), np.cos(theta))

    return np.array([x, y, theta])


In [6]:
def simulation(controller):
    # Simulation parameters
    dt = 0.01  # Time step (s)
    simulation_time = 5  # Total simulation time (s)
    sensor_noise_stddev = 0.03  # Standard deviation of sensor noise (m)

    # Initial robot position and orientation
    x, y, theta = 0.0, 0.0, 0.0  # Initial position (x, y) and orientation (theta)

    # Simulation loop
    for t in np.arange(0, simulation_time, dt):

        z = np.array([x,y]) + np.random.normal(0, sensor_noise_stddev, 2)  # Simulated sensor measurement with noise

        # calculate control inputs
        u,w = controller(x,y,theta,z)
        # u = linear_velocity
        # w = angular_velocity

        # Update robot position and orientation using differential drive kinematics
        x, y, theta = discrete_dynamics([x, y, theta], [u,w], dt, model_mismatch=True)

        # Normalize theta to keep it within [-pi, pi]
        theta = np.arctan2(np.sin(theta), np.cos(theta))

        # Display the robot in the visualization
        show_robot(x, y, theta)
        # Calculate and visualize distances to towers

        # Pause to simulate real-time visualization
        time.sleep(dt)

# Motion Planning

In [12]:
# Robot parameters
ROBOT_RADIUS = 0.5  # Robot collision radius

# Define goal configuration
GOAL = np.array([4.0, 4.0, 0.0])  # Example goal (x, y, theta)

# Local Planner Implementation
class LocalPlanner:
    def __init__(self, start, goal, dt=0.1):
        self.start = np.array(start)
        self.goal = np.array(goal)
        self.dt = dt

    def compute_controls(self):
        def cost(control):
            u, w = control
            state = self.start.copy()
            next_state = discrete_dynamics(state, [u, w], self.dt)
            pos_cost = np.sum((next_state[:2] - self.goal[:2])**2)
            theta_cost = (next_state[2] - self.goal[2])**2
            control_cost = 0.1 * (u**2 + w**2)
            return pos_cost + theta_cost + control_cost

        def grad(control):
            u, w = control
            x, y, theta = self.start
            dx = u * np.cos(theta) * self.dt
            dy = u * np.sin(theta) * self.dt
            dtheta = w * self.dt
            
            grad_u = 2*(x + dx - self.goal[0]) * np.cos(theta)*self.dt + \
                     2*(y + dy - self.goal[1]) * np.sin(theta)*self.dt + \
                     0.2*u
            grad_w = 2*(theta + dtheta - self.goal[2])*self.dt + 0.2*w
            return np.array([grad_u, grad_w])

        res = minimize(cost, [1.5, 0.0], jac=grad, method='BFGS')
        return res.x if res.success else (0.0, 0.0)

In [13]:
import numpy as np
from scipy.optimize import minimize

def generate_initial_guess(start, goal, N):
    # Linear interpolation between start and goal
    return np.linspace(start, goal, N)

def smoothness_cost(x, N):
    x = x.reshape((N, 3))
    cost = 0.0
    for i in range(1, N):
        dx = x[i] - x[i-1]
        cost += np.sum(dx[:2]**2)  # Penalize squared position differences
    return cost

def local_planner(start, goal, N=20):
    """
    Optimization-based trajectory planner between two poses.
    Ignores obstacles.
    Returns a (N,3) array of [x, y, theta] waypoints.
    """
    init_guess = generate_initial_guess(start, goal, N).flatten()

    def objective(x):
        return smoothness_cost(x, N)

    def constraint_start(x):
        return x[:3] - start

    def constraint_goal(x):
        return x[-3:] - goal

    constraints = [
        {'type': 'eq', 'fun': constraint_start},
        {'type': 'eq', 'fun': constraint_goal}
    ]

    result = minimize(
        objective,
        init_guess,
        constraints=constraints,
        method='SLSQP',
        options={'maxiter': 500, 'ftol': 1e-6}
    )

    if not result.success:
        print("Optimization failed:", result.message)
        return None

    trajectory = result.x.reshape((N, 3))
    return trajectory

In [14]:
# Collision Checking Implementation
class CollisionChecker:
    def __init__(self):
        self.cylinder_obstacles = []
        self.wall_obstacles = []
        
        # Populate obstacles from environment setup
        # Cylinders (original obstacles and towers)
        for pos in obstacle_positions:
            self.cylinder_obstacles.append({'pos': pos[:2], 'radius': 0.2})
        for pos in tower_positions:
            self.cylinder_obstacles.append({'pos': pos[:2], 'radius': 0.3})
        
        # Walls between obstacles
        for i in range(len(obstacle_positions)//2):
            p1 = obstacle_positions[2*i][:2]
            p2 = obstacle_positions[2*i+1][:2]
            self.wall_obstacles.append({'start': p1, 'end': p2, 'radius': 0.2})
        
        # Surrounding walls
        surrounding_walls = [
            {'start': (-5.05, -5), 'end': (-5.05, 5), 'radius': 0.05},
            {'start': (5.05, -5), 'end': (5.05, 5), 'radius': 0.05},
            {'start': (-5, 5.05), 'end': (5, 5.05), 'radius': 0.05},
            {'start': (-5, -5.05), 'end': (5, -5.05), 'radius': 0.05}
        ]
        self.wall_obstacles += surrounding_walls

    def is_collision_free(self, x, y):
        # Check cylinder obstacles
        for obs in self.cylinder_obstacles:
            dx = x - obs['pos'][0]
            dy = y - obs['pos'][1]
            if np.hypot(dx, dy) < ROBOT_RADIUS + obs['radius']:
                return False

        # Check wall obstacles
        for wall in self.wall_obstacles:
            a = np.array(wall['start'])
            b = np.array(wall['end'])
            p = np.array([x, y])
            ap = p - a
            ab = b - a
            t = np.dot(ap, ab) / np.dot(ab, ab)
            t = np.clip(t, 0.0, 1.0)
            closest = a + t * ab
            distance = np.linalg.norm(p - closest)
            if distance < ROBOT_RADIUS + wall['radius']:
                return False
        return True

from
https://github.com/AtsushiSakai/PythonRobotics/blob/master/PathPlanning/ProbabilisticRoadMap/probabilistic_road_map.py

In [None]:
import math
import numpy as np
import matplotlib.pyplot as plt
from scipy.spatial import KDTree

# PRM Global Planner Implementation
class PRMPlanner:
    def __init__(self, collision_checker, n_samples=100, k_neighbors=5):
        self.cc = collision_checker
        self.N_SAMPLE = n_samples # number of sample_points
        self.N_KNN = k_neighbors # number of edge from one sampled point
        self.MAX_EDGE_LEN = 0.5 # Maximum edge length for connections
        self.nodes = []
        self.edges = {}
    
    class Node:
        """
        Node class for dijkstra search
        """

        def __init__(self, x, y, cost, parent_index):
            self.x = x
            self.y = y
            self.cost = cost
            self.parent_index = parent_index

        def __str__(self):
            return str(self.x) + "," + str(self.y) + "," +\
                str(self.cost) + "," + str(self.parent_index)
    
    def sample_points(self, sx, sy, gx, gy, rr, ox, oy, obstacle_kd_tree, rng):
        max_x = max(ox)
        max_y = max(oy)
        min_x = min(ox)
        min_y = min(oy)

        sample_x, sample_y = [], []

        if rng is None:
            rng = np.random.default_rng()

        while len(sample_x) <= self.n_samples:
            tx = (rng.random() * (max_x - min_x)) + min_x
            ty = (rng.random() * (max_y - min_y)) + min_y

            dist, index = obstacle_kd_tree.query([tx, ty])

            if dist >= rr:
                sample_x.append(tx)
                sample_y.append(ty)

        sample_x.append(sx)
        sample_y.append(sy)
        sample_x.append(gx)
        sample_y.append(gy)

        return sample_x, sample_y

    def prm_planning(self, start_x, start_y, goal_x, goal_y,
                 obstacle_x_list, obstacle_y_list, robot_radius, *, rng=None):
        """
        Run probabilistic road map planning

        :param start_x: start x position
        :param start_y: start y position
        :param goal_x: goal x position
        :param goal_y: goal y position
        :param obstacle_x_list: obstacle x positions
        :param obstacle_y_list: obstacle y positions
        :param robot_radius: robot radius
        :param rng: (Optional) Random generator
        :return:
        """

        obstacle_kd_tree = KDTree(np.vstack((obstacle_x_list, obstacle_y_list)).T)

        sample_x, sample_y = self.sample_points(start_x, start_y, goal_x, goal_y,
                                        robot_radius,
                                        obstacle_x_list, obstacle_y_list,
                                        obstacle_kd_tree, rng)

        road_map = self.generate_road_map(sample_x, sample_y,
                                    robot_radius, obstacle_kd_tree)

        rx, ry = self.dijkstra_planning(
            start_x, start_y, goal_x, goal_y, road_map, sample_x, sample_y)

        return rx, ry


    def is_collision(self, sx, sy, gx, gy, rr, obstacle_kd_tree):
        x = sx
        y = sy
        dx = gx - sx
        dy = gy - sy
        yaw = math.atan2(gy - sy, gx - sx)
        d = math.hypot(dx, dy)

        if d >= self.MAX_EDGE_LEN:
            return True

        D = rr
        n_step = round(d / D)

        for i in range(n_step):
            dist, _ = obstacle_kd_tree.query([x, y])
            if dist <= rr:
                return True  # collision
            x += D * math.cos(yaw)
            y += D * math.sin(yaw)

        # goal point check
        dist, _ = obstacle_kd_tree.query([gx, gy])
        if dist <= rr:
            return True  # collision

        return False  # OK


    def generate_road_map(sample_x, sample_y, rr, obstacle_kd_tree):
        """
        Road map generation

        sample_x: [m] x positions of sampled points
        sample_y: [m] y positions of sampled points
        robot_radius: Robot Radius[m]
        obstacle_kd_tree: KDTree object of obstacles
        """

        road_map = []
        n_sample = len(sample_x)
        sample_kd_tree = KDTree(np.vstack((sample_x, sample_y)).T)

        for (i, ix, iy) in zip(range(n_sample), sample_x, sample_y):

            dists, indexes = sample_kd_tree.query([ix, iy], k=n_sample)
            edge_id = []

            for ii in range(1, len(indexes)):
                nx = sample_x[indexes[ii]]
                ny = sample_y[indexes[ii]]

                if not is_collision(ix, iy, nx, ny, rr, obstacle_kd_tree):
                    edge_id.append(indexes[ii])

                if len(edge_id) >= N_KNN:
                    break

            road_map.append(edge_id)

        #  plot_road_map(road_map, sample_x, sample_y)

        return road_map


    def dijkstra_planning(sx, sy, gx, gy, road_map, sample_x, sample_y):
        """
        s_x: start x position [m]
        s_y: start y position [m]
        goal_x: goal x position [m]
        goal_y: goal y position [m]
        obstacle_x_list: x position list of Obstacles [m]
        obstacle_y_list: y position list of Obstacles [m]
        robot_radius: robot radius [m]
        road_map: ??? [m]
        sample_x: ??? [m]
        sample_y: ??? [m]

        @return: Two lists of path coordinates ([x1, x2, ...], [y1, y2, ...]), empty list when no path was found
        """

        start_node = Node(sx, sy, 0.0, -1)
        goal_node = Node(gx, gy, 0.0, -1)

        open_set, closed_set = dict(), dict()
        open_set[len(road_map) - 2] = start_node

        path_found = True

        while True:
            if not open_set:
                print("Cannot find path")
                path_found = False
                break

            c_id = min(open_set, key=lambda o: open_set[o].cost)
            current = open_set[c_id]

            # show graph
            if show_animation and len(closed_set.keys()) % 2 == 0:
                # for stopping simulation with the esc key.
                plt.gcf().canvas.mpl_connect(
                    'key_release_event',
                    lambda event: [exit(0) if event.key == 'escape' else None])
                plt.plot(current.x, current.y, "xg")
                plt.pause(0.001)

            if c_id == (len(road_map) - 1):
                print("goal is found!")
                goal_node.parent_index = current.parent_index
                goal_node.cost = current.cost
                break

            # Remove the item from the open set
            del open_set[c_id]
            # Add it to the closed set
            closed_set[c_id] = current

            # expand search grid based on motion model
            for i in range(len(road_map[c_id])):
                n_id = road_map[c_id][i]
                dx = sample_x[n_id] - current.x
                dy = sample_y[n_id] - current.y
                d = math.hypot(dx, dy)
                node = Node(sample_x[n_id], sample_y[n_id],
                            current.cost + d, c_id)

                if n_id in closed_set:
                    continue
                # Otherwise if it is already in the open set
                if n_id in open_set:
                    if open_set[n_id].cost > node.cost:
                        open_set[n_id].cost = node.cost
                        open_set[n_id].parent_index = c_id
                else:
                    open_set[n_id] = node

        if path_found is False:
            return [], []

        # generate final course
        rx, ry = [goal_node.x], [goal_node.y]
        parent_index = goal_node.parent_index
        while parent_index != -1:
            n = closed_set[parent_index]
            rx.append(n.x)
            ry.append(n.y)
            parent_index = n.parent_index

        return rx, ry

In [19]:
# Controller Integration
cc = CollisionChecker()
prm = PRMPlanner(cc)
prm.build_roadmap()
global_path = prm.plan_path((0, 0), (GOAL[0], GOAL[1]))
current_waypoint = 0

def controller(x, y, theta, z):
    global current_waypoint
    if current_waypoint >= len(global_path):
        return 0, 0
    
    target = global_path[current_waypoint]
    if np.hypot(x-target[0], y-target[1]) < 0.1:
        current_waypoint += 1
        if current_waypoint >= len(global_path):
            return 0, 0
    
    lp = LocalPlanner([x, y, theta], list(global_path[current_waypoint]) + [0])
    u, w = lp.compute_controls()
    return u, w

# Run simulation
simulation(controller)