In [1]:
import numpy as np
import os
import torch
from torch import nn, optim
import importlib

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("Using device", device)
print(torch.cuda.get_device_name(0))
print(torch.cuda.get_device_name(1))

Using device cuda:0
GeForce GTX 1660 SUPER
P106-100


In [2]:
from torch.utils.data import Dataset, DataLoader
from data.MinicityDataset import TrafficDataset

dbpath = '/home/rong/disk/database/minicity.db'
bs = 12
train_loader = DataLoader(TrafficDataset(dbpath = dbpath,
                            train = True),
                         batch_size = bs, shuffle=True, drop_last = True)
test_loader = DataLoader(TrafficDataset(dbpath = dbpath,
                            train = False),
                          batch_size = bs, shuffle=True, drop_last = True)

# batch = next(iter(train_loader))
# batch['data']

In [3]:
from models.minicity import convVAE

X_dim = 4
traj_size = 25 * 4
z_dim = 50
cnn_out_size = 300
cond_out_size = 300

model = convVAE(sample_size = X_dim, 
                  traj_size = traj_size,
                  cnnout_size = cnn_out_size, 
                  cond_out_size = cond_out_size, 
                  encoder_layer_sizes = [512,1024,512], 
                  latent_size = z_dim, 
                  decoder_layer_sizes = [512,1024,512]).to(device)
optimizer = optim.Adam(model.parameters(), lr=5e-4)

checkpoint = torch.load('checkpoints/minicity_attention_7.pt')
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
epoch = checkpoint['epoch']

model.eval()
print(model)

convVAE(
  (condnn): CondNN(
    (cnn): Conv3d(
      (adap_pool): AdaptiveAvgPool3d(output_size=(25, 200, 200))
      (conv_layer1): Sequential(
        (0): Conv3d(1, 16, kernel_size=(2, 3, 3), stride=(1, 1, 1))
        (1): LeakyReLU(negative_slope=0.01)
        (2): Conv3d(16, 16, kernel_size=(2, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
        (3): LeakyReLU(negative_slope=0.01)
        (4): MaxPool3d(kernel_size=(2, 2, 2), stride=(2, 2, 2), padding=0, dilation=1, ceil_mode=False)
      )
      (conv_layer2): Sequential(
        (0): Conv3d(16, 32, kernel_size=(2, 3, 3), stride=(1, 1, 1))
        (1): LeakyReLU(negative_slope=0.01)
        (2): Conv3d(32, 32, kernel_size=(2, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
        (3): LeakyReLU(negative_slope=0.01)
        (4): MaxPool3d(kernel_size=(2, 2, 2), stride=(2, 2, 2), padding=0, dilation=1, ceil_mode=False)
      )
      (conv_layer5): Conv3d(32, 64, kernel_size=(1, 3, 3), stride=(1, 1, 1))
      (adap_pool2): AdaptiveAvg

In [4]:
from utils.MinicityOmpl import getSampleData
from utils.Minicity import plotData, plotOrientSpeed, plotAlpha
# importlib.reload(MinicityOmpl)

test_data = test_loader.dataset
viz_idx =   torch.randint(0,len(test_data),[1]).item()  
#  变道场景idx
# viz_idx = 8753 #弯道
# viz_idx = 143300 #弯道
# viz_idx = 5107 #直行
# viz_idx = 2984 #转盘
# viz_idx = 101538 #转盘
# viz_idx = 153309 #阻塞
# viz_idx = 109779 #变道

print(viz_idx)
startgoal, egoid, occ, y_viz, time_stamp, alpha = getSampleData(model, test_data, viz_idx, device)

%matplotlib
plotData(occ, startgoal, y_viz)
plotOrientSpeed(startgoal, y_viz)
# plotAlpha(alpha)

17655
torch.Size([25, 4])
torch.Size([25, 4])
time_step:  176.0
start [ 0.    0.   -0.   10.75]
goal [17.890003  0.       -0.        6.25    ]
Using matplotlib backend: Qt5Agg
[ 0.        0.       -0.       10.75     17.890003  0.       -0.
  6.25    ]
159.6333440144857 100.0


In [5]:
plotAlpha(alpha)

# commonroad collision

In [6]:
import os
from commonroad.common.file_reader import CommonRoadFileReader

file_path = '/home/rong/VAE-Motion-Planning/scenarios/commonroad_data/minicity.cr.xml'
scenario, _ = CommonRoadFileReader(file_path).open()
from utils.MinicityOmpl import show_scenario
show_scenario(scenario, 0)

In [7]:
from commonroad.scenario.scenario import Scenario
from commonroad_cc.collision_detection.pycrcc_collision_dispatch import create_collision_checker, create_collision_object
import pycrcc
from utils.MinicityOmpl import create_roadcc
carcc = create_collision_checker(scenario)
roadcc = create_roadcc(scenario)

## 碰撞测试

In [8]:
import numpy as np
from commonroad.scenario.trajectory import State, Trajectory
from commonroad.prediction.prediction import TrajectoryPrediction
from commonroad.geometry.shape import Rectangle
from commonroad.scenario.obstacle import StaticObstacle
from commonroad_cc.collision_detection.pycrcc_collision_dispatch import create_collision_object

# # create a trajectory for the ego vehicle starting at time step 0
time_start = 0
position = np.array([[153, 147]])
orientation = [0]
state_list = list()
for k in range(0, len(position)):
    state_list.append(State(**{'position': position[k], 'orientation':orientation[k]}))
trajectory = Trajectory(time_start, state_list)
# create the shape of the ego vehicle
shape = Rectangle(length=3.9, width=1.9)
# # create a TrajectoryPrediction object consisting of the trajectory and the shape of the ego vehicle
traj_pred = TrajectoryPrediction(trajectory=trajectory, shape=shape)
# create a collision object using the trajectory prediction of the ego vehicle
co = create_collision_object(traj_pred)

# test the trajectory of the ego vehicle for collisions
print('Collision between the trajectory of the ego vehicle and objects in the environment: ', carcc.collide(co))
print('Collision between the trajectory of the ego vehicle and road: ', roadcc.collide(co))
print('Vallid', roadcc.collide(co) and not carcc.collide(co))

Collision between the trajectory of the ego vehicle and objects in the environment:  False
Collision between the trajectory of the ego vehicle and road:  True
Vallid True


# ompl

In [9]:
from ompl import base as ob
from ompl import control as oc
from ompl import app as oa

class ValidityChecker(ob.StateValidityChecker):
    def __init__(self,  si, ccc, ccr, time_stamp):
        super(ValidityChecker, self).__init__(si)
        self.ccc = ccc
        self.ccr = ccr
        self.time_stamp = int(time_stamp)
        self.count = 0
        self.collision_count = 0
        self.states_ok = []
        self.states_bad = []
        # Returns whether the given state's position overlaps the
        # circular obstacle
    def isValid(self, state):
        time_start = self.time_stamp
        position = np.array([[state[0].getX(), state[0].getY()]])
        orientation = [state[0].getYaw()]
        state_list = list()
        for k in range(0, len(position)):
            state_list.append(State(**{'position': position[k], 'orientation':orientation[k]}))
        trajectory = Trajectory(time_start, state_list)
        shape = Rectangle(length=3.9, width=1.9)
        traj_pred = TrajectoryPrediction(trajectory=trajectory, shape=shape)
        co = create_collision_object(traj_pred)

        result = self.ccr.collide(co) and not self.ccc.collide(co)
        self.count += 1
#         print("count", self.count)
        if not result:
            self.collision_count += 1
#             print("collision", self.collision_count)
#             self.states_bad.append(sample)
#         else:
#             self.states_ok.append(sample)
        return result
    
    def clearcount(self):
        self.count = 0
        self.collision_count = 0

car = oa.DynamicCarPlanning()
mychecker = ValidityChecker(car.getSpaceInformation(), carcc, roadcc, 0)
print(mychecker)

<__main__.ValidityChecker object at 0x7ff8fade5880>


In [107]:
from ompl import util as ou
import math
from utils.MinicityOmpl import warp2pi


class MyStateSampler(ob.StateSampler):
    def __init__(self, space, cvae_samples):
        super(MyStateSampler, self).__init__(space)
        self.space = space
        print(self.space)
        self.cvae_samples = cvae_samples
        self.cvae_num = cvae_samples.shape[0]
        self.i = 0
        self.rng_ = ou.RNG()
        print("my state sampler")
        
        boundse2 = self.space.getSubspace(0).getBounds()
        bound2 = self.space.getSubspace(1).getBounds()
        self.xlow = boundse2.low[0]
        self.xhigh = boundse2.high[0]
        self.ylow = boundse2.low[1]
        self.yhigh = boundse2.high[1]
        
        self.vlow = bound2.low[0]
        self.vhigh = bound2.high[0]
        self.vanglelow = bound2.low[1]
        self.vanglehigh = bound2.high[1]
        
#     def __call__(self, i): 
#         return self
        
    def sampleUniform(self, state):
        chance = self.rng_.uniformReal(0, 1)
#         print("my sample uniform")
        
        if chance < 1:
            x = self.cvae_samples[self.i, 0]
            y = self.cvae_samples[self.i, 1]
            yaw = self.cvae_samples[self.i, 2]
            yaw = warp2pi(yaw)
            v = self.cvae_samples[self.i, 3]
            self. i += 1
            if self.i == self.cvae_num:
                self.i = 0
            print("here ", self.i)
        else:
            x = self.rng_.uniformReal(self.xlow, self.xhigh)
            y = self.rng_.uniformReal(self.ylow, self.yhigh)
            yaw = self.rng_.uniformReal(-math.pi, math.pi)
            v = self.rng_.uniformReal(self.vlow, self.vhigh)
            
        vangle = self.rng_.uniformReal(self.vanglelow, self.vanglehigh)
        state[0].setX(float(x))
        state[0].setY(float(y))
        state[0].setYaw(float(yaw))
        state[1][0] = float(v)
        state[1][1] = float(vangle)
        
        return True
    
    def sampleUniformNear(self, state, nearstate, dis):
        print("sample uniform near")
        self.sampleUniform(state)
        
    def sampleGaussian(self, state, nearstate, dis):
        print("sample guassian")
        self.sampleUniform(state)
        
# return an instance of my sampler
def allocMyValidStateSampler(si):
    return MyValidStateSampler(si, y_viz)

def allocMyStateSampler(space):
    return MyStateSampler(space, y_viz)

In [11]:
import sys
from os.path import abspath, dirname, join

from ompl import base as ob
from ompl import control as oc
from ompl import app as oa

from commonroad_cc.visualization.draw_dispatch import draw_object
import matplotlib.pyplot as plt

# from utils.MinicityOmpl import checkpath

def plt_ompl_result(path, time_step):
    if path:
        fig1 = plt.figure(figsize=(10, 6), dpi=80)
        ax1 = fig1.add_subplot(111, aspect='equal')   
        draw_params = {}
        draw_params['time_begin'] = time_step
        draw_object(scenario, draw_params=draw_params)

        path.interpolate(); # uncomment if you want to plot the path

        states = path.getStates()
        for state in states:
            state = state[0]
            plt.scatter(state.getX(), state.getY(), color="green", s=250, edgecolors='black')  # path
        #     for state in collisionchecker.states_ok:
        #         plt.scatter(state[0], state[1], color="green", s=100, edgecolors='green')  # free sample
        #     for state in collisionchecker.states_bad:
        #         plt.scatter(state[0], state[1], color="red", s=100, edgecolors='red')  # collision sample
        # plt.scatter(start().getX(), start().getY(), color="blue", s=250, edgecolors='black')  # init
        # plt.scatter(goal().getX(), goal().getY(), color="red", s=250, edgecolors='black')  # goal

        plt.xlabel('x')
        plt.ylabel('y')
        plt.show()
    else:
        print("no path to plot")

def plttree(planner, startgoal):
    fig1 = plt.figure(figsize=(10, 6), dpi=80)
    ax1 = fig1.add_subplot(111, aspect='equal')
    
    plannerdata = ob.PlannerData(planner.getSpaceInformation())
    planner.getPlannerData(plannerdata)

    print("num edges: ", plannerdata.numEdges())
    print("num vertices: ", plannerdata.numVertices())

    plt.scatter(startgoal[0], startgoal[1], color="blue", s=250, edgecolors='black')  # init
    plt.scatter(startgoal[4], startgoal[5], color="red", s=250, edgecolors='black')  # goal

    num_ver = plannerdata.numVertices()
    for i in range(0, num_ver):
        for j in range(i, num_ver):
            if plannerdata.edgeExists(i, j):
                plt.plot([plannerdata.getVertex(i).getState()[0].getX(), plannerdata.getVertex(j).getState()[0].getX()], [plannerdata.getVertex(i).getState()[0].getY(), plannerdata.getVertex(j).getState()[0].getY()], color='gray')
                
        plt.scatter( plannerdata.getVertex(i).getState()[0].getX(), plannerdata.getVertex(i).getState()[0].getY(), color="green", s=50, edgecolors='black')  # vertice

    plt.xlabel('x')
    plt.ylabel('y')
    plt.show()

In [12]:
import math

def dynamicCarMinicity(setup, basepose, startgoal, sampleallocator):
    print("\n\n***** Planning for a %s *****\n" % setup.getName())
    # plan for dynamic car in SE(2)
    stateSpace = setup.getStateSpace()
    sizebound = 60
    # set the bounds for the R^2 part of SE(2)
    bounds = ob.RealVectorBounds(2)
    bounds.setLow(0, basepose[0] - sizebound)
    bounds.setHigh(0, basepose[0] + sizebound)
    bounds.setLow(1, basepose[1]- sizebound)
    bounds.setHigh(1, basepose[1] + sizebound)
    stateSpace.getSubspace(0).setBounds(bounds)
    bounds.setLow(0, -7)
    bounds.setHigh(0, 15)
#     bounds.setLow(1, -0.52)
#     bounds.setHigh(1, 0.52)
    bounds.setLow(1, -0.8)
    bounds.setHigh(1, 0.8)
    stateSpace.getSubspace(1).setBounds(bounds)
    print(setup.getStateSpace().getSubspace(0).getBounds().high[0])
    bounds.setLow(0, -.5)
    bounds.setHigh(0, 2.5)
    bounds.setLow(1, -math.pi * 10/180)
    bounds.setHigh(1, math.pi * 10/180)
    controlSpace = setup.getControlSpace()
    controlSpace.setBounds(bounds)
    print(setup.getControlSpace().getBounds().high[0])

    # define start state
    start = ob.State(stateSpace)
    start[0] = float(startgoal[0])
    start[1] = float(startgoal[1])
    start[2] = float(startgoal[2])
    start[3] = float(startgoal[3])
    start[4] = 0

    print(start)
    # define goal state
    goal = ob.State(stateSpace)
    goal[0] = float(startgoal[4])
    goal[1] = float(startgoal[5])
    goal[2] = float(startgoal[6])
    goal[3] = float(startgoal[7])
    goal[4] = 0
    print(goal)

    # set the start & goal states
    setup.setStartAndGoalStates(start, goal, .5)
    setup.getSpaceInformation().setStateValidityChecker(mychecker)
    setup.getSpaceInformation().getStateSpace().setStateSamplerAllocator(ob.StateSamplerAllocator(sampleallocator))
#     print(setup.getSpaceInformation().getStateValidityChecker())
    planner = oc.RRT(setup.getSpaceInformation())

    
    setup.setPlanner(planner)
#     print(planner.getSpaceInformation().getStateValidityChecker())
    planner.setProblemDefinition(setup.getProblemDefinition())
#     setup.getPlanner().setup()
    # try to solve the problem
#     planner.setNumSamples(30)
    plan_res =  planner.solve(0.2)
    path = planner.getProblemDefinition().getSolutionPath()
    if plan_res:
#         path.interpolate(); # uncomment if you want to plot the path
        print(path.printAsMatrix())
        if plan_res.asString() != 'Exact solution':
            print("Solution is approximate. Distance to actual goal is %g" %
                  planner.getProblemDefinition().getSolutionDifference())
    else:
        print("plan fail")
    return path, planner

In [13]:
from utils.MinicityOmpl import getproblem

test_data = test_loader.dataset
viz_idx =   torch.randint(0,len(test_data),[1]).item()  
#  变道场景idx
# viz_idx = 8753 #弯道
# viz_idx = 143300 #弯道
# viz_idx = 5107 #直行
# viz_idx = 2984 #转盘
# viz_idx = 101538 #转盘
# viz_idx = 153309 #阻塞
# viz_idx = 109779 #变道

print(viz_idx)
startgoal, egoid, occ, y_viz, time_stamp, alpha = getSampleData(model, test_data, viz_idx, device)
%matplotlib
plotData(occ, startgoal, y_viz)

ego = scenario.obstacle_by_id(egoid)
basepose, startgoal = getproblem(scenario, egoid, time_stamp, startgoal)

2312
torch.Size([25, 4])
torch.Size([25, 4])
time_step:  2326.0
start [ 0.      0.     -1.5708  1.34  ]
goal [ 0.        -1.5399933 -1.5708     1.33     ]
Using matplotlib backend: Qt5Agg
[ 0.         0.        -1.5708     1.34       0.        -1.5399933
 -1.5708     1.33     ]
100.0 105.13331095377605


In [14]:
car = oa.DynamicCarPlanning()
car.getSpaceInformation().setPropagationStepSize(0.1)
car.getSpaceInformation().setStateValidityCheckingResolution(0.001)
mychecker = ValidityChecker(car.getSpaceInformation(), carcc, roadcc, 0)
car.getSpaceInformation().setStateValidityChecker(mychecker)
# scenario.remove_obstacle(ego)
path, planner = dynamicCarMinicity(car, basepose, startgoal, allocMyStateSampler)
# scenario.add_objects(ego)
print("Count", mychecker.count)
print("Collision", mychecker.collision_count)
plt_ompl_result(path, int(time_stamp))
plttree(planner, startgoal)
print("Cost", path.length())
print("Distance", planner.getProblemDefinition().getSolutionDifference())



***** Planning for a Dynamic car *****

249.6
2.5
Compound state [
Compound state [
RealVectorState [189.6 193.81]
SO2State [-1.5708]
]
RealVectorState [1.34 0]
]

Compound state [
Compound state [
RealVectorState [189.6 192.27]
SO2State [-1.5708]
]
RealVectorState [1.33 0]
]

<ompl.base._base.CompoundStateSpace object at 0x7ff7b9683760>
my state sampler
here  1
here  2
here  3
189.6 193.81 -1.5708 1.34 0 0 0 0
189.6 193.672 -1.57117 1.41567 -0.00529711 0.756668 -0.0529711 0.1
189.6 193.526 -1.57305 1.51543 -0.0202566 0.997586 -0.149595 0.1
189.579 192.237 -1.60062 1.3496 -0.0225424 -0.184253 -0.00253976 0.9

Count 12
Collision 0
num edges:  3
num vertices:  4
Cost 1.0999999999999999
Distance 0.0


In [117]:
def getDataNosample(model, test_data, viz_idx, device):
    batch = test_data[viz_idx]
    startgoal = torch.from_numpy(batch["start_goal"]).to(device)
    egoid = batch["egoid"]
    time_stamp = batch["timeprob"]
    print("time_step: ", time_stamp)

    startgoal=startgoal.cpu().detach().numpy() * 50
    print("start", startgoal[0:4])
    print("goal", startgoal[4:8])

    return startgoal, egoid, time_stamp

def rollout(test_data, viz_idx, mychecker, allocstatesampler):
    if type(allocstatesampler) == allocMyStateSampler:
        startgoal, egoid, occ, y_viz, time_stamp, alpha = getSampleData(model, test_data, viz_idx, device)
    else:
        startgoal, egoid,  time_stamp= getDataNosample(model, test_data, viz_idx, device)
        
#     %matplotlib
#     plotData(occ, startgoal, y_viz)
    ego = scenario.obstacle_by_id(egoid)
    basepose, startgoal = getproblem(scenario, egoid, time_stamp, startgoal)
    car = oa.DynamicCarPlanning()
    car.getSpaceInformation().setPropagationStepSize(0.1)
    car.getSpaceInformation().setStateValidityCheckingResolution(0.001)
    car.getSpaceInformation().setStateValidityChecker(mychecker)
    # scenario.remove_obstacle(ego)
    path, planner = dynamicCarMinicity(car, basepose, startgoal, allocstatesampler)
    # scenario.add_objects(ego)
    print("Count", mychecker.count)
    print("Collision", mychecker.collision_count)
#     plt_ompl_result(path, int(time_stamp))
#     plttree(planner, startgoal)
    distance = planner.getProblemDefinition().getSolutionDifference()
    print("Distance", distance)
    if path and distance < 5:
        length = path.length()
    else:
        length = 10
        
    collision = mychecker.collision_count
    return length, collision, distance

test_data = test_loader.dataset
costs_att = []
collisions_att = []
distance2goals_att = []
costs_rand = []
collisions_rand = []
distance2goals_rand = []
for i in range(0, 1000):
    viz_idx =   torch.randint(0,len(test_data),[1]).item()  
    print(viz_idx)
    cost, collision, distance2goal = rollout(test_data, viz_idx, mychecker, allocRandomSampler)
    mychecker.clearcount()
    costs_att.append(cost)
    collisions_att.append(collision)
    distance2goals_att.append(distance2goal)
    cost, collision, distance2goal = rollout(test_data, viz_idx, mychecker, allocRandomSampler)
    mychecker.clearcount()
    costs_rand.append(cost)
    collisions_rand.append(collision)
    distance2goals_rand.append(distance2goal)

124001
time_step:  1526.0
start [0.     0.     1.5708 0.83  ]
goal [0.       2.569992 1.5708   2.83    ]


***** Planning for a Dynamic car *****

75.6
2.5
Compound state [
Compound state [
RealVectorState [15.6 170.08]
SO2State [1.5708]
]
RealVectorState [0.83 0]
]

Compound state [
Compound state [
RealVectorState [15.6 172.65]
SO2State [1.5708]
]
RealVectorState [2.83 0]
]

<ompl.base._base.CompoundStateSpace object at 0x7ff7a03ffe40>
my state sampler
15.6 170.08 1.5708 0.83 0 0 0 0
15.6798 171.703 1.43676 2.78433 -0.139147 2.17147 -0.154608 0.9
15.858 172.594 1.31027 3.27618 -0.137415 1.6395 0.00577404 0.3

Solution is approximate. Distance to actual goal is 0.534329
Count 822
Collision 176
Distance 0.5343290090033643
time_step:  1526.0
start [0.     0.     1.5708 0.83  ]
goal [0.       2.569992 1.5708   2.83    ]


***** Planning for a Dynamic car *****

75.6
2.5
Compound state [
Compound state [
RealVectorState [15.6 170.08]
SO2State [1.5708]
]
RealVectorState [0.83 0]
]

Compoun

In [118]:
from statistics import mean 
print("length", len(costs_att))
# print(costs_att)
print("average cost", mean(costs_att))
# print(distance2goals_att)
print("average distance2gaol", mean(distance2goals_att))
# print(collisions_att)
print("average collisions", mean(collisions_att))
# for i in len()

length 1000
average cost 3.6152
average distance2gaol 2.8052194122716623
average collisions 91.501


In [119]:
# print(costs_rand)
print("average cost", mean(costs_rand))
# print(distance2goals_rand)
print("average distance2gaol", mean(distance2goals_rand))
# print(collisions_rand)
print("average collisions",mean(collisions_rand))


average cost 3.5517
average distance2gaol 2.9298732521204442
average collisions 90.457


In [17]:
from ompl import util as ou
import math
from utils.MinicityOmpl import warp2pi


class RandomSampler(ob.StateSampler):
    def __init__(self, space, cvae_samples):
        super(RandomSampler, self).__init__(space)
        self.space = space
        print(self.space)
        self.cvae_samples = cvae_samples
        self.cvae_num = cvae_samples.shape[0]
        self.i = 0
        self.rng_ = ou.RNG()
        print("my state sampler")
        
        boundse2 = self.space.getSubspace(0).getBounds()
        bound2 = self.space.getSubspace(1).getBounds()
        self.xlow = boundse2.low[0]
        self.xhigh = boundse2.high[0]
        self.ylow = boundse2.low[1]
        self.yhigh = boundse2.high[1]
        
        self.vlow = bound2.low[0]
        self.vhigh = bound2.high[0]
        self.vanglelow = bound2.low[1]
        self.vanglehigh = bound2.high[1]
        
#     def __call__(self, i): 
#         return self
        
    def sampleUniform(self, state):
        chance = self.rng_.uniformReal(0, 1)
#         print("my sample uniform")
        
        if chance < 0:
            x = self.cvae_samples[self.i, 0]
            y = self.cvae_samples[self.i, 1]
            yaw = self.cvae_samples[self.i, 2]
            yaw = warp2pi(yaw)
            v = self.cvae_samples[self.i, 3]
            self. i += 1
            if self.i == self.cvae_num:
                self.i = 0
            print("here ", self.i)
        else:
            x = self.rng_.uniformReal(self.xlow, self.xhigh)
            y = self.rng_.uniformReal(self.ylow, self.yhigh)
            yaw = self.rng_.uniformReal(-math.pi, math.pi)
            v = self.rng_.uniformReal(self.vlow, self.vhigh)
            
        vangle = self.rng_.uniformReal(self.vanglelow, self.vanglehigh)
        state[0].setX(float(x))
        state[0].setY(float(y))
        state[0].setYaw(float(yaw))
        state[1][0] = float(v)
        state[1][1] = float(vangle)
        
        return True
    
    def sampleUniformNear(self, state, nearstate, dis):
        print("sample uniform near")
        self.sampleUniform(state)
        
    def sampleGaussian(self, state, nearstate, dis):
        print("sample guassian")
        self.sampleUniform(state)
        
# return an instance of my sampler
def allocRandomStateSampler(si):
    return MyValidStateSampler(si, y_viz)

def allocRandomSampler(space):
    return RandomSampler(space, y_viz)

In [97]:
car = oa.DynamicCarPlanning()
car.getSpaceInformation().setPropagationStepSize(0.1)
car.getSpaceInformation().setStateValidityCheckingResolution(0.001)
mychecker = ValidityChecker(car.getSpaceInformation(), carcc, roadcc, 0)
car.getSpaceInformation().setStateValidityChecker(mychecker)
# scenario.remove_obstacle(ego)
path, planner = dynamicCarMinicity(car, basepose, startgoal, allocRandomSampler)
# scenario.add_objects(ego)
print("Count", mychecker.count)
print("Collision", mychecker.collision_count)
plt_ompl_result(path, int(time_stamp))
plttree(planner, startgoal)
print("Cost", path.length())



***** Planning for a Dynamic car *****

226.3
2.5
Compound state [
Compound state [
RealVectorState [176.3 54.5]
SO2State [-2.5799]
]
RealVectorState [4.7 0]
]

Compound state [
Compound state [
RealVectorState [166.12 43.24]
SO2State [-2.1384]
]
RealVectorState [5.74 0]
]

count 1
<ompl.base._base.CompoundStateSpace object at 0x7fbba22090d0>
my state sampler
count 2
count 3
count 4
count 5
count 6
count 7
count 8
count 9
count 10
count 11
count 12
count 13
count 14
count 15
count 16
count 17
count 18
count 19
count 20
count 21
count 22
count 23
count 24
count 25
count 26
count 27
count 28
count 29
count 30
count 31
count 32
count 33
count 34
count 35
count 36
count 37
count 38
count 39
count 40
count 41
count 42
count 43
count 44
count 45
count 46
count 47
count 48
count 49
count 50
count 51
count 52
count 53
count 54
count 55
count 56
count 57
count 58
count 59
count 60
count 61
count 62
count 63
count 64
count 65
count 66
count 67
count 68
count 69
count 70
count 71
count 72
count