# Example: Discrete Bayes

In [1]:
import numpy as np
from scipy.stats import norm
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
import time
from ipywidgets import interact

In [2]:
class World:
    def __init__(self, L: float, doors: np.ndarray, N: int, dynamics_std: float, measurement_std: float):
        """
        Params:
            L: hallway length
            doors: array of door positions along the hallway, each should be in range [0, L]
                   this should be in sequential order of position
            N: number of discretization buckets
            dynamics_std: standard deviation of dynamics noise
            measurement_std: standard deviation of measurement noise
        """
        self.L = L
        self.doors = doors
        self.N = N
        self.dynamics_std = dynamics_std
        self.dynamics_noise = norm(0., self.dynamics_std)
        self.measurement_std = measurement_std
        self.measurement_noise = norm(0., self.measurement_std)
        self.interval_width = self.L / self.N
        self.rng = np.random.default_rng()

    def position_to_idx(self, x: float):
        """
        Given a continuous position in the range [0, L], compute
        the bucket index for N evenly spaced buckets.
    
        Params:
            x: position in range [0, L]
    
        Returns:
            Index of bucket the position falls into
        """
        x = np.max((0., x))
        x = np.min((self.L, x))
        return int(np.floor(x / self.interval_width))

    def idx_to_position(self, i: int):
        """
        Given a bucket index for N evenly spaced buckets compute
        the continous position of the bucket center.
    
        Params:
            i: bucket index in {0, ..., self.N-1}
    
        Returns:
            Position of bucket center
        """
        return i * self.interval_width + 0.5 * self.interval_width

    def dynamics(self, x: float, u: float, use_noise: bool=False):
        """
        Simulate a step of the robot's dynamics.

        Params:
            x: current state
            u: control input
            use_noise: true to add noise to the dynamics

        Returns:
            next state
        """
        x_next = self.rng.normal(x + u, self.dynamics_std) if use_noise else x + u
        return np.min((x_next, self.L))
    
    def measurement(self, x: float, use_noise: bool=False):
        """
        Measurement consists of a noisy range measurement to the nearest
        door to the robot's right. Assumes self.doors is already in order.

        Params:
            x: current state
            use_noise: true to add noise to the measurement

        Returns:
            Measurement from state x
        """
        for door in self.doors:
            if door > x:
                return self.rng.normal(door - x, self.measurement_std) if use_noise else door - x
        return self.doors[-1]

    def discrete_transition_probability(self, i0: float, u: float, i1: float):
        """
        Compute discretized transition probability

        Params:
            i0: bin index for initial state
            u: control
            i1: bin index for next state

        Returns:
            Approximation of probability of the transition from bin i0 to i1
            with control u.
        """
        x0 = self.idx_to_position(i0)
        x1 = self.idx_to_position(i1)
        error = x1 - self.dynamics(x0, u, use_noise=False)
        return self.dynamics_noise.pdf(error)

    def discrete_measurement_probability(self, i: float, z: float):
        """
        Compute discretized measurement probability

        Params:
            i: bin index
            z: array of door measurements

        Returns:
            Probability p(z | i), i.e. measurement likelihood
        """
        x = self.idx_to_position(i)
        error = z - self.measurement(x, use_noise=False)
        return self.measurement_noise.pdf(error)

    def discrete_bayes_update(self, belief_prev: np.ndarray, u: float, z: float):
        """
        Perform one step of discrete Bayes filter

        Params:
            belief_prev: previous time belief distribution
            u: control input
            z: measurement

        Returns:
            Update belief
        """
        belief = np.zeros_like(belief_prev)
        
        # Predict step
        for i in range(self.N):
            for i_prev in range(self.N):
                belief[i] += self.discrete_transition_probability(i_prev, u, i) * belief_prev[i_prev]

        # Correct step
        if z is not None:
            for i in range(self.N):
                belief[i] *= self.discrete_measurement_probability(i, z)

        # Normalize distribution
        normalization_constant = np.sum(belief)
        if normalization_constant < 1e-3:
            raise ValueError("Invalid belief update")
        return belief / normalization_constant

    def simulate(self, x0: float, belief0: np.ndarray, u: float, T: int):
        """
        Simulate robot moving down the hallway with a constant control

        Params:
            x0: initial state
            belief0: initial belief
            u: constant control input
            T: number of steps to simulate

        Returns:
            (x, belief) where x is the state trajectory and belief is of shape (T, N) where
            T is the number of simualted steps and N is the state dimension so belief[t, :]
            represents the belief at time step t.
        """
        x = np.zeros((T,))
        belief = np.zeros((T, self.N))
        x[0] = x0
        belief[0, :] = belief0
        for t in range(T-1):
            x[t+1] = self.dynamics(x[t], u, use_noise=True)
            z = self.measurement(x[t+1], use_noise=True)

            # Belief update
            belief[t+1, :] = self.discrete_bayes_update(belief[t, :], u, z)
        return x, belief

In [7]:
# Create world
world = World(L=10, doors=[2., 5.], N=50, dynamics_std=0.1, measurement_std=0.5)

# Set problem constants
x0 = 0.
u = 0.5
T = 20

# Initialize belief to uniform
belief0 = np.ones((world.N)) / world.N

# Simulate
start_time = time.perf_counter()
x, belief = world.simulate(x0, belief0, u, T)
runtime = time.perf_counter() - start_time
print(f"Time: {1000*runtime:.2f} ms")

# Plot results for each time step
@interact(i=(0, T-1))
def f(i=0):
    fig, axs = plt.subplots(2, 1, figsize=(10, 4))
    for door in world.doors:
        width=0.3
        height=0.5
        rect = Rectangle((door - 0.5*width, 0), width, height, facecolor='blue', edgecolor='black', alpha=0.7)
        axs[0].add_patch(rect)
    interval = world.L / world.N
    buckets = interval * np.arange(world.N) + 0.5 * world.L / world.N
    axs[0].plot(x[i], 0.1, color='r', marker='o', markersize=20)
    axs[1].bar(buckets, belief[i])
    for ax in axs:
        ax.set_xlim(-0.1,world.L+0.1)
        ax.set_ylim(-0.1, 1.1)
    axs[0].tick_params(axis='y', left=False, labelleft=False)
    axs[1].set_ylabel('belief')
    plt.show()

Time: 4433.46 ms


interactive(children=(IntSlider(value=0, description='i', max=19), Output()), _dom_classes=('widget-interact',â€¦