# Homework 5

Austin Gill

* 11: 3
* 16: 1, 2
* 17: 2, 3, 4
* 18: 1.1
* Text Addition

In [None]:
# %config InlineBackend.figure_format = 'retina'
%config InlineBackend.figure_format = 'svg'
%matplotlib inline

from collections import deque
import itertools
import functools
import numpy as np
import scipy as sp
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse
# Apparently, SNS stands for "Samuel Norman Seaborn", a fictional
# character from The West Wing
import seaborn as sns

sns.set()

## Problem 11.3

Write a Python program to navigate a robot using the Wavefront algorithm. Demonstrate on a map with multiple obstacles.

### Map Creation with Celular Automata

I couldn't figure out how to make an interesting map by hand in a way that didn't suck, so after a little research, here's how I can create a map with celular automata. The algorithm is from [here](http://brogue.wikia.com/wiki/Level_Generation#Lakes) and modified to fit my needs.

First we need a nice way to plot a map and an optional path to check our results.

In [None]:
def plot(a, path=None, **kwargs):
    """Plot a given map."""
    sns.heatmap(a,
                cbar=False,
                square=True,
                xticklabels=False, yticklabels=False,
                **kwargs)
    if path is not None:
        sns.heatmap(a,
                    mask=path,
                    cmap=mpl.colors.ListedColormap(['m']),
                    cbar=False,
                    square=True,
                    xticklabels=False, yticklabels=False,
                    **kwargs)
    plt.show()

Then we generate a seed matrix randomly filled with the specified percentage of obstacle values.

In [None]:
def generate_seed(rows, cols, percentage):
    """Generate a seed of the given size with the given percentage
    of filled cells.
    """
    # Create a 1D array and then reshape it because I'm a scrub.
    a = np.zeros(rows * cols, dtype=int)
    # Set the given percentage of the values to be filled.
    a[:int(percentage * rows * cols)] = -1
    np.random.shuffle(a)
    # Return a shuffled 2D view of the array.
    return np.reshape(a, (rows, cols))

In [None]:
def neighbors(a, i, j):
    """Get the 3x3 submatrix around coordinate (i, j)"""
    xmin = i - 1 if i - 1 >= 0 else 0
    ymin = j - 1 if j - 1 >= 0 else 0
    # Add 2 because the end is exclusive.
    return a[xmin:i+2, ymin:j+2]

Then we glob together cells matching certain conditions. An empty cell is populated if it is surrounded by 6 or more filled cells. A filled cell remains filled if it is surrounded by 4 or more filled cells. Any other cells die or remain dead.

In [None]:
def blob(a):
    """Run the blob algorithm once on the given matrix."""
    rows, cols = a.shape
    for i, j in itertools.product(range(rows), range(cols)):
        N = neighbors(a, i, j)
        # Cell is born if surrounded by more than 5 neighbors.
        if not a[i, j] and -np.sum(N) > 5:
            a[i, j] = -1
        # Cell dies if surrounded by 3 or fewer neighbors.
        elif a[i, j] and -np.sum(N) <= 3:
            a[i, j] = 0

So generate the seed and run two iterations to "smooth out" the obstacles.

In [None]:
a = generate_seed(15, 15, 0.45)
plot(a)

blob(a)
blob(a)
plot(a)

### The Wavefront Algorithm

The wavefront algorithm works in two steps

1. Fill the free space starting with the goal.

   This seems like it would work best recursively, because you need to start with the goal and work your way outward. It needs to be a breadth first recursive solution because you want to work outwards relatively evenly.

2. From the start point, randomly pick from the lowest labeled cells (there could be more than one with the same distance).

I will use 4-way travel rather than 8-way (no diagonal moves).

In [None]:
def valid(a, coord):
    """Determines if the given coordinate is valid for the given array."""
    coord = np.array(coord)
    return np.all(np.zeros_like(a.shape) <= coord) and np.all(coord < a.shape)

Use a breadth first flood fill algorithm because we want to mark all of a cell's immediate neighbors before moving on to their neighbors.

In [None]:
def flood_fill(a, start):
    """Breadth-first flood fill the given map from the given start point."""
    q = deque()
    # It's a deque, so append and pop from opposite ends -_-
    q.appendleft(start)

    while len(q) > 0:
        i, j = q.pop()
        adjacents = [(i, j-1), (i+1, j), (i, j+1), (i-1, j)]

        distance = a[i, j]
        distance += 1

        # Consider only cells inside the given matrix.
        for cell in filter(functools.partial(valid, a), adjacents):
            # Update any cells that are not the start and that haven't
            # been updated yet.
            if cell != start and a[cell] == 0:
                a[cell] = distance
                q.appendleft(cell)

So we flood fill and plot the results.

In [None]:
flood_fill(a, (0, 0))
plot(a, annot=True)

Note that with this implementation, the last encountered cell with the minimal value will be chosen. With the ordering of the `adjacents` list in the `descent()` function, this corresponds to prefering vertical paths over horizontal paths of the same length.

In [None]:
def min_index(a, indices):
    """Get the index with the minimum value from a list of indices."""
    idx = indices[0]
    val = a[idx]

    for cell in indices:
        if a[cell] <= val:
            idx = cell
            val = a[idx]

    return idx

The second phase of the wavefront algorithm is to descend from the goal to the start point.

In [None]:
def descent(a, goal):
    """Get a path through the map descending from the given goal point."""
    yield goal

    while True:
        i, j = goal
        adjacents = [(i, j-1), (i+1, j), (i, j+1), (i-1, j), (i, j)]
        # Filter out cells outside the map.
        adjacents = [p for p in adjacents if valid(a, p)]
        # Filter out obstacle cells.
        adjacents = [p for p in adjacents if a[p] >= 0]

        # If there's nowhere to go, we've reached the end
        if not adjacents:
            break

        idx = min_index(a, adjacents)

        # If we don't make progress or we've reached the start, stop.
        if idx == goal:
            break
            
        yield idx
        
        if a[idx] == 0:
            break

        goal = idx

Then plot the resulting path from $(14, 14)$ to the start point.

In [None]:
# Mark out the path with a boolean mask.
mask = np.ones_like(a, dtype=bool)

for cell in descent(a, (14, 14)):
    mask[cell] = False

# Plot the filled map and the path.
plot(a, path=mask)

Now what happens if there is no path from the start to the goal?

In [None]:
b = generate_seed(15, 15, 0.55)
plot(b)

blob(b)
blob(b)
plot(b)

In [None]:
flood_fill(b, (0, 0))
plot(b, annot=True)

In [None]:
# Mark out the path with a boolean mask.
mask = np.ones_like(b, dtype=bool)

for cell in descent(b, (14, 14)):
    mask[cell] = False

# Plot the filled map and the path.
plot(b, path=mask)

Note that the flood fill algorithm leaves all unreachable cells with distance `0`, so when we begin our descent, we immediately believe we have found the start point, so we do not progress. This is reasonable.

## Problem 16.1

Let $z_1=284$, $z_2=257$, $z_3=295$, be measurements from sensors which have normally distributed errors with standard deviations $\sigma_1=10$, $\sigma_2=20$, $\sigma_3=15$, respectively. What is the best estimate for the measured state?

We want to weight the better measurments more, so we use

$$\hat x = \frac{1}{S} \sum_i s_i z_i$$

where $S = \sum_i s_i$ and $s_i = \frac{1}{\sigma_1^2}$. I'm a scrub, so here's some code.

In [None]:
z = np.array([284, 257, 295])
sigma = np.array([10, 20, 15])

In [None]:
s = 1 / sigma**2
x = (1 / np.sum(s)) * np.sum(s * z)
print(x)

This estimate makes sense, because we would weight $z_1$ more than the rest, so that's the measurement the estimate is closest to.

## Problem 16.2

You are given three distance sensors which all measure the same distance. To determine the accuracy of each you run repeated measurements of an object which you setup so that the sensor will return 2 meters. Running 40 measurements you obtain the data listed below. Next you measure the distance of an object ahead of the robot. Sensor $A$ gives 2.4577696, sensor $B$ gives 1.8967743, and sensor $C$ gives 2.1352561. Combine these readings to make a more accurate estimate of the object distance. Hint: you need to figure out the distributions and the data according to the variances.
<!--
|    $A$    |    $B$    |    $C$    |
|:---------:|:---------:|:---------:|
| 2.2411549 | 1.8286673 | 2.3295015 |
| 2.3366108 | 1.9243295 | 2.4867167 |
| 1.9687234 | 1.8972737 | 2.5489412 |
| 2.1240351 | 2.0961534 | 1.9834876 |
| 2.3984044 | 1.7985819 | 2.6153805 |
| 2.3523899 | 1.8377782 | 1.8132444 |
| 2.1074266 | 1.8358201 | 2.5563951 |
| 2.4711542 | 1.8875839 | 2.2031291 |
| 2.1998000 | 1.8116113 | 2.3117542 |
| 2.2710086 | 1.8701890 | 2.3495262 |
| 2.3530473 | 1.7646824 | 2.0109293 |
| 2.4391559 | 1.9499153 | 2.5030771 |
| 2.2066306 | 1.9243432 | 2.3561112 |
| 2.3000099 | 1.8309696 | 2.3097754 |
| 2.2235766 | 1.8453219 | 2.2940692 |
| 2.1396901 | 1.8390955 | 2.1904604 |
| 2.0929719 | 1.7978329 | 2.5693897 |
| 2.3154159 | 1.8217245 | 1.9332188 |
| 2.3716302 | 1.9558670 | 2.3002433 |
| 2.2611420 | 1.8654487 | 2.5508342 |
| 2.1415088 | 1.7836290 | 2.6884786 |
| 2.2088487 | 1.9245743 | 2.5037028 |
| 2.2714614 | 1.8918415 | 2.7112663 |
| 2.3345816 | 1.8275421 | 2.1656644 |
| 2.3052296 | 1.8494488 | 2.1940472 |
| 2.1600041 | 1.7632971 | 2.2703708 |
| 2.0630943 | 1.8396972 | 2.6488544 |
| 2.0997821 | 1.8412331 | 2.1828831 |
| 2.3037175 | 1.7761007 | 2.2959535 |
| 2.4536524 | 1.8542271 | 2.0446945 |
| 2.3909478 | 1.8649815 | 2.7852822 |
| 2.1195966 | 1.9533324 | 2.5700007 |
| 2.0205112 | 1.8857815 | 2.1113650 |
| 2.1708006 | 1.7115595 | 2.1215336 |
| 2.0800748 | 1.9403332 | 2.3126032 |
| 2.3332722 | 1.8530670 | 2.4687277 |
| 2.0826115 | 1.8279041 | 2.6104026 |
| 2.2652480 | 1.9058054 | 2.3165716 |
| 2.3734464 | 1.9632258 | 2.0907554 |
| 2.0563260 | 1.9367908 | 2.2130578 | -->

In [None]:
za = np.array([
    2.2411549, 2.3366108, 1.9687234, 2.1240351, 2.3984044, 2.3523899, 2.1074266,
    2.4711542, 2.1998000, 2.2710086, 2.3530473, 2.4391559, 2.2066306, 2.3000099,
    2.2235766, 2.1396901, 2.0929719, 2.3154159, 2.3716302, 2.2611420, 2.1415088,
    2.2088487, 2.2714614, 2.3345816, 2.3052296, 2.1600041, 2.0630943, 2.0997821,
    2.3037175, 2.4536524, 2.3909478, 2.1195966, 2.0205112, 2.1708006, 2.0800748,
    2.3332722, 2.0826115, 2.2652480, 2.3734464, 2.0563260,
])

zb = np.array([
    1.8286673, 1.9243295, 1.8972737, 2.0961534, 1.7985819, 1.8377782, 1.8358201,
    1.8875839, 1.8116113, 1.8701890, 1.7646824, 1.9499153, 1.9243432, 1.8309696,
    1.8453219, 1.8390955, 1.7978329, 1.8217245, 1.9558670, 1.8654487, 1.7836290,
    1.9245743, 1.8918415, 1.8275421, 1.8494488, 1.7632971, 1.8396972, 1.8412331,
    1.7761007, 1.8542271, 1.8649815, 1.9533324, 1.8857815, 1.7115595, 1.9403332,
    1.8530670, 1.8279041, 1.9058054, 1.9632258, 1.9367908,
])

zc = np.array([
    2.3295015, 2.4867167, 2.5489412, 1.9834876, 2.6153805, 1.8132444, 2.5563951,
    2.2031291, 2.3117542, 2.3495262, 2.0109293, 2.5030771, 2.3561112, 2.3097754,
    2.2940692, 2.1904604, 2.5693897, 1.9332188, 2.3002433, 2.5508342, 2.6884786,
    2.5037028, 2.7112663, 2.1656644, 2.1940472, 2.2703708, 2.6488544, 2.1828831,
    2.2959535, 2.0446945, 2.7852822, 2.5700007, 2.1113650, 2.1215336, 2.3126032,
    2.4687277, 2.6104026, 2.3165716, 2.0907554, 2.2130578,
])

za.sort()
zb.sort()
zc.sort()

data = np.array([za, zb, zc])

In [None]:
plt.plot(za, sp.stats.norm.pdf(za, za.mean(), za.std()), label='Sensor $A$')
plt.plot(zb, sp.stats.norm.pdf(zb, zb.mean(), zb.std()), label='Sensor $B$')
plt.plot(zc, sp.stats.norm.pdf(zc, zc.mean(), zc.std()), label='Sensor $C$')

plt.title('Sensor Normal Curves')
plt.legend()
plt.show()

In [None]:
# Center the distributions around the true value
shifts = 2 - np.array([za.mean(), zb.mean(), zc.mean()])
stds = np.array([za.std(), zb.std(), zc.std()])

za += shifts[0]
zb += shifts[1]
zc += shifts[2]

In [None]:
plt.plot(za, sp.stats.norm.pdf(za, za.mean(), za.std()), label='Sensor $A$')
plt.plot(zb, sp.stats.norm.pdf(zb, zb.mean(), zb.std()), label='Sensor $B$')
plt.plot(zc, sp.stats.norm.pdf(zc, zc.mean(), zc.std()), label='Sensor $C$')

plt.title('Centered Normal Curves')
plt.legend()
plt.show()

We can optimally fuse these measurements with the formula

$$\hat x = \frac{\sum_i \frac{z_i}{\sigma_i^2}}{\sum_i \frac{1}{\sigma_i^2}}$$

which will have a variance

$$\hat{\sigma^2} = \left(\sum_i \frac{1}{\sigma_i^2}\right)^{-1}$$

In [None]:
# Readings after experiment
z = np.array([2.4577696, 1.8967743, 2.1352561])
corrected = z + shifts
print('correctted measurements:', corrected)

In [None]:
xhat = np.sum(corrected / stds**2) / np.sum(1 / stds**2)
variance = 1 / np.sum(1 / stds**2)

print('estimate:', xhat)
print('variance:', variance)

## Problem 17.2

Assume that one has three different measurements for the location of some object. The three measurements with the covariances are

$$\begin{align}
    (10.5, 18.2)&, \quad\begin{pmatrix}
        0.1 & 0.01 \\
        0.01 & 0.15
    \end{pmatrix} \\
    (10.75, 18.0)&, \quad\begin{pmatrix}
        0.05 & 0.005 \\
        0.005 & 0.05
    \end{pmatrix} \\
    (9.9, 19.1)&, \quad\begin{pmatrix}
        0.2 & 0.05 \\
        0.05 & 0.25
    \end{pmatrix}
\end{align}$$
Fuse this data into one measurement and provide an estimate of the covariance.

In [None]:
z1 = np.array([10.5, 18.2])
V1 = np.array([[0.1,  0.01],
               [0.01, 0.15]])

z2 = np.array([10.75, 18.0])
V2 = np.array([[0.05, 0.005],
               [0.005, 0.05]])

z3 = np.array([9.9, 19.1])
V3 = np.array([[0.2,  0.05],
               [0.05, 0.25]])

In [None]:
z = np.array([z1, z2, z3])
V = np.array([V1, V2, V3])

In [None]:
# Arrays to save each estimate and covariance matrix in.
xhats = np.zeros_like(z)
Ps = np.zeros_like(V)

xhats[0] = z[0]
Ps[0] = V[0]

In [None]:
for i in range(1, 3):
    K = Ps[i - 1] @ sp.linalg.inv(Ps[i - 1] + V[i])
    xhats[i] = xhats[i - 1] + K @ (z[i] - xhats[i - 1])
    Ps[i] = Ps[i - 1] - K @ Ps[i - 1]

In [None]:
print(xhats)
print(Ps)

In [None]:
def eigsorted(cov):
    """Get the sorted eigenvalues of a 2x2 covariance matrix"""
    vals, vecs = sp.linalg.eigh(cov)
    order = vals.argsort()[::-1]
    return vals[order], vecs[:, order]

In [None]:
def plot_ellipse(cov, center, nstd=1, ax=None, **kwargs):
    """Plot an error ellipse."""
    if ax is None:
        ax = plt.gca()
    
    vals, vecs = eigsorted(cov)
    theta = np.degrees(np.arctan2(*vecs[:, 0][::-1]))
    
    width, height = nstd * np.sqrt(vals)
    ellipse = Ellipse(xy=center, width=width, height=height, angle=theta, **kwargs)
    ax.add_patch(ellipse)
    return ellipse

In [None]:
for i, (xhat, P) in enumerate(zip(xhats, Ps)):
    x, y = xhat
    plt.plot(x, y, '.', label=f'$\hat x_{i}$')
    plot_ellipse(P, xhat, alpha=0.2)

plt.title('$\hat x$ Estimates and Error Ellipses')
plt.xlabel('$x_1$')
plt.ylabel('$x_2$')
plt.legend()
plt.show()

This problem relies on assuming that the initial estimate $\hat x_0$ is $z_1$. My first though was to make a wild initial guess and see what happened. Here's the results.

In [None]:
x0 = np.array([10, 18])
P0 = np.array([[1, 0], [0, 1]])

z = np.array([x0, z1, z2, z3])
V = np.array([P0, V1, V2, V3])

xhats = np.zeros_like(z)
Ps = np.zeros_like(V)

xhats[0] = z[0]
Ps[0] = V[0]

In [None]:
for i in range(1, 4):
    K = Ps[i - 1] @ sp.linalg.inv(Ps[i - 1] + V[i])
    xhats[i] = xhats[i - 1] + K @ (z[i] - xhats[i - 1])
    Ps[i] = Ps[i - 1] - K @ Ps[i - 1]

print(xhats)

In [None]:
for i, (xhat, P) in enumerate(zip(xhats, Ps)):
    x, y = xhat
    plt.plot(x, y, '.', label=f'$\hat x_{i}$')
    plot_ellipse(P, xhat, alpha=0.2)

plt.title('$\hat x$ Estimates and Error Ellipses')
plt.xlabel('$x_1$')
plt.ylabel('$x_2$')
plt.legend()
plt.show()

I think the only thing this shows is that the error ellipses get better and better with each measurement.

## Problem 17.3

Run a simulation on

$$\begin{align*}
    \dot x &= y \\
    \dot y &= -\cos x + 0.5 \sin t
\end{align*}$$

adding noise to the $x$ and $y$ components (with variance = $0.2$ on each). Let $\Delta t=0.1$. Assume that you can observe the first variable, $x$, with variance $0.25$. Record the observations. Write a program to run the EKF on the observed data and compare the state estimate to the original values.

First, we discretize

$$\frac{x_{n + 1} - x_n}{\Delta t} = y_n$$
$$\frac{y_{n + 1} - y_n}{\Delta t} = -\cos x_n + 0.5 \sin t_n$$

and get

$$x_{n + 1} = \Delta t y_n + x_n$$
$$y_{n + 1} = \Delta t ( -\cos x_n + 0.5 \sin t_n ) + y_n$$

Then we linearize and get

$$F_k = \begin{pmatrix}
1 & \Delta t \\
\Delta t \sin x_n & 1 \\
\end{pmatrix}$$

Since we observe only the first variable, $H = \begin{pmatrix}1 & 0\end{pmatrix}$.

We have process variance

$$V = \begin{pmatrix}
0.2 & 0 \\
0 & 0.2 \\
\end{pmatrix}$$

and observation variance

$$W = \begin{pmatrix}
0.25
\end{pmatrix}$$

which is a singleton matrix because we are observing only a single variable.

In [None]:
dt = 0.1
process_variance = 0.2
observation_variance = 0.25
V = np.array([[process_variance, 0],
              [0, process_variance]])
W = np.array([[observation_variance]])
H = np.array([[1, 0]])

In [None]:
def F(state):
    """The physics matrix at the given state."""
    assert state.shape == (2, 1)
    x, _ = state
    return np.array([[1, dt],
                     [dt * np.sin(x), 1]])

In [None]:
def f(state, time):
    """The discretized motion model."""
    assert state.shape == (2, 1)
    x, y = state
    return np.array([dt * y + x,
                     dt * (-np.cos(x) + 0.5 * np.sin(time)) + y])

In [None]:
def simulation(x0):
    """A simulation generator for the given physics matrix
    and initial position. Yields (state, observation) tuples.
    """
    prev_state = x0
    while True:
        process_noise = np.random.multivariate_normal([0, 0], V, 1).T
        assert process_noise.shape == (2, 1)
        observation_noise = np.random.multivariate_normal([0], W, 1).T
        assert observation_noise.shape == (1, 1)
        state = F(prev_state) @ prev_state + process_noise
        assert state.shape == (2, 1)
        observation = state + observation_noise
        assert observation.shape == (2, 1)
        
        # Even though we're only observing one variable, yield both and
        # apply the observation matrix to hide the second.
        yield state, observation
        
        prev_state = state

In [None]:
def filter(x0, P0, t0):
    """Run the Extended Kalman Filter starting with the given
    initial state, initial covariance matrix, and physics matrix.
    """
    time = t0
    state_estimate = x0
    state_covariance = P0

    for state, observation in simulation(x0):
        assert state.shape == (2, 1)
        assert observation.shape == (2, 1)
        prediction = f(state_estimate, time)
        assert prediction.shape == (2, 1)
        pred_covar = F(state_estimate) @ state_covariance @ F(state_estimate).T + V
        assert pred_covar.shape == (2, 2)
        kalman_gain = pred_covar @ H.T @ sp.linalg.inv(H @ pred_covar @ H.T + W)
        assert kalman_gain.shape == (2, 1)
        # The simulation simulates both variables, but we only observe the first.
        residual = H @ observation - H @ prediction
        assert residual.shape == (1, 1)
        state_estimate = prediction + kalman_gain @ residual
        assert state_estimate.shape == (2, 1)
        state_covariance = (np.eye(2) - kalman_gain @ H) @ pred_covar
        assert state_covariance.shape == (2, 2)
        
        yield state, observation, state_estimate, state_covariance

In [None]:
x0 = np.array([[0, 0]]).T
P0 = np.array([[1, 0],
               [0, 1]])
t0 = 0

In [None]:
N = 100
ekf = filter(x0, P0, t0)
# Run N iterations of the EKF and save the results
vals = [next(ekf) for _ in range(N)]

In [None]:
states, observations, estimates, covariances = zip(*vals)

x, y = zip(*states)
zx, zy = zip(*observations)
xhat, yhat = zip(*estimates)

# zip returns tuples. Convert to properly dimensioned numpy arrays for math.
yhat = np.array(yhat).reshape(N)
xhat = np.array(xhat).reshape(N)

# Ignore the cross variances because I'm not sure how to plot that...
xstd = np.array([np.sqrt(cov[0, 0]) for cov in covariances])
ystd = np.array([np.sqrt(cov[1, 1]) for cov in covariances])

In [None]:
plt.plot(range(N), x, label='$x$')
plt.plot(range(N), zx, '.', label='$z_x$')
plt.plot(range(N), xhat, label=r'$\hat x$', color='r')

plt.title('EKF $x$ results')
plt.legend()
plt.xlabel('$t$')
plt.ylabel('$x$')
plt.show()

plt.plot(range(N), y, label='$y$')
# Don't plot the observations because we didn't observe this variable.
# plt.plot(range(N), zy, '.', label='$z_y$')
plt.plot(range(N), yhat, label=r'$\hat y$', color='r')

plt.title('EKF $y$ (hidden variable) results')
plt.legend()
plt.xlabel('$t$')
plt.ylabel('$y$')
plt.show()

In [None]:
plt.plot(range(N), x, label='$x$')
# plt.plot(range(N), zx, '.', label='$z_x$')
plt.plot(range(N), xhat, label=r'$\hat x$', color='r')
plt.fill_between(range(N), xhat - xstd, xhat + xstd, alpha=0.3, label=r'$1$ std dev', color='r')

plt.title('EKF $x$ results')
plt.legend()
plt.xlabel('$t$')
plt.ylabel('$x$')
plt.show()

plt.plot(range(N), y, label='$y$')
plt.plot(range(N), yhat, label=r'$\hat y$', color='r')
plt.fill_between(range(N), yhat - ystd, yhat + ystd, alpha=0.3, label=r'$1$ std dev', color='r')

plt.title('EKF $y$ (hidden variable) results')
plt.legend()
plt.xlabel('$t$')
plt.ylabel('$y$')
plt.show()

## Problem 17.4

Differential Drive - EKF. The motion equations for a differential drive robot are given below. Assume that the wheels are $5$cm in radius and the wheelbase is $12$cm.

Use $\Delta t = 0.2$ and discretize. Assume the covariance of the state transition is $V$. Assume that you have a local GPS system that gives $(x,y)$ data subject to Gaussian noise with covariance $W$. The units on the noise are given in cm.

1. Starting at $t=0$, $x=0$, $y=0$, $\theta=0$, predict location when wheel velocities are:
   
   $$\begin{align*}
     0  \leq t \leq 5 &,\quad  \omega_1 =  \omega_2 = 3 \\
     5  \leq t \leq 6 &,\quad  \omega_1 = -\omega_2 = 1 \\
     6  \leq t \leq 10&,\quad  \omega_1 =  \omega_2 = 3 \\
     10 \leq t \leq 11&,\quad -\omega_1 =  \omega_2 = 1 \\
     11 \leq t \leq 16&,\quad  \omega_1 =  \omega_2 = 3 \\
   \end{align*}$$
   assuming that you have Gaussian noise on the process with covariance.
   $$V = \begin{pmatrix}.05 &  .02 & 0.01\\.02& .05& 0.01\\ 0.01& 0.01& .1\end{pmatrix}$$
2. Write out the formulas for the Extended Kalman Filter.
3. Apply an Extended Kalman filter to the motion simulation above to track the location of the vehicle. Observations can be simulated by using previous simulation data as actual data, i.e. use this as the observed data ($z_k$). Parameters:
   $$x_{0|0} = (0,0,0)$$
   $$V = \begin{pmatrix}.05 &  .02 & 0.01\\.02& .05& 0.01\\ 0.01& 0.01& .1\end{pmatrix}$$
   $$W = \begin{pmatrix} .08& .02 \\.02&  .07\end{pmatrix}$$
   $$P_{0|0} = \begin{pmatrix}2 &0& 0\\0 &1& 0\\0& 0& 0.5\end{pmatrix}$$
4. Output the $x, y$ locations on a 0.5 sec grid and compare in a plot.
   
   ??
5. The covariance matrix $P$ gives the uncertainly ellipse for the location of the robot. Plot 5 ellipses along the path. This ellipse has major and minor axes given by the eigenvectors of $P$ and the axes lengths are given by the associated eigenvalues.

The formulas for the Extended Kalman Filter are as follows.

$$\vec x_k = \begin{pmatrix}x_k \\ y_k \\ \theta_k \end{pmatrix}$$
$$\vec u_k = \begin{pmatrix}\omega_{1, k} \\ \omega_{2, k} \end{pmatrix}$$

$$f(\vec x_k, \vec u_k) = \begin{pmatrix}
    x_k + \frac{r \Delta t}{2}(\omega_{1, k} + \omega_{2, k}) \cos \theta_k \\
    y_k + \frac{r \Delta t}{2}(\omega_{1, k} + \omega_{2, k}) \sin \theta_k \\
    \theta_k + \frac{}{2L}(\omega_{1, k} - \omega_{2, k})
\end{pmatrix}$$

$$h(\vec x_k) = \begin{pmatrix}x_k \\ y_k\end{pmatrix}$$

So then the Jacobians $F_k$ and $H_k$ are

$$F_k = \begin{pmatrix}
    1 & 0 & -\frac{r \Delta t}{2}(\omega_{1, k} + \omega_{2, k}) \sin \theta_k \\
    0 & 1 & \frac{r \Delta t}{2}(\omega_{1, k} + \omega_{2, k}) \cos \theta_k \\
    0 & 0 & 1
\end{pmatrix}$$

$$H_k = \begin{pmatrix}
    1 & 0 & 0 \\
    0 & 1 & 0
\end{pmatrix}$$

Then the Extended Kalman Filter formulas are

* State prediction $$\hat x_{x \mid k-1} = f(\hat x_{k - 1 \mid k - 1}, \vec u_k)$$
* Prediction covariance $$P_{k \mid k - 1} = F_k P_{k - 1 \mid k - 1} F_k^T + V_k$$
* Kalman gain $$K_k = P_{k \mid k - 1} H_k^T {\left(H_k P_{k \mid k - 1} H_k^T + W_k\right)}^{-1}$$
* State update $$\hat x_{k \mid k} = \hat x_{k \mid k - 1} + K_k \left(z_k - h(\hat x_{k \mid k - 1}\right)$$
* State update covariance $$P_{k \mid k} = (I - K_k H_k)P_{k \mid k - 1}$$

In [None]:
def speeds(t):
    """Get the wheel speeds at the given time."""
    if 0 <= t <= 5:
        return np.array([[3, 3]]).T
    elif 5 < t <= 6:
        return np.array([[1, -1]]).T
    elif 6 < t <= 10:
        return np.array([[3, 3]]).T
    elif 10 < t <= 11:
        return np.array([[-1, 1]]).T
    elif 11 < t <= 16:
        return np.array([[3, 3]]).T
    else:
        return np.array([[0, 0]]).T

In [None]:
V = np.array([[0.05, 0.02, 0.01],
              [0.02, 0.05, 0.01],
              [0.01, 0.01, 0.1]])
W = np.array([[0.08, 0.02],
              [0.02, 0.07]])
H = np.array([[1, 0, 0],
              [0, 1, 0]])
P0 = np.array([[2, 0, 0],
               [0, 1, 0],
               [0, 0, 0.5]])
x0 = np.array([[0, 0, 0]]).T
dt = 0.2
r = 5
# L is the half-wheelbase
L = 12 / 2

In [None]:
def f(state, control):
    assert state.shape == (3, 1)
    assert control.shape == (2, 1)
    x, y, theta = state
    w1, w2 = control
    return np.array([
        x + (r * dt / 2) * (w1 + w2) * np.cos(theta),
        y + (r * dt / 2) * (w1 + w2) * np.sin(theta),
        theta + (r * dt / (2 * L)) * (w1 - w2)
    ])

In [None]:
def F(state, control):
    assert state.shape == (3, 1)
    assert control.shape == (2, 1)
    x, y, theta = state
    w1, w2 = control
    return np.array([[1, 0, (-r * dt / 2) * (w1 + w2) * np.sin(theta)],
                     [0, 1, (r * dt / 2) * (w1 + w2) * np.cos(theta)],
                     [0, 0, 1]])

In [None]:
def simulation(x0, t0):
    """Simulates the robot. Yields (state, observation, control) tuples."""
    while True:
        control = speeds(t0)
        x0 = f(x0, control) + np.random.multivariate_normal([0, 0, 0], V, 1).T
        assert x0.shape == (3, 1)
        observation = H @ x0 + np.random.multivariate_normal([0, 0], W, 1).T
        assert observation.shape == (2, 1)
        t0 += dt
        yield x0, observation, control

In [None]:
path = []
sim = simulation(x0, 0)
vals = [next(sim) for _ in np.arange(0, 16+dt, dt)]
path, observations, _ = zip(*vals)
x, y, theta = zip(*path)
zx, zy = zip(*observations)

plt.plot(x, y, label='Simulated Path')
plt.plot(zx, zy, '.', label='Observed Path')

plt.title('Differential Drive Robot Simulation')
plt.legend()
plt.show()

I'm confused by the wording of the problem. It states "Observations can be simulated by using the previous simulation data as actual data." but then it states "I.e., use this as the observed data." Shouldn't the observation add additional noise (using the $W$ covariance) because there's noise in both the process and the observation? The plot of the simulation above just isn't noisy enough for me to believe that that's all the noise I should add...

Note that the path varies wildly between random runs. This is because we are injecting noise *during* the simulation, not after.

In [None]:
def filter(x0, P0, t0):
    """Run the Extended Kalman Filter."""
    time = t0
    estimate = x0
    covariance = P0
    for actual_state, observation, control in simulation(x0, t0):
        assert actual_state.shape == (3, 1)
        assert observation.shape == (2, 1)
        assert control.shape == (2, 1)
        prediction = f(estimate, control)
        assert prediction.shape == (3, 1)
        pred_covar = F(estimate, control) @ covariance @ F(estimate, control).T + V
        assert pred_covar.shape == (3, 3)
        kalman_gain = pred_covar @ H.T @ sp.linalg.inv(H @ pred_covar @ H.T + W)
        assert kalman_gain.shape == (3, 2)
        # Take the first two elements of the prediction because that's
        # all we're observing
        estimate = prediction + kalman_gain @ (observation - H @ prediction)
        assert estimate.shape == (3, 1)
        covariance = (np.eye(3) - kalman_gain @ H) @ pred_covar
        
        yield actual_state, observation, estimate, covariance

In [None]:
# Creates its own simulation each time it's ran.
ekf = filter(x0, P0, 0)
vals = [next(ekf) for _ in np.arange(0, 16+dt, dt)]

In [None]:
states, observations, estimates, covariances = zip(*vals)

x, y, theta = zip(*states)
zx, zy = zip(*observations)
xhat, yhat, that = zip(*estimates)

# zip returns tuples. Convert to properly dimensioned numpy arrays for math.
x = np.array(x).reshape(int(16 / dt + 1))
y = np.array(y).reshape(int(16 / dt + 1))
yhat = np.array(yhat).reshape(int(16 / dt + 1))
xhat = np.array(xhat).reshape(int(16 / dt + 1))
that = np.array(that).reshape(int(16 / dt + 1))

# Ignore the cross variances because I'm not sure how to plot that...
tstd = np.array([np.sqrt(cov[2, 2]) for cov in covariances])

In [None]:
plt.plot(x, y, label='Simulated Path')
plt.plot(zx, zy, '.', label='Observed Path')
plt.plot(xhat, yhat, label='EKF Path', color='r')

plt.title('EKF Path Results')
plt.legend()
plt.xlabel('$x$')
plt.ylabel('$y$')
plt.show()

In [None]:
plt.plot(np.arange(0, 16+dt, dt), theta, label=r'$\theta$')
plt.plot(np.arange(0, 16+dt, dt), that, label=r'$\hat\theta$', color='r')
plt.fill_between(np.arange(0, 16+dt, dt), that - tstd, that + tstd, label='1 std dev', color='r', alpha=0.2)

plt.title(r'EKF $\theta$ estimate')
plt.legend()
plt.xlabel('$t$')
plt.ylabel(r'$\theta$')
plt.show()

In [None]:
plt.plot(x, y, label='Simulated Path')

for i in np.linspace(0, int(16 / dt), 5, dtype=int):
    plt.plot(xhat[i], yhat[i], '.', color='r')
    # Artificially expand the size of the ellipses because the plot
    # is zoomed out too far.
    plot_ellipse(covariances[i][0:2, 0:2], (xhat[i], yhat[i]), nstd=15, color='r', alpha=0.6)

plt.title('EKF Error Ellipses')
plt.legend()
plt.xlabel('$x$')
plt.ylabel('$y$')
plt.show()

Notes:

* The path given by the EKF is remarkably close to the actual simulated path. There are (at least) two reasons for this:
    1. We are observing both $x$ and $y$.
    2. The observation isn't very noisy.
* The path is wildly different than the noise-free path. This is because we inject noise at every step of the simulation, not afterwards. This is expected.
* The error ellipses at certain points along the path aren't very large at all. This is expected because we are observing both $x$ and $y$, so there will be only small amounts of variance for those variables. However, note the error bands on the plot of $\theta$ versus $t$. This is where the noise manifests itself. We are not observing $\theta$, so we have to rely on the physical model, which will amplify noise.
  
  If I were more ambitious, I would look into plotting 3D error ellipsoids so we could see that they are skinny in the $x$ and $y$ directions, bnut much fatter in the $\theta$ direction.

Here is the same problem, but with different (more extreme) noise profiles.

In [None]:
V = np.array([[0.05, 0.02, 0.01],
              [0.02, 0.05, 0.01],
              [0.01, 0.01, 0.1]])
W = np.array([[1.2, 0.3],
              [0.3, 0.9]])

In [None]:
path = []
sim = simulation(x0, 0)
vals = [next(sim) for _ in np.arange(0, 16+dt, dt)]
path, observations, _ = zip(*vals)
x, y, theta = zip(*path)
zx, zy = zip(*observations)

plt.plot(x, y, label='Simulated Path')
plt.plot(zx, zy, '.', label='Observed Path')

plt.title('Differential Drive Robot Simulation')
plt.legend()
plt.show()

In [None]:
# Creates its own simulation each time it's ran.
ekf = filter(x0, P0, 0)
vals = [next(ekf) for _ in np.arange(0, 16+dt, dt)]

In [None]:
states, observations, estimates, covariances = zip(*vals)

x, y, theta = zip(*states)
zx, zy = zip(*observations)
xhat, yhat, that = zip(*estimates)

# zip returns tuples. Convert to properly dimensioned numpy arrays for math.
x = np.array(x).reshape(int(16 / dt + 1))
y = np.array(y).reshape(int(16 / dt + 1))
yhat = np.array(yhat).reshape(int(16 / dt + 1))
xhat = np.array(xhat).reshape(int(16 / dt + 1))
that = np.array(that).reshape(int(16 / dt + 1))

# Ignore the cross variances because I'm not sure how to plot that...
tstd = np.array([np.sqrt(cov[2, 2]) for cov in covariances])

In [None]:
plt.plot(x, y, label='Simulated Path')
plt.plot(zx, zy, '.', label='Observed Path')
plt.plot(xhat, yhat, label='EKF Path', color='r')

plt.title('EKF Path Results')
plt.legend()
plt.xlabel('$x$')
plt.ylabel('$y$')
plt.show()

In [None]:
plt.plot(np.arange(0, 16+dt, dt), theta, label=r'$\theta$')
plt.plot(np.arange(0, 16+dt, dt), that, label=r'$\hat\theta$', color='r')
plt.fill_between(np.arange(0, 16+dt, dt), that - tstd, that + tstd, label='1 std dev', color='r', alpha=0.2)

plt.title(r'EKF $\theta$ estimate')
plt.legend()
plt.xlabel('$t$')
plt.ylabel(r'$\theta$')
plt.show()

In [None]:
plt.plot(x, y, label='Simulated Path')

for i in np.linspace(0, int(16 / dt), 5, dtype=int):
    plt.plot(xhat[i], yhat[i], '.', color='r')
    # Artificially expand the size of the ellipses because the plot
    # is zoomed out too far.
    plot_ellipse(covariances[i][0:2, 0:2], (xhat[i], yhat[i]), nstd=15, color='r', alpha=0.6)

plt.title('EKF Error Ellipses')
plt.legend()
plt.xlabel('$x$')
plt.ylabel('$y$')
plt.show()

Note that the error ellipses are indeed larger.

## ~~Problem 18.1.1~~

Deferred until after lectures are finished.

Let the domain be the rectangle $0 \leq x \leq 15$ and $0 \leq y \leq 10$. Place the start position at $(0,5)$. Place the end position at $(15,5)$. Assume you have circular obstacles centered at $(6,4)$ with radius $2$ and at $(8,6)$ with radius $3$. Find a potential function which can navigate the robot from the start to the end position.

1. Plot the resulting path in Python with obstacles included in the map.
2. Compare to the Wavefront approach.