### Problem Set 07: Gibbs sampling and Hidden Markov Models (HMMs)


In this problem set, you will explore Gibbs sampling and Hidden Markov Models (HMMs) by implementing some foundational algorithms.

0. [Credit for Contributors (required)](#contributors)

1. [Sampling (25 points)](#problem1)
  
2. [HMMs (35 points)](#problem2)
   * [Conditioning Warmup (5 points)](#conditioning-warmup)
   * [HMM (5 points)](#hmm)
   * [Transition distribution creation (5 points)](#transition-distribution)
   * [Observation variable creation (5 points)](#observation-variable)
   * [Observation CPT creation (5 points)](#observation-cpt)
   * [Viterbi Decoding (10 points)](#viterbi-decoding)

3. [Time Spent on Pset (5 points)](#part4)
    
**65 points** total for Problem Set 7

## <a id="contributors"></a> Credit for Contributors

List the students, lecture notes, or online resources that helped you complete this problem set:

Ex: I worked with Bob on the cat activity planning problem.

<div class="alert alert-info">
Write your answer in the cell below this one.
</div>

--> *(double click on this cell to delete this text and type your answer here)*

In [None]:
# Be sure to run the cell below to import the code needed for this assignment.
from __future__ import division

%matplotlib inline
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import copy
from collections import namedtuple
import itertools

# imports for autograder
from principles_of_autonomy.grader import Grader
from principles_of_autonomy.notebook_tests.pset_7 import TestPSet7
print("Loaded")

# <a id="problem1">Problem 1: Sampling (25 points)</a> 

Consider the following 3 node factor graph:

<center>
<img src="3node_Gibbs.png" width=500>
</center>

The table on the right gives three different CPTs. For one of them, Gibbs sampling did a poor job of generating samples that reflect the joint distribution of the three variables. Which table was it: 1, 2, or 3?

In [7]:
## Enter your answer by changing the assignment expression below, e.g., q1_answer = 3
q1_answer = 0 

In [None]:
# test_1
Grader.run_single_test_inline(TestPSet7, "test_1", locals())

Consider the following Bayes net.

<center>
<img src="sampling_bayes.png" width=50%>
</center>

We are interested in drawing samples from the distribution described by this network, conditioned on evidence, using prior sampling with rejection.

Is this a good way to sample from $P(K \mid B = b)$?
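
As a reminder of how prior sampling with rejection works, here is a minimal sketch on a hypothetical two-node network `A -> B`. The network, its probabilities, and the function names are assumptions made for illustration; they are not the network in the figure above. Note that the fraction of samples that survive rejection is roughly the prior probability of the evidence.

```python
import random

# Hypothetical two-node network A -> B (NOT the network in the figure).
P_A = 0.3                       # P(A = 1)
P_B_GIVEN_A = {1: 0.9, 0: 0.2}  # P(B = 1 | A = a)

def prior_sample(rng):
    """Sample (a, b) in topological order from the prior."""
    a = 1 if rng.random() < P_A else 0
    b = 1 if rng.random() < P_B_GIVEN_A[a] else 0
    return a, b

def rejection_sample(num_samples, evidence_b, rng):
    """Estimate P(A = 1 | B = evidence_b); also return the acceptance rate."""
    samples = (prior_sample(rng) for _ in range(num_samples))
    # Keep only the samples that agree with the evidence.
    accepted = [a for a, b in samples if b == evidence_b]
    estimate = sum(accepted) / len(accepted)
    return estimate, len(accepted) / num_samples

rng = random.Random(0)
estimate, acceptance_rate = rejection_sample(20000, 1, rng)
print(estimate, acceptance_rate)
```

For this toy network the exact posterior is 0.27 / 0.41 and the exact acceptance rate is P(B = 1) = 0.41, so the printed estimates should land close to those values.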

In [9]:
## Enter your answer by changing the assignment expression below, e.g., q2_answer = 'no'
q2_answer = 'yes or no' 

In [None]:
# test_2
Grader.run_single_test_inline(TestPSet7, "test_2", locals())

Is this a good way to sample from $P(B \mid K = k)$?

In [11]:
## Enter your answer by changing the assignment expression below, e.g., q3_answer = 'no'
q3_answer = 'yes or no' 

In [None]:
# test_3
Grader.run_single_test_inline(TestPSet7, "test_3", locals())

Enter the set of nodes in the Markov Blanket of node G, e.g. `['B','D']`.
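
Recall that the Markov blanket of a node in a Bayes net consists of its parents, its children, and its children's other parents. A minimal sketch on a hypothetical four-node graph (the node names and edges are made up for illustration and are unrelated to the figure):

```python
# Hypothetical Bayes net, given as a map from each node to its parents.
# Edges: X -> Z, Y -> Z, Z -> W. This is NOT the network in the figure.
parents = {"X": [], "Y": [], "Z": ["X", "Y"], "W": ["Z"]}

def markov_blanket(node, parents):
    """Return the parents, children, and co-parents of `node`, sorted."""
    children = [n for n, ps in parents.items() if node in ps]
    blanket = set(parents[node]) | set(children)
    for child in children:
        blanket |= set(parents[child])  # other parents of each child
    blanket.discard(node)  # a node is not in its own blanket
    return sorted(blanket)

print(markov_blanket("X", parents))  # ['Y', 'Z']
print(markov_blanket("Z", parents))  # ['W', 'X', 'Y']
```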

In [13]:
## Enter your answer by changing the assignment expression below, e.g., q4_answer = ['B', 'D']
q4_answer = () 

In [None]:
# test_4
Grader.run_single_test_inline(TestPSet7, "test_4", locals())

Enter the set of nodes in the Markov Blanket of node D, e.g. `['B','D']`.

In [15]:
## Enter your answer by changing the assignment expression below, e.g., q5_answer = ['B', 'D']
q5_answer = () 

In [None]:
# test_5
Grader.run_single_test_inline(TestPSet7, "test_5", locals())

# <a id="problem2">Problem 2: HMMs (35 points)</a> 

In [17]:
class RV:
    '''A random variable with a finite domain.

    Example usage:
      A = RV("A", ["x", "y", "z"])
      print(A.domain)
      print(A.dim)
      B = RV("B", [(0, 0), (0, 1), (0, 2)])
      print(B.domain)
      print(B.dim)
    '''

    def __init__(self, name, domain):
        '''Initialize a RV.

        Args:
          name: str name for the RV.
          domain: list or tuple of domain values.
        '''
        assert isinstance(domain, (list, tuple))
        self.name = name
        self.domain = domain
        self.dim = len(domain)

    def __hash__(self):
        return hash((self.name, tuple(self.domain)))

    def __eq__(self, other):
        return self.name == other.name and self.domain == other.domain

    def __repr__(self):
        return f"RV('{self.name}', {self.domain})"

class CPT:
    '''A CPT over RVs.

    Example usage:
      A = RV("varA", ["x", "y", "z"])
      B = RV("varB", [0, 1])
      table = np.array([
        [0.1, 0.0],
        [0.4, 0.9],
        [0.5, 0.1]
      ])
      cpt = CPT([A, B], table)
      print(cpt.rvs)
      print(cpt.get(("y", 0)))
      print(cpt.get_by_rvs({A: "y", B: 0}))
      print(cpt.get_by_names({"varA": "y", "varB": 0}))
    '''

    def __init__(self, rvs, table):
        '''Create a cpt from a list of RVs and a numpy array.

        The order of the random variables corresponds to the axes
        of the numpy array.

        Args:
          rvs: A list or tuple of RVs.
          table: A numpy array of the probabilities.

        Returns:
          cpt: A CPT.'''
        assert isinstance(rvs, (tuple, list))
        # Check the type first so the shape checks below fail with a clear error.
        assert isinstance(table, np.ndarray)
        assert len(rvs) == len(table.shape)
        assert all(rv.dim == dim for (rv, dim) in zip(rvs, table.shape))
        self.rvs = rvs
        self.table = table

    def set(self, assignment, new_value):
        '''Given a complete assignment and a value, update table.

        Args:
          assignment: A tuple of values in the order of self.rvs.
          new_value: The new value to store in the table.
        '''
        assert len(assignment) == len(self.rvs)
        indices = [None for _ in self.rvs]
        for index, value in enumerate(assignment):
            rv = self.rvs[index]
            indices[index] = rv.domain.index(value)
        self.table[tuple(indices)] = new_value

    def get(self, assignment):
        '''Given a complete assignment of values, lookup table value.

        Args:
          assignment: A tuple of values in the order of self.rvs.

        Returns:
          value: The value in self.table.
        '''
        assert len(assignment) == len(self.rvs)
        indices = [None for _ in self.rvs]
        for index, value in enumerate(assignment):
            rv = self.rvs[index]
            indices[index] = rv.domain.index(value)
        return self.table[tuple(indices)]

    def get_by_rvs(self, rvs_to_vals):
        '''Given a complete assignment of RVs to values, lookup table value.

        Args:
          rvs_to_vals: A dict from RVs to values in their domains.

        Returns:
          value: The value in self.table.
        '''
        assert set(rvs_to_vals.keys()) == set(self.rvs)
        indices = [None for _ in self.rvs]
        for rv, value in rvs_to_vals.items():
            index = self.rvs.index(rv)
            indices[index] = rv.domain.index(value)
        return self.table[tuple(indices)]

    def get_by_names(self, rv_name_dict):
        '''Given a dict from RV names (strs) to assignments,
        return the corresponding value in the CPT table.

        Args:
          rv_name_dict: A dict from str names to values.

        Returns:
          value: The float value from self.table.
        '''
        assert len(rv_name_dict) == len(self.rvs)
        rv_name_to_rv = {rv.name: rv for rv in self.rvs}
        rvs_to_vals = {}
        for rv_name, value in rv_name_dict.items():
            rv = rv_name_to_rv[rv_name]
            rvs_to_vals[rv] = value
        return self.get_by_rvs(rvs_to_vals)

    def __hash__(self):
        return hash(tuple(self.rvs)) ^ hash(self.table.tobytes())

    def __eq__(self, other):
        return hash(self) == hash(other)

    def __ne__(self, other):
        return not (self == other)

    def allclose(self, other, decimals=6):
        '''Check whether two CPTs are (nearly) equal.
        '''
        if set(self.rvs) != set(other.rvs):
            raise ValueError("Can only compare CPTs with the same RVs.")
        new_idxs = [other.rvs.index(rv) for rv in self.rvs]
        trans_table2 = np.transpose(other.table, new_idxs)
        assert self.table.shape == trans_table2.shape
        return np.allclose(self.table, trans_table2)


def condition_CPT(cpt, evidence):
    """Given a CPT and an assignment of one or more RVs as evidence,
        create a new posterior CPT representing the effect of the evidence.

    Example usage:
        A, B = RV('A', ["x", "y", "z"]), RV('B', [0, 1])
        cpt = CPT([A, B],
            np.array([
                [1, 0],
                [0, 1],
                [0, 0],
        ]))
        evidence = {A: "y"}
        result = condition_CPT(cpt, evidence)
        assert result.rvs == [B]
        assert result.get_by_rvs({B: 0}) == 0
        assert result.get_by_rvs({B: 1}) == 1

    Args:
        cpt: A CPT.
        evidence: A dict from RVs to values.

    Returns:
        posterior_CPT: A CPT.
    """
    new_rvs = [rv for rv in cpt.rvs if rv not in evidence]
    idxs = []
    for rv in cpt.rvs:
        if rv in evidence:
            value = evidence[rv]
            idx = rv.domain.index(value)
        else:
            idx = slice(None)
        idxs.append(idx)
    new_table = cpt.table[tuple(idxs)].copy()
    return CPT(new_rvs, new_table)


def normalize_CPT(cpt):
    """Normalize the values of a CPT so they sum to 1.

    Args:
        cpt: A CPT.

    Returns:
        new_cpt: A CPT.
    """
    denom = cpt.table.sum()
    if denom == 0:
        raise Exception("WARNING: tried to normalize an all-zeros CPT.")
    return CPT(cpt.rvs, cpt.table / denom)


def max_CPT(cpt, rv=None):
    """Create a new CPT where rv has been maximized out.

    If rv is None, then this function returns the max
    over all assignments in the CPT.

    Example usage:
        A, B = RV('A', ["x", "y", "z"]), RV('B', [0, 1])
        cpt = CPT([A, B],
            np.array([
                [1., 5.],
                [2., 6.],
                [3., 4.],
        ]))
        assert max_CPT(cpt) == 6.
        cpt1 = max_CPT(cpt, rv=A)
        assert cpt1.rvs == [B]
        assert cpt1.get_by_rvs({B: 0}) == 3
        assert cpt1.get_by_rvs({B: 1}) == 6
        cpt2 = max_CPT(cpt, rv=B)
        assert cpt2.rvs == [A]
        assert cpt2.get_by_rvs({A: "x"}) == 5
        assert cpt2.get_by_rvs({A: "y"}) == 6
        assert cpt2.get_by_rvs({A: "z"}) == 4

    Args:
        cpt: A CPT.
        rv: A random variable in the CPT or None.

    Returns:
        new_cpt: A CPT.
    """
    if rv is None:
        return cpt.table.max()
    axis = cpt.rvs.index(rv)
    new_table = cpt.table.max(axis=axis)
    new_rvs = list(cpt.rvs)
    new_rvs.remove(rv)
    return CPT(new_rvs, new_table)


def argmax_CPT(cpt, rv=None):
    """Create a new "CPT" where the values are domain assignments
    of the given rv that lead to maximum values.

    If rv is None, then this function returns the argmax assignment
    over all assignments in the CPT.

    Example usage:
        A, B = RV('A', ["x", "y", "z"]), RV('B', [0, 1])
        cpt = CPT([A, B],
            np.array([
                [1., 5.],
                [2., 0.],
                [3., 4.],
        ]))
        assert argmax_CPT(cpt) == ("x", 1)
        cpt1 = argmax_CPT(cpt, rv=A)
        assert cpt1.rvs == [B]
        assert cpt1.get_by_rvs({B: 0}) == "z"
        assert cpt1.get_by_rvs({B: 1}) == "x"
        cpt2 = argmax_CPT(cpt, rv=B)
        assert cpt2.rvs == [A]
        assert cpt2.get_by_rvs({A: "x"}) == 1
        assert cpt2.get_by_rvs({A: "y"}) == 0
        assert cpt2.get_by_rvs({A: "z"}) == 1

    Args:
        cpt: A CPT.
        rv: A random variable in the CPT or None.

    Returns:
        new_cpt: A CPT.
    """
    if rv is None:
        idxs = np.unravel_index(cpt.table.argmax(), cpt.table.shape)
        return tuple(v.domain[idx] for v, idx in zip(cpt.rvs, idxs))
    axis = cpt.rvs.index(rv)
    new_table_idxs = cpt.table.argmax(axis=axis)
    new_table = np.empty(new_table_idxs.shape, dtype=object)
    for idx, val in np.ndenumerate(new_table_idxs):
        new_table[idx] = rv.domain[val]
    new_rvs = list(cpt.rvs)
    new_rvs.remove(rv)
    return CPT(new_rvs, new_table)


HMM = namedtuple("HMM", [
    "state_rvs",  # A list of RVs, one per timestep
    "observation_rvs",  # A list of RVs, one per timestep
    "transition_CPTs",  # A list of CPTs, one per transition
    "observation_CPTs",  # A list of CPTs, one per timestep
    "initial_distribution"  # A single CPT
])

def iter_joint_values(rvs):
    '''Iterates over joint assignments for a list of RVs.

    Returns an iterator that can be used in a for loop.

    Example usage:
      for assignment in iter_joint_values(rvs):
        print(assignment)  # a tuple
        assert assignment[0] in rvs[0].domain

    Args:
      rvs: A list of RVs.

    Yields:
      assignment: A tuple of domain values representing a joint
        assignment of the random variables.
    '''
    domains = [rv.domain for rv in rvs]
    return itertools.product(*domains)

def get_sub_assignment(rvs, assignment, sub_rvs):
    '''Given an assignment of rvs to values, get a subassignment,
    that is, a sub-tuple of the given assignment involving only
    the given sub_rvs.

    Example usage:
      x = RV("x", [0, 1])
      y = RV("y", ["a", "b"])
      z = RV("z", [3, 5])
      rvs = (x, y, z)
      assignment = (0, "b", 3)
      sub_rvs = (z, x)
      sub_assignment = get_sub_assignment(rvs, assignment, sub_rvs)
      assert sub_assignment == (3, 0)

    Args:
      rvs: A tuple or list of RVs.
      assignment: A tuple or list of values.
      sub_rvs: A tuple or list of RVs, a subset of rvs.

    Returns:
      sub_assignment: A tuple of values.
    '''
    assert set(sub_rvs).issubset(set(rvs))
    sub_assignment = []
    for rv in sub_rvs:
        idx = rvs.index(rv)
        val = assignment[idx]
        sub_assignment.append(val)
    return tuple(sub_assignment)

def multiply_CPTs(cpts):
    '''Multiply CPTs together.

    Args:
      cpts: A list of CPTs.

    Returns:
      result: A new CPT.
    '''
    # Note: this implementation favors clarity over
    # efficiency. If you are curious about a more
    # efficient implementation, please ask course staff :)

    # Collect all random variables
    rvs = set()
    for t in cpts:
        rvs |= set(t.rvs)
    rvs = list(rvs)

    # Initialize result table
    table = np.empty([rv.dim for rv in rvs])

    # Initialize result CPT
    result = CPT(rvs, table)

    # Iterate over assignments in the new CPT
    for assignment in iter_joint_values(rvs):
        value = 1.
        for t in cpts:
            # Get the sub-assignment
            sub_assignment = get_sub_assignment(rvs, assignment, t.rvs)
            # Compute product
            value *= t.get(sub_assignment)
        # Add value to result table
        result.set(assignment, value)

    # Finalize CPT
    return result


def create_hmm(obstacle_map, num_timesteps, noise_prob=0.):
    """Converts a map into an HMM.

    Creates the state RVs, the observation RVs, the transition CPTs,
    and the observation CPTs.

    You should not need to use this; it's used in tests.

    Args:
      obstacle_map: A list of lists of ints; see example and
          description in `create_state_variable`.
      num_timesteps: An int number of timesteps, must be >= 1.
      noise_prob: Observation noise parameter.

    Returns:
      problem: An HMM.
    """
    assert num_timesteps >= 1

    # Create variables
    state_rvs = []
    observation_rvs = []
    for t in range(num_timesteps):
        state_t = create_state_variable(obstacle_map, f"state{t}")
        obs_t = create_observation_variable(f"observation{t}")
        state_rvs.append(state_t)
        observation_rvs.append(obs_t)

    # Create observation CPTs
    observation_CPTs = []
    for state, obs in zip(state_rvs, observation_rvs):
        obs_cpt_t = create_observation_cpt(obstacle_map, state, obs,
                                                 noise_prob=noise_prob)
        observation_CPTs.append(obs_cpt_t)

    # Create transition CPTs
    transition_CPTs = []
    if num_timesteps > 1:
        for state_t, state_t1 in zip(state_rvs[:-1], state_rvs[1:]):
            trans_cpt_t = create_transition_cpt(obstacle_map, state_t, state_t1)
            transition_CPTs.append(trans_cpt_t)

    # Create (uniform) initial state distribution
    init_state_rv = state_rvs[0]
    table = np.ones((init_state_rv.dim,)) / init_state_rv.dim
    initial_distribution = CPT([init_state_rv], table)

    return HMM(state_rvs, observation_rvs, transition_CPTs,
               observation_CPTs, initial_distribution)

In this section, we will implement an HMM for a robot that is moving around randomly in a 2D grid with obstacles. The robot has sensors that allow it to detect obstacles in its immediate vicinity. It knows the grid map, with the locations of all obstacles, but it is uncertain about its own location in the grid. We will use Viterbi to determine the most likely locations for the robot given a sequence of local and potentially noisy observations.

Concretely, we will represent the 2D grid with obstacles as a list of lists, where 1s represent obstacles and 0s represent free space. Example:

```
obstacle_map = [
  [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0],
  [1, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 1, 0, 1, 1, 1],
  [1, 0, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 1, 1, 0],
  [0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0]
]
```

The state of the robot is its location in the grid, represented as a tuple of ints, (row, col). Transitions are uniformly distributed amongst the robot's **current location** and the neighboring free (not obstacle) locations, where neighboring = 4 cardinal directions (up, down, left, right).
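
To make the transition model concrete, the sketch below lists the possible next locations for a single cell of the example map; each one receives the same probability. The helper name `next_locations` is hypothetical and is not part of the provided code.

```python
obstacle_map = [
    [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0],
    [1, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 1, 0, 1, 1, 1],
    [1, 0, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 1, 1, 0],
    [0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0],
]

def next_locations(obstacle_map, r, c):
    """Current location plus the free neighbors in the 4 cardinal directions."""
    locations = [(r, c)]
    for dr, dc in [(-1, 0), (0, 1), (1, 0), (0, -1)]:
        nr, nc = r + dr, c + dc
        in_bounds = (0 <= nr < len(obstacle_map)
                     and 0 <= nc < len(obstacle_map[0]))
        if in_bounds and obstacle_map[nr][nc] == 0:
            locations.append((nr, nc))
    return locations

locs = next_locations(obstacle_map, 0, 0)
print(locs)           # [(0, 0), (0, 1)]
print(1 / len(locs))  # 0.5
```

From the top left corner the robot can only stay put or move right, so each of those two outcomes has probability 1/2.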

Observations are a 4-tuple that list which directions have obstacles, in order [N E S W], with a 1 for an obstacle and 0 for no obstacle. Observations that are "off the map" are 1, as though they are obstacles. For instance, in the map above, if there were no observation noise, then the observation for the top left corner (state=(0, 0)) would be (1, 0, 1, 1). Observations can also be corrupted with noise; see the `create_observation_cpt` docstring for more details.
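
The noise-free observation for a location can be checked with a short sketch like the one below. The helper name `true_observation` is hypothetical; it treats off-map neighbors as obstacles, as described above.

```python
obstacle_map = [
    [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0],
    [1, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 1, 0, 1, 1, 1],
    [1, 0, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 1, 1, 0],
    [0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0],
]

def true_observation(obstacle_map, r, c):
    """Noise-free (N, E, S, W) observation at location (r, c)."""
    obs = []
    for dr, dc in [(-1, 0), (0, 1), (1, 0), (0, -1)]:  # N, E, S, W
        nr, nc = r + dr, c + dc
        off_map = not (0 <= nr < len(obstacle_map)
                       and 0 <= nc < len(obstacle_map[0]))
        # Off-map cells count as obstacles.
        obs.append(1 if off_map or obstacle_map[nr][nc] == 1 else 0)
    return tuple(obs)

print(true_observation(obstacle_map, 0, 0))  # (1, 0, 1, 1)
```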

Our ultimate task will be to take in a sequence of observations and return the corresponding sequence of most likely states.

### 2.1 Conditioning Warmup (5 points)

Given a CPT and a variable that occurs in it, return a new CPT conditioned on the specified variable having value 0.

For reference, our solution is **1** line(s) of code.

In [18]:
def conditioning_warmup(cpt, rv):
    '''Given a CPT and a variable that occurs in it, return a new
    CPT conditioned on the specified variable having value 0.

    Args:
      cpt: CPT
      rv: A RV in cpt that has 0 in its domain.

    Returns:
      new_cpt: CPT
    '''
    raise NotImplementedError()

In [None]:
# test_6
Grader.run_single_test_inline(TestPSet7, "test_6", locals())

### 2.2 HMM (5 points)

Write a function that creates a random variable for a state at a given time step in an obstacle HMM. The domain of the state variable should be a list of (row, col) indices into the map. Only free positions (not obstacles) should be included in the domain of the state variable. See docstring for more description.

For reference, our solution is **2** line(s) of code.

In [20]:
def create_state_variable(obstacle_map, name):
    '''Creates a RV for the HMM state.

    The state can be any position in the map.

    Example map:
      obstacle_map = [
        [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0],
        [1, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 1, 0, 1, 1, 1],
        [1, 0, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 1, 1, 0],
        [0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0]
      ]

    Ones are obstacles and zeros are free positions.

    The domain of the state variable should be a list of (row, col)
    indices into the map. Only free positions (not obstacles) should
    be included in the domain of the state variable.

    The domain should be in row-major order. For example, an empty
    2x2 obstacle map should lead to the domain:

      [(0, 0), (0, 1), (1, 0), (1, 1)].

    Args:
      obstacle_map: A list of lists of ints, see example above.
      name: A str name for the state variable.

    Returns:
      state_var: A RV as described above.
    '''
    raise NotImplementedError()


In [None]:
# test_7
Grader.run_single_test_inline(TestPSet7, "test_7", locals())

### 2.3 Transition distribution creation (5 points)

Write a function that creates a CPT for the transition distribution between states $s_{t}$ and $s_{t+1}$. Refer to the previous question for more information about the state variables and their domains.

For reference, our solution is **10** line(s) of code.

In [22]:
def create_transition_cpt(obstacle_map, st, st1):
    '''Write a function to create a CPT for the transition from state s_t
    to s_{t+1}, in an HMM that corresponds to the map.

    Transitions are uniformly distributed amongst the robot's current
    location and the neighboring free (not obstacle) locations, where
    neighboring = 4 cardinal directions (up, down, left, right).

    Hint: remember that if we have a CPT with two variables A and B,
    with dimension N and M, then the CPT table will be a numpy array
    of shape (N, M). Furthermore, the CPT value for the i^{th} domain
    value of A and the j^{th} domain value of B will be table[i, j]. With
    this in mind, you may find it useful to use the following pattern in
    your code somewhere:

    ```
    for i, (prev_r, prev_c) in enumerate(st.domain):
      ...
      for j, (next_r, next_c) in enumerate(st1.domain):
        ...
        table[i, j] = ...
    ```

    Args:
      st: An RV representing the state at time t.
      st1: An RV representing the state at time t+1.
      obstacle_map: A list of lists of ints;
          see example and description in `create_state_variable`.

    Returns:
      cpt: A CPT for the transition between st and st1.
    '''
    raise NotImplementedError()


In [None]:
# test_8
Grader.run_single_test_inline(TestPSet7, "test_8", locals())

### 2.4 Observation variable creation (5 points)
Write a function that creates a random variable for an observation at a given time step in an obstacle HMM. See docstring for description.

For reference, our solution is **2** line(s) of code.

In [24]:
def create_observation_variable(name):
    '''Creates a RV for the HMM observation with the given name.

    Observations are a 4-tuple that list which directions have obstacles,
    in order [N E S W], with a 1 for an obstacle and 0 for no obstacle.

    Observations that are "off the map" are 1, as though they are obstacles.

    For instance, in the following map:

      obstacle_map = [
        [0, 1],
        [0, 0],
      ]

    if there were no observation noise, then the observation for the top left
    location would be (1, 1, 0, 1).

    The domain of the observation variable should be a list of 4-tuples.

    Hint: you may find it useful to use `itertools.product`. For example,
    see what happens with `list(itertools.product(["foo", "bar"], repeat=2))`.

    Args:
      name: A str name for the variable.

    Returns:
      zt: A RV as described above.
    '''
    raise NotImplementedError()


In [None]:
# test_9
Grader.run_single_test_inline(TestPSet7, "test_9", locals())

### 2.5 Observation CPT creation (5 points)

Write a function that creates a CPT for the observation distribution between $s_{t}$ and $z_{t}$.

For reference, our solution is **26** line(s) of code.
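
As a sanity check on the noise model described in the docstring below, the following sketch computes the probability of one noisy observation given the noise-free observation, treating the four entries as independent flips. The function name `observation_likelihood` and the value `noise_prob = 0.1` are assumptions for illustration only.

```python
def observation_likelihood(true_obs, obs, noise_prob):
    """P(obs | true_obs) when each entry flips independently with noise_prob."""
    prob = 1.0
    for true_bit, bit in zip(true_obs, obs):
        # A matching entry is "correct"; a differing entry was flipped.
        prob *= (1 - noise_prob) if bit == true_bit else noise_prob
    return prob

p = observation_likelihood((1, 0, 0, 1), (1, 1, 0, 1), noise_prob=0.1)
print(p)  # approximately 0.1 * 0.9**3 = 0.0729
```

Summing this quantity over all 16 possible observations for a fixed noise-free observation should give 1, which is a useful check on a finished CPT.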


In [26]:
def create_observation_cpt(obstacle_map, state_rv, observation_rv,
                                 noise_prob=0.):
    '''Write a function to create a CPT between state_rv
    and observation_rv in an HMM that corresponds to the map.

    You can assume that state_rv was created by `create_state_variable`
    and observation_rv was created by `create_observation_variable`.

    See `create_observation_variable` for a description of the
    observation model. Recall the order is [N E S W].

    If noise_prob = 0., then the observations are noise-free. That is,
    you observe 0 if there is a free space and 1 otherwise.

    In general, for each of the four observation entries, with
    probability 1 - noise_prob, the entry will be "correct"; with
    probability noise_prob, the entry will be incorrect, that is,
    the opposite of the true occupancy.

    So if the noise-free observation would be (1, 0, 0, 1), then
    the probability of observation (1, 1, 0, 1) would be
    noise_prob*(1 - noise_prob)^3.

    Args:
      obstacle_map: A list of lists of ints;
          see example and description in `create_state_variable`.
      state_rv: An RV representing the state at time t.
      observation_rv: An RV representing the observation at time t.
      noise_prob: A float between 0 and 1 indicating the probability
          that an observation flips.

    Returns:
      cpt: A CPT for the distribution between state_rv and observation_rv.
    '''

    raise NotImplementedError() 


In [None]:
# test_10
Grader.run_single_test_inline(TestPSet7, "test_10", locals())

### 2.6 Viterbi Decoding (10 points)

Complete the implementation of Viterbi decoding. Note: in general, it is a good idea for numerical stability to implement Viterbi in log space. However, for the purpose of this homework, we are not expecting your implementation to be in log space. Moreover, the provided helper functions (e.g., `multiply_CPTs`) assume that you are _not_ working in log space.
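
To see why log space matters for long sequences, the sketch below multiplies many small probabilities directly and compares the result with the sum of their logs. The specific numbers are made up for illustration.

```python
import math

probs = [1e-5] * 100  # 100 hypothetical per-step probabilities

direct_product = 1.0
for p in probs:
    direct_product *= p  # underflows to 0.0 in double precision

# The equivalent quantity in log space stays finite.
log_sum = sum(math.log(p) for p in probs)

print(direct_product)  # 0.0
print(log_sum)
```

Normalizing the messages after each step, as the `run_viterbi` docstring asks, is the alternative used in this problem set to keep values in a representable range without switching to log space.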

For reference, our solution is **23** line(s) of code after comments are removed.

In [28]:
def run_viterbi(hmm, observations):
    '''Run the Viterbi algorithm to compute the most likely state
    assignments given observations.

    Normalize the messages after each step to avoid numerical instability.

    Args:
      hmm: An HMM.
      observations: A list of observation values.

    Returns:
      most_likely_states: A list of state values.
    '''
    raise NotImplementedError() 

In [None]:
# test_11
Grader.run_single_test_inline(TestPSet7, "test_11", locals())

# <a name="part4"></a> Time Spent on Pset (5 points)

Please use [this form](https://forms.gle/MV7NCiLhhTnpYt9x5) to tell us how long you spent on this pset. After you submit the form, the form will give you a confirmation word. Please enter that confirmation word below to get an extra 5 points. 

In [30]:
form_confirmation_word = "ENTER THE CONFIRMATION WORD HERE"

In [None]:
# Run all tests
Grader.grade_output([TestPSet7], [locals()], "results.json")
Grader.print_test_results("results.json")