# 確率ロボティクスの勉強

## ロボットのシミュレーション

In [1]:
%matplotlib nbagg
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import matplotlib.animation as anm

import math
import numpy as np
from scipy.stats import expon, norm, multivariate_normal
from matplotlib.patches import Ellipse

In [2]:
class World:
    def __init__(self):
        self.objects = []
    
    def append(self, obj):
        self.objects.append(obj)
    
    def draw(self):
        fig = plt.figure(figsize=(4,4))
        ax = fig.add_subplot(111)
        ax.set_aspect("equal")
        ax.set_xlim(-5, 5)
        ax.set_ylim(-5,5)
        
        for obj in self.objects:
            obj.draw(ax)
        plt.show()
    

In [3]:
world = World()
world.draw()

<IPython.core.display.Javascript object>

In [4]:
class MyRobot:
    def __init__(self, pose, color="red"):
        self.pose = pose
        self.r = 0.2
        self.color = color
    def draw(self, ax):
        x,y = self.pose
        robot = patches.Circle(xy=(x,y), radius=self.r, color=self.color)
        ax.add_patch(robot)

In [5]:
world = World()

robot1 = MyRobot(np.array([2,3]))
robot2 = MyRobot(np.array([-2,-1]), "black")
world.append(robot1)
world.append(robot2)
world.draw()

<IPython.core.display.Javascript object>

### アニメーション化

In [6]:
class World:
    def __init__(self):
        self.objects = []
    
    def append(self, obj):
        self.objects.append(obj)
    
    def draw(self):
        fig = plt.figure(figsize=(4,4))
        ax = fig.add_subplot(111)
        ax.set_aspect("equal")
        ax.set_xlim(-5, 5)
        ax.set_ylim(-5, 5)
        
        elems = []
        
        self.ani = anm.FuncAnimation(fig, self.one_step, fargs=(elems, ax), frames=10, interval=1000, repeat=False)
        plt.show()
    
    def one_step(self, i, elems, ax):
        while elems:
            elems.pop().remove()
        elems.append(ax.text(-4.4, 4.5, "t="+str(i), fontsize=10))
        for obj in self.objects:
            obj.draw(ax, elems)

In [7]:
class MyRobot:
    def __init__(self, pose, color="red"):
        self.pose = pose
        self.r = 0.2
        self.color = color
    def draw(self, ax, elems):
        x,y = self.pose
        robot = patches.Circle(xy=(x,y), radius=self.r, color=self.color)
        elems.append(ax.add_patch(robot))

In [8]:
world = World()

robot1 = MyRobot(np.array([2,3]))
robot2 = MyRobot(np.array([-2,-1]), "black")
world.append(robot1)
world.append(robot2)
world.draw()

<IPython.core.display.Javascript object>

### 状態遷移の実装

In [9]:
class MyRobot:
    def __init__(self, pose, color="red"):
        self.pose = pose
        self.r = 0.2
        self.color = color
        
    def draw(self, ax, elems):
        x,y = self.pose
        robot = patches.Circle(xy=(x,y), radius=self.r, color=self.color)
        elems.append(ax.add_patch(robot))
        
    def state_transition(self, v_x, v_y, time): #x軸の速度, y軸の速度, 移動時間
        self.pose = self.pose + np.array([v_x*time, v_y*time]) 
    
    

In [10]:
my_robot = MyRobot(np.array([2,3]))
print(my_robot.pose)
my_robot.state_transition(2,3,1)
print(my_robot.pose)

[2 3]
[4 6]


### エージェントの実装

In [11]:
class World:
    def __init__(self, time_span, time_interval, debug=False): #シミュレーション時間, シミュレーション更新周期
        self.objects = []
        self.debug = debug
        self.time_span = time_span
        self.time_interval = time_interval
    
    def append(self, obj):
        self.objects.append(obj)
    
    def draw(self):
        fig = plt.figure(figsize=(4,4))
        ax = fig.add_subplot(111)
        ax.set_aspect("equal")
        ax.set_xlim(-5, 5)
        ax.set_ylim(-5, 5)
        
        elems = []
        if self.debug:        
            for i in range(int(self.time_span/self.time_interval)): self.one_step(i, elems, ax)
        else:
            self.ani = anm.FuncAnimation(fig, self.one_step, fargs=(elems, ax), frames=int(self.time_span/self.time_interval)+1, interval=int(self.time_interval*1000), repeat=False)
        plt.show()
    
    def one_step(self, i, elems, ax):
        while elems:
            elems.pop().remove()
        time_str = "t=%.2f[s]" % (self.time_interval*i)
        elems.append(ax.text(-4.4, 4.5, time_str, fontsize=10))
        for obj in self.objects:
            obj.draw(ax, elems)
            if hasattr(obj, "one_step"):
                obj.one_step(self.time_interval)

In [12]:
class MyRobot:
    def __init__(self, pose, agent=None, sensor=None, color="red"):
        self.pose = pose
        self.r = 0.2
        self.color = color
        self.agent = agent
        self.poses = [] #移動の軌跡を保存
        self.sensor = sensor
        
    def draw(self, ax, elems):
       
        #初期値or状態遷移後のposeを取得
        x,y = self.pose
        robot = patches.Circle(xy=(x,y), radius=self.r, color=self.color)
        elems.append(ax.add_patch(robot))
        
        self.poses.append(np.array([x,y]).T)
        poses_x = [e[0] for e in self.poses]
        poses_y = [e[1] for e in self.poses]
        elems += ax.plot(poses_x, poses_y, linewidth=0.5, color="black")
        
        if self.sensor and len(self.poses) > 1:
            #状態遷移前の姿勢で観測しているのでposes[-2] (一つ前の姿勢値から線分の計算)
            self.sensor.draw(ax, elems, self.poses[-2])
        
    def state_transition(self, v_x, v_y, time): #x軸の速度, y軸の速度, 移動時間
        self.pose += np.array([v_x*time, v_y*time])
    
    def one_step(self, time_interval):
        if self.agent:
            #観測
            obs = self.sensor.data(self.pose) if self.sensor else None
            # >>> 現在の姿勢と観測結果がわかった状態
            
            #エージェントによる制御値の決定
            v_x, v_y = self.agent.decision(obs)
            
            #状態遷移
            self.state_transition(v_x, v_y, time_interval)
            # >>> 現在の姿勢が更新, 観測はまだ
            

In [13]:
class Agent:
    def __init__(self, v_x, v_y):
        self.v_x = v_x
        self.v_y = v_y
        self.counter =0
    
    def decision(self, observation=None):
        self.counter += 1
        return self.v_x, self.v_y

In [14]:
world = World(10,1)
agent1 = Agent(0.5,0.0)
agent2 = Agent(0.5,0.5)
robot1 = MyRobot(np.array([-3.0, 0.0]).T, agent1)
robot2 = MyRobot(np.array([-3.0, -3.0]).T, agent2,"black")
world.append(robot1)
world.append(robot2)
world.draw()

<IPython.core.display.Javascript object>

### 点ランドマークの実装 

In [15]:
class Landmark:
    def __init__(self, x, y):
        self.pos=np.array([x,y]).T
        self.id = None
        
    def draw(self, ax, elems):
        c = ax.scatter(self.pos[0], self.pos[1], s=100, marker="*", label="landmarks", color="orange")
        elems.append(c)
        elems.append(ax.text(self.pos[0], self.pos[1], "id:"+str(self.id), fontsize=10))

In [16]:
class Map:
    def __init__(self):
        self.landmarks = []
        
    def append_landmark(self, landmark):
        landmark.id = len(self.landmarks)
        self.landmarks.append(landmark)
    
    def draw(self, ax, elems):
        for lm in self.landmarks:
            lm.draw(ax, elems)

In [17]:
world = World(10, 1)


agent1 = Agent(0.5,0.0)
agent2 = Agent(0.5,0.5)
robot1 = MyRobot(np.array([-3.0, 0.0]).T, agent1)
robot2 = MyRobot(np.array([-3.0, -3.0]).T, agent2,"black")
world.append(robot1)
world.append(robot2)

m = Map()
m.append_landmark(Landmark(2,-2))
m.append_landmark(Landmark(-1,-3))
m.append_landmark(Landmark(3,3))
world.append(m)
world.draw()

<IPython.core.display.Javascript object>

### 観測の実装

In [18]:
class MyCamera:
    def __init__(self, env_map, distance_range = (0.5, 1)):
        self.map = env_map
        self.lastdata = []
        self.distance_range = distance_range
    
    def visible(self, pos):
        if pos is None:
            return False
        
        distance = np.hypot(*pos)
        return self.distance_range[0] <= distance <= self.distance_range[1]
    
    def data(self, cam_pose):
        observed = []
        for lm in self.map.landmarks:
            z = self.observation_function(cam_pose, lm.pos)
            if self.visible(z):
                observed.append((z, lm.id))
        
        self.lastdata = observed
        return observed
    
    @classmethod
    def observation_function(cls, cam_pose, obj_pos):
        diff = obj_pos - cam_pose[0:2]
        return np.array(diff).T
    
    def draw(self, ax, elems, cam_pose):
        for lm in self.lastdata:
            x, y = cam_pose
            lx = lm[0][0] + x
            ly = lm[0][1] + y
            elems += ax.plot([x,lx],[y,ly], color="pink")
        

In [19]:
m = Map()
m.append_landmark(Landmark(2.0,-2.0))
m.append_landmark(Landmark(-1.0,-3.0))
m.append_landmark(Landmark(3.0,3.0))
cam = MyCamera(m)
robot1 = MyRobot(np.array([0.0, 0.0]).T)
print(cam.data(robot1.pose))


[]


In [20]:
world = World(10, 0.1)

m = Map()
m.append_landmark(Landmark(2,-2))
m.append_landmark(Landmark(-1,-3))
m.append_landmark(Landmark(3,3))
world.append(m)

agent1 = Agent(0.5,0.0)
agent2 = Agent(0.5,0.5)
robot1 = MyRobot(np.array([-3.0, 0.0]).T, sensor=MyCamera(m), agent=agent1)
robot2 = MyRobot(np.array([-3.0, -3.0]).T, sensor=MyCamera(m), agent=agent2, color="black")
world.append(robot1)
world.append(robot2)

world.draw()

<IPython.core.display.Javascript object>

 ## 雑音のない場合の実装まとめ

In [71]:
class World:
    '''
    シミュレーション全体の管理
    '''
    def __init__(self, time_span, time_interval, debug=False): #シミュレーション時間, シミュレーション更新周期
        self.objects = []
        self.debug = debug
        self.time_span = time_span
        self.time_interval = time_interval
    
    def append(self, obj):
        self.objects.append(obj)
    
    def draw(self):
        fig = plt.figure(figsize=(4,4))
        ax = fig.add_subplot(111)
        ax.set_aspect("equal")
        ax.set_xlim(-5, 5)
        ax.set_ylim(-5, 5)
        
        elems = []
        if self.debug:        
            for i in range(int(self.time_span/self.time_interval)): self.one_step(i, elems, ax)
        else:
            self.ani = anm.FuncAnimation(fig, self.one_step, fargs=(elems, ax), frames=int(self.time_span/self.time_interval)+1, interval=int(self.time_interval*1000), repeat=False)
        plt.show()
    
    def one_step(self, i, elems, ax):
        while elems:
            elems.pop().remove()
        time_str = "t=%.2f[s]" % (self.time_interval*i)
        elems.append(ax.text(-4.4, 4.5, time_str, fontsize=10))
        for obj in self.objects:
            obj.draw(ax, elems)
            if hasattr(obj, "one_step"):
                obj.one_step(self.time_interval)

In [46]:
class MyRobot:
    '''
    ロボット1台の処理関係
    agentとsensorを含む
    '''
    def __init__(self, pose, agent=None, sensor=None, color="red"):
        self.pose = pose
        self.r = 0.2
        self.color = color
        self.agent = agent
        self.poses = [pose] #移動の軌跡を保存
        self.sensor = sensor
        
    def draw(self, ax, elems):
       
        #初期値or状態遷移後のposeを取得
        x,y = self.pose
        robot = patches.Circle(xy=(x,y), radius=self.r, color=self.color)
        elems.append(ax.add_patch(robot))
        
        self.poses.append(np.array([x,y]).T)
        poses_x = [e[0] for e in self.poses]
        poses_y = [e[1] for e in self.poses]
        elems += ax.plot(poses_x, poses_y, linewidth=0.5, color="black")
        
        if self.sensor and len(self.poses) > 1:
            #状態遷移前の姿勢で観測しているのでposes[-2] (一つ前の姿勢値から線分の計算)
            self.sensor.draw(ax, elems, self.poses[-2])
        
        if self.agent and hasattr(self.agent, "draw"):
            self.agent.draw(ax, elems)
    
    @classmethod
    def state_transition(cls, v_x, v_y, time, pose): #x軸の速度, y軸の速度, 移動時間
        return pose + np.array([v_x*time, v_y*time])
    
    def one_step(self, time_interval):
        if self.agent:
            #観測
            obs = self.sensor.data(self.pose) if self.sensor else None
            # >>> 現在の姿勢と観測結果がわかった状態
            
            #エージェントによる制御値の決定
            v_x, v_y = self.agent.decision(obs)
            
            #状態遷移
            self.state_transition(v_x, v_y, time_interval)
            # >>> 現在の姿勢が更新, 観測はまだ

In [23]:
class Agent:
    '''
    ロボットの動作を決定する, agent(操縦者)としてRobotに登録する
    '''
    def __init__(self, v_x, v_y):
        self.v_x = v_x
        self.v_y = v_y
        self.counter =0
    
    def decision(self, observation=None):
        self.counter += 1
        return self.v_x, self.v_y

In [24]:
class Landmark:
    '''
    観測地点のデータ, Mapに登録する
    '''
    def __init__(self, x, y):
        self.pos=np.array([x,y]).T
        self.id = None
        
    def draw(self, ax, elems):
        c = ax.scatter(self.pos[0], self.pos[1], s=100, marker="*", label="landmarks", color="orange")
        elems.append(c)
        elems.append(ax.text(self.pos[0], self.pos[1], "id:"+str(self.id), fontsize=10))

In [25]:
class Map:
    '''
    Landmarkを管理する
    '''
    def __init__(self):
        self.landmarks = []
        
    def append_landmark(self, landmark):
        landmark.id = len(self.landmarks)
        self.landmarks.append(landmark)
    
    def draw(self, ax, elems):
        for lm in self.landmarks:
            lm.draw(ax, elems)

In [63]:
class MyCamera:
    '''
    観測を管理する, sensorとしてRobotに登録する
    '''
    def __init__(self, env_map, distance_range = (0.5, 4)):
        self.map = env_map
        self.lastdata = []
        self.distance_range = distance_range
    
    def visible(self, pos):
        if pos is None:
            return False
        
        distance = np.hypot(*pos)
        return self.distance_range[0] <= distance <= self.distance_range[1]
    
    def data(self, cam_pose):
        observed = []
        for lm in self.map.landmarks:
            z = self.observation_function(cam_pose, lm.pos)
            if self.visible(z):
                observed.append((z, lm.id))
        
        self.lastdata = observed
        return observed
    
    @classmethod
    def observation_function(cls, cam_pose, obj_pos):
        diff = obj_pos - cam_pose
        return np.array(diff).T
    
    def draw(self, ax, elems, cam_pose):
        for lm in self.lastdata:
            x, y = cam_pose
            lx = lm[0][0] + x
            ly = lm[0][1] + y
            elems += ax.plot([x,lx],[y,ly], color="pink")

## 雑音の追加

### ロボットに雑音を追加

In [47]:
class Robot(MyRobot):
    def __init__(self, pose, agent=None, sensor=None, color="black", noise_per_meter=5, noise_std=0.1):
        super().__init__(pose, agent, sensor, color)
        self.noise_pdf = expon(scale=1.0/(1e-100 + noise_per_meter))
        self.distance_until_noise = self.noise_pdf.rvs()
        self.pose_noise = norm(scale=noise_std)
    
    def noise(self, pose, v_x, v_y, time_interval):
        distance = np.hypot(v_x*time_interval, v_y*time_interval)
        self.distance_until_noise -= distance
        if self.distance_until_noise <= 0.0:
            self.distance_until_noise += self.noise_pdf.rvs()
            noise_value = self.pose_noise.rvs()
            pose[0] += self.pose_noise.rvs() #noise_value
            pose[1] += self.pose_noise.rvs() #noise_value
            
        return pose

    def one_step(self, time_interval):
        if self.agent:
            obs = self.sensor.data(self.pose) if self.sensor else None
            v_x, v_y = self.agent.decision(obs)
            self.pose = self.state_transition(v_x, v_y, time_interval, self.pose)
            self.pose = self.noise(self.pose, v_x, v_y, time_interval)

In [28]:
world = World(10, 0.1)
for i in range(100):
    line = Agent(0.5, 0.5)
    r = Robot(np.array([-4.0,-4.0]).T, sensor=None, agent=line, color="gray")
    world.append(r)
world.draw()

<IPython.core.display.Javascript object>

### 観測に雑音を追加

In [118]:
class Camera(MyCamera):
    def __init__(self, env_map, distance_range=(0.5, 4), pos_noise=0.1):
        super().__init__(env_map, distance_range)
        self.pos_noise = pos_noise
    
    def noise(self, relpos):
        noise_x = norm.rvs(loc=relpos[0], scale=self.pos_noise)
        noise_y = norm.rvs(loc=relpos[1], scale=self.pos_noise)
        return np.array([noise_x, noise_y]).T
    
    def data(self, cam_pose):
        observed = []
        for lm in self.map.landmarks:
            z = self.observation_function(cam_pose, lm.pos)
            if self.visible(z):
                z = self.noise(z)
                observed.append((z, lm.id))
        
        self.lastdata = observed
        return observed

In [30]:
world = World(10, 0.1)

m = Map()
m.append_landmark(Landmark(2,-2))
m.append_landmark(Landmark(-1,-3))
m.append_landmark(Landmark(3,3))
world.append(m)

agent1 = Agent(0.5,0.5)
robot1 = MyRobot(np.array([-3.0, -3.0]).T, sensor=Camera(m), agent=agent1)
world.append(robot1)

world.draw()

<IPython.core.display.Javascript object>

## 推定器を登録可能なエージェントの実装

In [89]:
class EstimationAgent(Agent):
    def __init__(self, time_interval, v_x, v_y, estimator):
        super().__init__(v_x, v_y)
        self.estimator = estimator
        self.time_interval = time_interval
        self.prev_v_x = 0.0
        self.prev_v_y = 0.0
    
    def decision(self, observation=None):
        print("prev_x:",self.prev_v_x, "prev_y:",self.prev_v_y)
        self.estimator.motion_update(self.prev_v_x, self.prev_v_y, self.time_interval)
        self.prev_v_x, self.prev_v_y = self.v_x, self.v_y
        self.estimator.observation_update(observation)
        return self.v_x, self.v_y
        
    def draw(self, ax, elems):
        self.estimator.draw(ax, elems)
        

### KFの実装

In [32]:
def sigma_ellipse(p, cov, n):
    eig_vals, eig_vec = np.linalg.eig(cov)
    ang = math.atan2(eig_vec[:,0][1], eig_vec[:,0][0])/math.pi*180
    return Ellipse(p, width=2*n*math.sqrt(eig_vals[0]), height=2*n*math.sqrt(eig_vals[1]), angle=ang, fill=False, color="blue", alpha=0.5)

In [126]:
class KalmanFilter:
    def __init__(self, envmap, init_pose, motion_noise_stds= {"nn":0.1, "oo":0.1}, pos_noise=0.1):
        self.belief = multivariate_normal(mean=np.array([0.0, 0.0]), cov=np.diag([1e-10, 1e-10]))
        self.pose = self.belief.mean
        self.motion_noise_stds = motion_noise_stds
        self.pose = self.belief.mean
        self.map = envmap
        self.pos_noise = pos_noise
    
    def matR(self, v_x, v_y,time):
        return np.diag([self.motion_noise_stds["nn"]**2*abs(v_x)/time, self.motion_noise_stds["oo"]**2*abs(v_y)/time])
        
    def observation_update(self, observation):
        print("observation update")
        for d in observation:
            z = d[0]
            obs_id = d[1]
            estimated_z = MyCamera.observation_function(self.belief.mean, self.map.landmarks[obs_id].pos)
            H = np.array([[-1,0],[0,-1]])
            K = self.belief.cov.dot(H.T).dot(np.linalg.inv(H.dot(self.belief.cov).dot(H.T) + np.diag([self.pos_noise, self.pos_noise])))            
            self.belief.mean += K.dot(z - estimated_z)
            self.belief.cov = (np.eye(2) - K.dot(H)).dot(self.belief.cov)
            self.pose = self.belief.mean
    
    def motion_update(self, v_x, v_y, time):
        self.belief.cov = self.belief.cov + self.matR(v_x,v_y,time)
        self.belief.mean = MyRobot.state_transition(v_x, v_y, time, self.belief.mean)
        self.pose = self.belief.mean
    
    def draw(self, ax, elems):
        e = sigma_ellipse(self.belief.mean[0:2], self.belief.cov[0:2, 0:2], 3)
        elems.append(ax.add_patch(e))

In [125]:
def trial():
    time_interval = 0.1
    world = World(25, time_interval, debug=False)
    #地図の生成
    m = Map()
    for ln in [(-4.0, 2.0), (2.0, -3.0), (3.0, 3.0)]:
        m.append_landmark(Landmark(*ln))
    world.append(m)
    
    
    #ロボットを作成
    initial_pose = np.array([0.0, 0.0]).T
    kf = KalmanFilter(m, initial_pose)
    line = EstimationAgent(time_interval, 0.1, 0.1, kf)
    r = Robot(initial_pose, sensor=Camera(m), agent=line, color="red")
    world.append(r)
    world.draw()

trial()

<IPython.core.display.Javascript object>