# 第5章 パーティクルフィルタによる自己位置推定

In [None]:
import os
DATA_DIR = "LNPR_BOOK_CODES/section_uncertainty/"
os.chdir(DATA_DIR)

## 5.2 パーティクルの準備

In [None]:
import sys                                   ###mcl1 全部のセルを掲載
sys.path.append('../scripts/')
from robot import *

In [None]:
class EstimationAgent(Agent):
    def __init__(self, nu, omega):
        super().__init__(nu, omega)
        
    def draw(self, ax, elems):
        elems.append(ax.text(0, 0, "hoge", fontsize=10))

In [None]:
world = World(30, 0.1)  

### 地図を生成して3つランドマークを追加 ###
m = Map()
for ln in [(-4,2), (2,-3), (3,3)]: m.append_landmark(Landmark(*ln))
world.append(m)          

### ロボットを作る ###
initial_pose = np.array([2, 2, math.pi/6]).T
circling = EstimationAgent(0.2, 10.0/180*math.pi) 
r = Robot(initial_pose, sensor=Camera(m), agent=circling) 
world.append(r)

### アニメーション実行 ###
world.draw()

In [None]:
import sys         
sys.path.append('../scripts/')
from robot import *

In [None]:
class Particle:    ###particle_class###
    def __init__(self, init_pose):
        self.pose = init_pose

In [None]:
class Mcl:        ###Mcl_class###
    def __init__(self, init_pose, num):
        self.particles = [Particle(init_pose) for i in range(num)]

In [None]:
class EstimationAgent(Agent):  ###EstimationAgent2###
    def __init__(self, nu, omega, estimator): #引数を追加
        super().__init__(nu, omega)
        self.estimator = estimator
        
    def draw(self, ax, elems):
        elems.append(ax.text(0, 0, "hoge", fontsize=10))

In [None]:
world = World(30, 0.1)   ###1--15行目をmcl2_12として1行掲載

### 地図を生成して3つランドマークを追加 ###
m = Map()
for ln in [(-4,2), (2,-3), (3,3)]: m.append_landmark(Landmark(*ln))
world.append(m)       

### ロボットを作る ###
initial_pose = np.array([2, 2, math.pi/6]).T
estimator = Mcl(initial_pose, 100)                                     #パーティクルファイルタを作る
circling = EstimationAgent(0.2, 10.0/180*math.pi, estimator)  #estimatorを渡す
r = Robot(initial_pose, sensor=Camera(m), agent=circling) 
world.append(r)

### アニメーション実行 ###
world.draw()

In [None]:
import sys         
sys.path.append('../scripts/')
from robot import *

In [None]:
class Particle:
    def __init__(self, init_pose):
        self.pose = init_pose

In [None]:
class Mcl:  ###Mcl3 (1,2,5行目以降を記載)
    def __init__(self, init_pose, num):
        self.particles = [Particle(init_pose) for i in range(num)]
        
    def draw(self, ax, elems):   #追加
        xs = [p.pose[0] for p in self.particles]
        ys = [p.pose[1] for p in self.particles]
        vxs = [math.cos(p.pose[2]) for p in self.particles] 
        vys = [math.sin(p.pose[2]) for p in self.particles] 
        elems.append(ax.quiver(xs, ys, vxs, vys, color="blue", alpha=0.5))

In [None]:
class EstimationAgent(Agent):  ###EstimationAgent3 (1,2,6,7行目を記載)
    def __init__(self, nu, omega, estimator): 
        super().__init__(nu, omega)
        self.estimator = estimator
        
    def draw(self, ax, elems):   #追加
        self.estimator.draw(ax, elems)

In [None]:
world = World(30, 0.1)  

### 地図を生成して3つランドマークを追加 ###
m = Map()                                  
m.append_landmark(Landmark(-4,2))
m.append_landmark(Landmark(2,-3))
m.append_landmark(Landmark(3,3))
world.append(m)          

### ロボットを作る ###
initial_pose = np.array([2, 2, math.pi/6]).T
estimator = Mcl(initial_pose, 100)
circling = EstimationAgent(0.2, 10.0/180*math.pi, estimator) 
r = Robot(initial_pose, sensor=Camera(m), agent=circling) 
world.append(r)

### アニメーション実行 ###
world.draw()

## 5.3 移動後のパーティクルの姿勢更新

### 5.3.2 状態遷移モデルの実装

In [None]:
import sys         ###mcl4header
sys.path.append('../scripts/')
from robot import *
from scipy.stats import multivariate_normal  #追加

In [None]:
class Particle:
    def __init__(self, init_pose):
        self.pose = init_pose

In [None]:
class Mcl: ###Mcl4（draw以下省略）###
    def __init__(self, init_pose, num, motion_noise_stds):   #引数追加
        self.particles = [Particle(init_pose) for i in range(num)]
        
        v = motion_noise_stds #5-7行目追加
        c = np.diag([v["nn"]**2, v["no"]**2, v["on"]**2, v["oo"]**2])
        self.motion_noise_rate_pdf = multivariate_normal(cov=c)
        
    def motion_update(self, nu, omega, time): #追加
        print(self.motion_noise_rate_pdf.cov)
        
    def draw(self, ax, elems): 
        xs = [p.pose[0] for p in self.particles]
        ys = [p.pose[1] for p in self.particles]
        vxs = [math.cos(p.pose[2]) for p in self.particles] 
        vys = [math.sin(p.pose[2]) for p in self.particles] 
        elems.append(ax.quiver(xs, ys, vxs, vys, color="blue", alpha=0.5))

In [None]:
class EstimationAgent(Agent):   ###EstimationAgent4###
    def __init__(self, time_interval, nu, omega, estimator): #time_intervalを追加
        super().__init__(nu, omega)
        self.estimator = estimator
        self.time_interval = time_interval #追加
        
    def draw(self, ax, elems):
        self.estimator.draw(ax, elems)

In [None]:
initial_pose = np.array([0, 0, 0]).T   ###mcl_test4（下の行列出力も掲載）###
estimator = Mcl(initial_pose, 100, motion_noise_stds={"nn":0.01, "no":0.02, "on":0.03, "oo":0.04})
a = EstimationAgent(0.1, 0.2, 10.0/180*math.pi, estimator)
estimator.motion_update(0.2, 10.0/180*math.pi, 0.1)

In [None]:
print(np.diag([1,2])) ###diag###

In [None]:
import sys 
sys.path.append('../scripts/')
from robot import *
from scipy.stats import multivariate_normal

In [None]:
class Particle: ###Particle5
    def __init__(self, init_pose):
        self.pose = init_pose
        
    def motion_update(self, nu, omega, time, noise_rate_pdf): #追加
        ns = noise_rate_pdf.rvs() #順にnn, no, on, oo
        noised_nu = nu + ns[0]*math.sqrt(abs(nu)/time) + ns[1]*math.sqrt(abs(omega)/time)
        noised_omega = omega + ns[2]*math.sqrt(abs(nu)/time) + ns[3]*math.sqrt(abs(omega)/time)
        self.pose = IdealRobot.state_transition(noised_nu, noised_omega, time, self.pose)

In [None]:
class Mcl: 
    def __init__(self, init_pose, num, motion_noise_stds): 
        self.particles = [Particle(init_pose) for i in range(num)]
        
        v = motion_noise_stds
        c = np.diag([v["nn"]**2, v["no"]**2, v["on"]**2, v["oo"]**2])
        self.motion_noise_rate_pdf = multivariate_normal(cov=c)
        
    def motion_update(self, nu, omega, time): ###Mcl5###
        for p in self.particles: p.motion_update(nu, omega, time, self.motion_noise_rate_pdf)
        
    def draw(self, ax, elems): 
        xs = [p.pose[0] for p in self.particles]
        ys = [p.pose[1] for p in self.particles]
        vxs = [math.cos(p.pose[2]) for p in self.particles] 
        vys = [math.sin(p.pose[2]) for p in self.particles] 
        elems.append(ax.quiver(xs, ys, vxs, vys, color="blue", alpha=0.5))

In [None]:
class EstimationAgent(Agent):        ###EstimationAgent5
    def __init__(self, time_interval, nu, omega, estimator):
        super().__init__(nu, omega)
        self.estimator = estimator
        self.time_interval = time_interval
        
        self.prev_nu = 0.0       #追加
        self.prev_omega = 0.0 #追加
        
    def decision(self, observation=None): #追加
        self.estimator.motion_update(self.prev_nu, self.prev_omega, self.time_interval)
        self.prev_nu, self.prev_omega = self.nu, self.omega
        return self.nu, self.omega
        
    def draw(self, ax, elems):
        self.estimator.draw(ax, elems)

In [None]:
initial_pose = np.array([0, 0, 0]).T   ###mcl5_test（下の出力の先頭数行を記載）
estimator = Mcl(initial_pose, 100, motion_noise_stds={"nn":0.01, "no":0.02, "on":0.03, "oo":0.04})
a = EstimationAgent(0.1, 0.2, 10.0/180*math.pi, estimator)
estimator.motion_update(0.2, 10.0/180*math.pi, 0.1)
for p in estimator.particles:
    print(p.pose)

In [None]:
def trial(motion_noise_stds): ###draw_mcl5###（下の絵も）
    time_interval = 0.1 
    world = World(30, time_interval) 

    initial_pose = np.array([0, 0, 0]).T
    estimator = Mcl(initial_pose, 100, motion_noise_stds)
    circling = EstimationAgent(time_interval, 0.2, 10.0/180*math.pi, estimator)
    r = Robot(initial_pose, sensor=None, agent=circling, color="red" )
    world.append(r)

    world.draw()
    
trial({"nn":0.01, "no":0.02, "on":0.03, "oo":0.04})

### 5.3.3 パラメータの調整

In [None]:
import sys     ###motion_test_header
sys.path.append('../scripts/')
from robot import *

In [None]:
import copy    ###motion_test_linear

world = World(40.0, 0.1)  

initial_pose = np.array([0, 0, 0]).T
robots = []
r = Robot(initial_pose, sensor=None, agent=Agent(0.1, 0.0)) 

for i in range(100):
    copy_r = copy.copy(r)
    copy_r.distance_until_noise = copy_r.noise_pdf.rvs() #最初に雑音が発生するタイミングを変える
    world.append(copy_r)     #worldに登録することでアニメーションの際に動く
    robots.append(copy_r)   #オブジェクトの参照のリストにロボットのオブジェクトを登録

world.draw()

In [None]:
import pandas as pd ###motion_test_stats
poses = pd.DataFrame([ [math.sqrt(r.pose[0]**2 + r.pose[1]**2), r.pose[2]] for r in robots], 
                     columns=['r', 'theta'])
poses.transpose() #縦横を入れ替えて表示

In [None]:
print(poses["theta"].var()) ###motion_test_sigma_omeganu
print(poses["r"].mean())
math.sqrt(poses["theta"].var()/poses["r"].mean())

In [None]:
import sys 
sys.path.append('../scripts/')
from robot import *

In [None]:
world = World(40.0, 0.1)      ###motion_test_forward_bias

initial_pose = np.array([0, 0, 0]).T
robots = []

for i in range(100):
    r = Robot(initial_pose, sensor=None, agent=Agent(0.1, 0.0))  #ここで生成されるロボットは異なるバイアスを持つ
    world.append(r)
    robots.append(r)

world.draw()

In [None]:
import pandas as pd ###motion_test_stats_bias
poses = pd.DataFrame([ [math.sqrt(r.pose[0]**2 + r.pose[1]**2), r.pose[2]] for r in robots], columns=['r', 'theta'])
poses.transpose()

In [None]:
print(poses["r"].var()) 
print(poses["r"].mean())
math.sqrt(poses["r"].var()/poses["r"].mean())