In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
X_dim = 6
c_dim = 21
gridSize = 100
z_dim = 5

bs = 256

In [3]:
import fcl

def obs2fcl(obs, dimW):
    obs_fcl = []
    for i in range(0, obs.shape[0] // (2 * dimW)):  # plot obstacle patches
        width = obs[i * 2 * dimW + dimW] - obs[i * 2 * dimW]
        height = obs[i * 2 * dimW + dimW + 1] - obs[i * 2 * dimW + 1]
    #     width = height = 1
        ob = fcl.Box(width, height, 1)
        x = obs[i * 2 * dimW] + width/2
        y = obs[i * 2 * dimW + 1] + height/2
    #     x = y = 0
        T = np.array([x, y, 0])
        t = fcl.Transform(T)
        co = fcl.CollisionObject(ob, t)
        obs_fcl.append(co)
#         print("x: ", x, " y: ", y, " width: ", width, " height: ", height)

    obs_manager = fcl.DynamicAABBTreeCollisionManager()
    obs_manager.registerObjects(obs_fcl)
    obs_manager.setup()
    return obs_manager

In [6]:
import sys
try:
    from ompl import util as ou
    from ompl import base as ob
    from ompl import geometric as og
except ImportError:
    # if the ompl module is not in the PYTHONPATH assume it is installed in a
    # subdirectory of the parent directory called "py-bindings."
    from os.path import abspath, dirname, join
    sys.path.insert(0, join(dirname(dirname(abspath(__file__))), 'py-bindings'))
    from ompl import util as ou
    from ompl import base as ob
    from ompl import geometric as og
from math import sqrt
import argparse

# Keep these in alphabetical order and all lower case
def allocatePlanner(si, plannerType):
    if plannerType.lower() == "bfmtstar":
        return og.BFMT(si)
    elif plannerType.lower() == "bitstar":
        return og.BITstar(si)
    elif plannerType.lower() == "fmtstar":
        return og.FMT(si)
    elif plannerType.lower() == "informedrrtstar":
        return og.InformedRRTstar(si)
    elif plannerType.lower() == "prmstar":
        return og.PRMstar(si)
    elif plannerType.lower() == "rrtstar":
        return og.RRTstar(si)
    elif plannerType.lower() == "sorrtstar":
        return og.SORRTstar(si)
    else:
        ou.OMPL_ERROR("Planner-type is not implemented in allocation function.")
        
from utils.NarrowPassage import gap2obs

def plt_ompl_result(condition, start, goal, path, collisionchecker, spaceinfo, planner):
    fig1 = plt.figure(figsize=(10, 6), dpi=80)
    ax1 = fig1.add_subplot(111, aspect='equal')
    obs, dimW = gap2obs(condition)
    for i in range(0, obs.shape[0] // (2 * dimW)):  # plot obstacle patches
        ax1.add_patch(
            patches.Rectangle(
                (obs[i * 2 * dimW], obs[i * 2 * dimW + 1]),  # (x,y)
                obs[i * 2 * dimW + dimW] - obs[i * 2 * dimW],  # width
                obs[i * 2 * dimW + dimW + 1] - obs[i * 2 * dimW + 1],  # height
                alpha=0.6
            ))
#         ax1.add_patch(
#             patches.Rectangle(
#                 (1-obs[i * 2 * dimW], obs[i * 2 * dimW + 1]),  # (x,y)
#                 obs[i * 2 * dimW + dimW] - obs[i * 2 * dimW],  # width
#                 obs[i * 2 * dimW + dimW + 1] - obs[i * 2 * dimW + 1],  # height
#                 alpha=0.6
#             ))
    gridSize = 11
    
    plannerdata = ob.PlannerData(spaceinfo)
    planner.getPlannerData(plannerdata)
    print("num edges: ", plannerdata.numEdges())
    print("num vertices: ", plannerdata.numVertices())
    num_ver = plannerdata.numVertices()
    num_edge = plannerdata.numEdges()
    if num_edge > 3000:
        print("too much edges")
        edge_vis = False
    else:
        edge_vis = True
    for i in range(0, num_ver):
        plt.scatter( plannerdata.getVertex(i).getState().getX(), plannerdata.getVertex(i).getState().getY(), color="green", s=50, edgecolors='black')  # vertice
        if edge_vis:
            for j in range(0, num_ver):
                if plannerdata.edgeExists(i, j):
                    plt.plot([plannerdata.getVertex(i).getState().getX(), plannerdata.getVertex(j).getState().getX()], [plannerdata.getVertex(i).getState().getY(), plannerdata.getVertex(j).getState().getY()], color='gray')
                
    path.interpolate()
    states = path.getStates()
    for state in states:
        plt.scatter(state.getX(), state.getY(), color="green", s=250, edgecolors='black')  # path
#     for state in collisionchecker.states_ok:
#         plt.scatter(state[0], state[1], color="green", s=100, edgecolors='green')  # free sample
#     for state in collisionchecker.states_bad:
#         plt.scatter(state[0], state[1], color="red", s=100, edgecolors='red')  # collision sample
    plt.scatter(start().getX(), start().getY(), color="blue", s=250, edgecolors='black')  # init
    plt.scatter(goal().getX(), goal().getY(), color="red", s=250, edgecolors='black')  # goal
    
    plt.xlabel('x')
    plt.ylabel('y')
    plt.show()

In [None]:
class MyValidStateSampler(ob.ValidStateSampler):
    def __init__(self, si, cvae_samples):
        super(MyValidStateSampler, self).__init__(si)
        print("my valid state sampler")
        self.name_ = "my sampler"
        self.cvae_samples = cvae_samples
        self.cvae_num = cvae_samples.shape[0]
        print("cvae_num", self.cvae_num)
        self.i = 0
        self.random_count = 0
        self.rng_ = ou.RNG()
        
#     def __call__(self, i): 
#         return self
        
    def sample(self, state):
        chance = self.rng_.uniformReal(0, 1)
#         print("sample")
        
        if chance < 0.4:
            x = self.cvae_samples[self.i, 0]
            y = self.cvae_samples[self.i, 1]
            self. i += 1
            if self.i == self.cvae_num:
                self.i = 0
#             print("cvae: ", self.i)
        else:
            x = self.rng_.uniformReal(0, 1)
            y = self.rng_.uniformReal(0, 1)
            self.random_count += 1
#             print("random: ", self.random_count)
        state.setX(float(x))
        state.setY(float(y))
        return True
    
class MyStateSampler(ob.StateSampler):
    def __init__(self, space, cvae_samples):
        super(MyStateSampler, self).__init__(space)
        self.cvae_samples = cvae_samples
        self.cvae_num = cvae_samples.shape[0]
        self.i = 0
        self.rng_ = ou.RNG()
#         print("my state sampler")
#     def __call__(self, i): 
#         return self
        
    def sampleUniform(self, state):
        chance = self.rng_.uniformReal(0, 1)
#         print("my sample uniform")
        
        if chance < 0.35:
            x = self.cvae_samples[self.i, 0]
            y = self.cvae_samples[self.i, 1]
            self. i += 1
            if self.i == self.cvae_num:
                self.i = 0
#             print("here ", self.i)
        else:
            x = self.rng_.uniformReal(0, 1)
            y = self.rng_.uniformReal(0, 1)
        state.setX(float(x))
        state.setY(float(y))
        return True
    
    def sampleUniformNear(self, state, nearstate, dis):
        print("sample uniform near")
        self.sampleUniform(state)
        
    def sampleGaussian(self, state, nearstate, dis):
        print("sample guassian")
        self.sampleUniform(state)
        
# return an instance of my sampler
def allocMyValidStateSampler(si):
    return MyValidStateSampler(si, y_viz)

def allocMyStateSampler(space):
    return MyStateSampler(space, y_viz)

class ValidityChecker(ob.StateValidityChecker):
    def __init__(self,  si, fcl_manager, space):
        super(ValidityChecker, self).__init__(si)
        self.space = space
        self.fcl_manager = fcl_manager
        self.count = 0
        self.collision_count = 0
        self.states_ok = []
        self.states_bad = []
        # Returns whether the given state's position overlaps the
        # circular obstacle
    def isValid(self, state):
#         print("collision")
        sample = np.array([state.getX(), state.getY(), 0])
#         sample = np.array([state().getX(), state().getY(), state().getYaw()])
        req = fcl.CollisionRequest(num_max_contacts=100, enable_contact=True)
        rdata = fcl.CollisionData(request = req)

        cyl = fcl.Cylinder(0.01, 2)
        t = fcl.Transform(sample)
        agent = fcl.CollisionObject(cyl, t)

        self.fcl_manager.collide(agent, rdata, fcl.defaultCollisionCallback)
#         if(rdata.result.is_collision):
#             print("state: ", sample, " collision: ", rdata.result.is_collision)
#         print ('Collision between manager 1 and agent?: {}'.format(rdata.result.is_collision))
#         print( 'Contacts:')
#         for c in rdata.result.contacts:
#             print( '\tO1: {}, O2: {}'.format(c.o1, c.o2))
        self.count += 1
        if(rdata.result.is_collision):
            self.collision_count += 1
#             self.states_bad.append(sample)
#         else:
#             self.states_ok.append(sample)
        return not rdata.result.is_collision

In [5]:
# fmt_star
sucess = 0
total = 0
total_length = 0
total_collision_detection = 0
total_collision = 0

for i in range(0,10):
#     state space
    space = ob.SE2StateSpace()
    # set lower and upper bounds
    bounds = ob.RealVectorBounds(2)
    bounds.setLow(0)
    bounds.setHigh(1)
    space.setBounds(bounds)
    # construct an instance of space information from this state space
    si = ob.SpaceInformation(space)
    # set state validity checking for this space
    obs_manager = obs2fcl(obs, dimW)
    mychecker = ValidityChecker(si, obs_manager, space)
    si.setStateValidityChecker(mychecker)
    si.setStateValidityCheckingResolution(0.03)
    si.getStateSpace().setStateSamplerAllocator(ob.StateSamplerAllocator(allocMyStateSampler))
    si.setup()

# 起点终点
    start = ob.State(space)
    start.random()
    start().setX(float(startend[0]))
    start().setY(float(startend[1]))
    # create a random goal state
    goal = ob.State(space)
    goal.random()
    goal().setX(float(startend[6]))
    goal().setY(float(startend[7]))
    # create a problem instance
    pdef = ob.ProblemDefinition(si)
    pdef.setOptimizationObjective(ob.PathLengthOptimizationObjective(si))
    # set the start and goal states
    pdef.setStartAndGoalStates(start, goal)

    optimizingPlanner = allocatePlanner(si, 'fmtstar')
#     限制采1000个点
    optimizingPlanner.setNumSamples(500)
    optimizingPlanner.setExtendedFMT(False)
#     optimizingPlanner.setFreeSpaceVolume(1)
#     optimizingPlanner.setRadiusMultiplier(5)
    # set the problem we are trying to solve for the planner
    optimizingPlanner.setProblemDefinition(pdef)
    # perform setup steps for the planner
    optimizingPlanner.setup()
    
    solved = optimizingPlanner.solve(0.08)
    print("collision detection: ", mychecker.count)
    total_collision_detection += mychecker.count
    print("collision num: ", mychecker.collision_count)
    total_collision += mychecker.collision_count

    total += 1
    if solved.asString() == 'Exact solution':
#     print(solved)
#     if solved:
        # get the goal representation from the problem definition (not the same as the goal state)
        # and inquire about the found path
        path = pdef.getSolutionPath()
        if path:
            print("path length: ", pdef.getSolutionPath().length())
            total_length += pdef.getSolutionPath().length()
    #     print("Found solution:\n%s" % path)
            plt_ompl_result(con, start, goal,path, mychecker, si, optimizingPlanner)
        sucess += 1
    else:
        print("No solution found")

from termcolor import colored
print(colored("success rate: ", 'red'), sucess/total)
print(colored("average collision detection:", 'red'), total_collision_detection/total)
print(colored("average collision: ", "red"), total_collision/total)
if sucess > 0:
    print(colored("average length: ", "red"),  total_length/sucess)

NameError: name 'obs' is not defined