In [0]:
import sys
print(sys.version)

CURR_DIR = '/content/drive/My Drive/google_colab_work/detailed_probabilistic_robotics/'
sys.path.append(CURR_DIR)
sys.path.append(CURR_DIR + 'scripts/')

3.6.9 (default, Apr 18 2020, 01:56:04) 
[GCC 8.4.0]


In [0]:
from mcl import *

3.6.9 (default, Apr 18 2020, 01:56:04) 
[GCC 8.4.0]
3.6.9 (default, Apr 18 2020, 01:56:04) 
[GCC 8.4.0]


  import pandas.util.testing as tm


# KLD sampling

In [0]:
class KldMcl(Mcl):
    def __init__(
        self, map, init_pose, max_num,
        motion_noise_stds={'nn':0.19, 'no':0.001, 'on':0.13, 'oo':0.20},
        distance_std_rate=0.14, direction_std=0.05,
        bin_widths=np.array([0.2,0.2,math.radians(10.0)]), epsilon=0.1, delta=0.01
    ):
        super().__init__(map, init_pose, 1, motion_noise_stds, distance_std_rate, direction_std)
        self.bin_widths = bin_widths
        self.max_num = max_num
        self.epsilon = epsilon
        self.delta = delta
        self.bin_num = 0
    
    '''
    ### Mcl: Base Class
    def motion_update(self, nu, omega, time_interval):
        for particle in self.particles:
            particle.motion_update(nu, omega, time_interval, self.motion_noise_rate_pdf)

    def observation_update(self, obs):
        for particle in self.particles:
            particle.observation_update(obs, self.map, self.distance_std_rate, self.direction_std)
        self.set_max_likeli()
        self.resampling()

    def resampling(self):
        weights = [particle.weight for particle in self.particles]
        if sum(weights) < 1.0e-100:
            weights = [weight + 1.0e-100 for weight in weights]
        resampled_particles = random.choices(self.particles, weights=weights, k=len(self.particles))
        self.particles = [copy.deepcopy(resampled_particle) for resampled_particle in resampled_particles]
        for particle in self.particles:
            particle.weight = 1.0 / len(self.particles)
    '''

    def motion_update(self, nu, omega, time_interval):
        weights = [particle.weight for particle in self.particles]
        if sum(weights) < 1.0e-100:
            weights = [weight + 1.0e-100 for weight in weights]

        ### Simultaneously particle motion updating and resampling
        new_particles = []
        bins = set()
        for i in range(self.max_num):
            chosen_p = random.choices(self.particles, weights=weights)
            p = copy.deepcopy(chosen_p[0])
            p.motion_update(nu, omega, time_interval, self.motion_noise_rate_pdf)

            new_particles.append(p)
            bins.add(tuple(math.floor(bin_idx) for bin_idx in p.pose/self.bin_widths))

            self.bin_num = len(bins) if 1<len(bins) else 2
            if math.ceil(scipy.stats.chi2.ppf(1.0-self.delta, self.bin_num-1)/(2*self.epsilon)) < len(new_particles):
                break;
        
        ### Normalize partcle's weight
        self.particles = new_particles
        for particle in self.particles:
            particle.weight = 1.0 / len(self.particles)

    def observation_update(self, obs):
        for particle in self.particles:
            particle.observation_update(obs, self.map, self.distance_std_rate, self.direction_std)
        self.set_max_likeli()
        # self.resampling() ### Do NOT resample here because resampling will be done in self.motion_update()

    def draw(self, ax, elems):
        super().draw(ax, elems)
        elems.append(ax.text(-4.5, -4.5, "particle:{}, bin:{}".format(len(self.particles),self.bin_num), fontsize=10))

In [0]:
def trial():
    time_span = 30
    time_interval = 0.1
    world = World(time_span, time_interval, debug=False)

    m = Map()
    for landmark in [(2,-3), (3,3)]:
        m.append_landmark(Landmark(*landmark))
    world.append(m)

    init_pose = np.array([0, 0, math.radians(0.0)])
    max_num = 1000
    kld_mcl_estimator = KldMcl(m, init_pose, max_num)

    nu = 0.2
    omega = math.radians(10.0)
    circling_agent = EstimationAgent(nu, omega, time_interval, estimator=kld_mcl_estimator)

    robot = Robot(
        init_pose, agent=circling_agent, sensor=Camera(m), color='red',
        noise_per_meter=0,
        bias_rate_stds=(0, 0),
        expected_stuck_time=sys.maxsize, expected_escape_time=0,
        expected_kidnap_time=sys.maxsize
    )
    world.append(robot)

    world.draw()
    return world

In [0]:
world = trial()
world.ani

Output hidden; open in https://colab.research.google.com to view.

# Adaptive MCL

In [0]:
class AdaptiveMcl(Mcl):
    def __init__(
        self, map, init_pose, num,
        motion_noise_stds={'nn':0.19, 'no':0.001, 'on':0.13, 'oo':0.20},
        distance_std_rate=0.14, direction_std=0.05,
        amcl_params={'slow':0.001, 'fast':0.1, 'nu':3.0}
    ):
        super().__init__(map, init_pose, num, motion_noise_stds, distance_std_rate, direction_std)
        self.amcl_params = amcl_params
        self.alpha_slow = 1.0
        self.alpha_fast = 1.0
            
    '''
    ### Mcl: Base Class
    def observation_update(self, obs):
        for particle in self.particles:
            particle.observation_update(obs, self.map, self.distance_std_rate, self.direction_std)
        self.set_max_likeli()
        self.resampling()

    def resampling(self):
        weights = [particle.weight for particle in self.particles]
        if sum(weights) < 1.0e-100:
            weights = [weight + 1.0e-100 for weight in weights]
        resampled_particles = random.choices(self.particles, weights=weights, k=len(self.particles))
        self.particles = [copy.deepcopy(resampled_particle) for resampled_particle in resampled_particles]
        for particle in self.particles:
            particle.weight = 1.0 / len(self.particles)
    '''

    def observation_update(self, obs):
        for particle in self.particles:
            particle.observation_update(obs, self.map, self.distance_std_rate, self.direction_std)
        self.set_max_likeli()
        self.adaptive_resetting(obs)

    def adaptive_resetting(self, obs):
        if len(obs) == 0:
            return

        alpha = sum([particle.weight for particle in self.particles])
        self.alpha_slow += self.amcl_params['slow']*(alpha - self.alpha_slow)
        self.alpha_fast += self.amcl_params['fast']*(alpha - self.alpha_fast)
        reset_num = len(self.particles)*max(0, 1.0 - self.amcl_params['nu']*self.alpha_fast/self.alpha_slow)
        if 0 < reset_num:
            print('reset_num =', reset_num)

        self.resampling()

        nearest_obs_idx = np.argmin([one_obs[0][0] for one_obs in obs])
        nearest_z, nearest_landmark_id = obs[nearest_obs_idx]
        for n in range(int(reset_num)):
            particle = random.choices(self.particles)[0]
            self.sensor_resetting_draw(particle, self.map.landmarks[nearest_landmark_id].pos, *nearest_z)
        
    def sensor_resetting_draw(self, particle, landmark_pos, obs_dist, obs_phi):
        psi = np.random.uniform(-math.pi, math.pi) # [rad]
        dist = scipy.stats.norm(loc=obs_dist, scale=(self.distance_std_rate*obs_dist)**2).rvs() # [m]
        particle.pose[0] = landmark_pos[0] + dist*math.cos(psi)
        particle.pose[1] = landmark_pos[0] + dist*math.sin(psi)

        phi = scipy.stats.norm(loc=obs_phi, scale=self.direction_std**2).rvs() # [rad]
        particle.pose[2] = math.atan2(landmark_pos[1]-particle.pose[1], landmark_pos[0]-particle.pose[0]) - phi
        particle.weight = 1.0 / len(self.particles)

In [0]:
def trial(animation): 
    time_span = 30
    time_interval = 0.1
    world = World(time_span, time_interval, debug=not animation) 

    m = Map()
    for landmark in [(-4,2), (2,-3), (3,3)]:
        m.append_landmark(Landmark(*landmark))
    world.append(m)

    init_pose = np.array([
        np.random.uniform(-5.0, 5.0),
        np.random.uniform(-5.0, 5.0),
        math.radians(np.random.uniform(-180.0, 180))]
    )
    num = 300
    amcl_estimator = AdaptiveMcl(m, init_pose, num)

    nu = 0.2
    omega = math.radians(10.0)
    circling_agent = EstimationAgent(nu, omega, time_interval, estimator=amcl_estimator)

    robot_pose = np.array([
        np.random.uniform(-3.0, 3.0),
        np.random.uniform(-3.0, 3.0),
        math.radians(np.random.uniform(-180.0, 180))]
    )
    robot = Robot(robot_pose, agent=circling_agent, sensor=Camera(m, phantom_prob=0.1), color='red')
    world.append(robot)

    world.draw()
    return world

In [0]:
world = trial(animation=True)
world.ani

Output hidden; open in https://colab.research.google.com to view.