# Homework 4

## Imports and Utilities
**Note**: these imports and functions are available in catsoop. You do not need to copy them in.

In [None]:
## Same as hw03
from collections import namedtuple
import itertools
import numpy as np


class RV:
  """A random variable with a finite domain.

  Example usage:
    A = RV("A", ["x", "y", "z"])
    print(A.domain)
    print(A.dim)
    B = RV("B", [(0, 0), (0, 1), (0, 2)]))
    print(B.domain)
    print(B.dim)
  """
  def __init__(self, name, domain):
    """Initialize a RV.

    Args:
      name: str name for the RV.
      domain: list or tuple of domain values.
    """
    assert isinstance(domain, (list, tuple))
    self.name = name
    self.domain = domain
    self.dim = len(domain)

  def __hash__(self):
    return hash((self.name, tuple(self.domain)))

  def __eq__(self, other):
    return self.name == other.name and self.domain == other.domain

  def __repr__(self):
    return f"RV('{self.name}', {self.domain})"


class Potential:
  """A potential over RVs.
  
  Example usage:
    A = RV("varA", ["x", "y", "z"])
    B = RV("varB", [0, 1])
    table = np.array([
      [0.1, 0.0],
      [0.4, 0.9],
      [0.5, 0.1]
    ])
    potential = Potential([A, B], table)
    print(potential.rvs)
    print(potential.get(("y", 0)))
    print(potential.get_by_rvs({A: "y", B: 0}))
    print(potential.get_by_names({"varA": "y", "varB": 0}))
  """
  def __init__(self, rvs, table):
    """Create a potential from a list of RVs and a numpy array.

    The order of the random variables corresponds to the axes
    of the numpy array.

    Args:
      rvs: A list or tuple of RVs.
      array: A numpy array of potential values.

    Returns:
      potential: A Potential."""
    assert isinstance(rvs, (tuple, list))
    assert len(rvs) == len(table.shape)
    assert all(rv.dim == dim for (rv, dim) in zip(rvs, table.shape))
    assert isinstance(table, np.ndarray)
    self.rvs = rvs
    self.table = table

  def set(self, assignment, new_value):
    """Given a complete assignment and a value, update table.
    
    Args:
      assignment: A tuple of values in the order of self.rv.
      new_value: A new value to add to the table.
      
    Returns:
      value: The value in self.table.
    """
    assert len(assignment) == len(self.rvs)
    indices = [None for _ in self.rvs]
    for index, value in enumerate(assignment):
      rv = self.rvs[index]
      indices[index] = rv.domain.index(value)
    self.table[tuple(indices)] = new_value

  def get(self, assignment):
    """Given a complete assignment of values, lookup table value.
    
    Args:
      assignment: A tuple of values in the order of self.rv.
      
    Returns:
      value: The value in self.table.
    """
    assert len(assignment) == len(self.rvs)
    indices = [None for _ in self.rvs]
    for index, value in enumerate(assignment):
      rv = self.rvs[index]
      indices[index] = rv.domain.index(value)
    return self.table[tuple(indices)]
    
  def get_by_rvs(self, rvs_to_vals):
    """Given a complete assignment of RVs to values, lookup table value.
    
    Args:
      rvs_to_values: A dict from RVs to values in their domains.
      
    Returns:
      value: The value in self.table.
    """
    assert set(rvs_to_vals.keys()) == set(self.rvs)
    indices = [None for _ in self.rvs]
    for rv, value in rvs_to_vals.items():
      index = self.rvs.index(rv)
      indices[index] = rv.domain.index(value)
    return self.table[tuple(indices)]

  def get_by_names(self, rv_name_dict):
    """Given a dict from RV names (strs) to assignments,
    return the corresponding value in the potential table.

    Args:
      rv_name_dict: A dict from str names to values.
      potential: A Potential.

    Returns:
      value: The float value from potential.table.
    """
    assert len(rv_name_dict) == len(self.rvs)
    rv_name_to_rv = {rv.name: rv for rv in self.rvs}
    rvs_to_vals = {}
    for rv_name, value in rv_name_dict.items():
      rv = rv_name_to_rv[rv_name]
      rvs_to_vals[rv] = value
    return self.get_by_rvs(rvs_to_vals)

  def __hash__(self):
    return hash(tuple(self.rvs)) ^ hash(self.table.tobytes())

  def __eq__(self, other):
    return hash(self) == hash(other)

  def __neq__(self, other):
    return not (self == other)

  def allclose(self, other, decimals=6):
    """Check whether two potentials are (nearly) equal.
    """
    if set(self.rvs) != set(other.rvs):
      raise ValueError("Can only compare potentials with the same RVs.")
    new_idxs = [other.rvs.index(rv) for rv in self.rvs]
    trans_table2 = np.transpose(other.table, new_idxs)
    assert self.table.shape == trans_table2.shape
    return np.allclose(self.table, trans_table2)


def iter_joint_values(rvs):
  """Iterates over joint assignments for a list of RVs.

  Returns an iterator that can be used in a for loop.

  Example usage:
    for assignment in iter_joint_values(rvs):
      print(assignment)  # a tuple
      assert assignment[0] in rvs[0].domain
  
  Args:
    rvs: A list of RVs.

  Yields:
    assignment: A tuple of ints representing a joint
      assignment of the random variables.
  """
  domains = [rv.domain for rv in rvs]
  return itertools.product(*domains)


def get_sub_assignment(rvs, assignment, sub_rvs):
  """Given an assignment of rvs to values, get a subassignment,
  that is, a sub-tuple of the given assignment involving only
  the given sub_rvs.

  Example usage:
    x = RV("x", [0, 1])
    y = RV("y", ["a", "b"])
    z = RV("z", [3, 5])
    rvs = (x, y, z)
    assignment = (0, "b", 3)
    sub_rvs = (z, x)
    sub_assignment = get_sub_assignment(rvs, assignment, sub_rvs)
    assert sub_assignment == (3, 0)
  
  Args:
    rvs: A tuple or list of RVs.
    assignment: A tuple or list of values.
    sub_rvs: A tuple or list of RVs, a subset of rvs.
    
  Returns:
    sub_assignment: A tuple of values.
  """
  assert set(sub_rvs).issubset(set(rvs))
  sub_assignment = []
  for rv in sub_rvs:
    idx = rvs.index(rv)
    val = assignment[idx]
    sub_assignment.append(val)
  return tuple(sub_assignment)


def multiply_potentials(potentials):
  """Multiply potentials together.

  Args:
    potentials: A list of Potentials.
  
  Returns:
    result: A new Potential.
  """
  # Note: this implementation favors clarity over
  # efficiency. If you are curious about a more
  # efficient implementation, please ask course staff :)

  # Collect all random variables
  rvs = set()
  for pot in potentials:
    rvs |= set(pot.rvs)
  rvs = list(rvs)

  # Initialize result table
  table = np.empty([rv.dim for rv in rvs])

  # Initialize result potential
  result = Potential(rvs, table)

  # Iterate over assignments in the new potential
  for assignment in iter_joint_values(rvs):
    value = 1.
    for pot in potentials:
      # Get the sub-assignment
      sub_assignment = get_sub_assignment(rvs, assignment, pot.rvs)
      # Compute product
      value *= pot.get(sub_assignment)
    # Add value to result table
    result.set(assignment, value)

  # Finalize potential
  return result


### New for HW04 ###
HMM = namedtuple("HMM", [
    "state_rvs",  # A list of RVs, one per timestep
    "observation_rvs",  # A list of RVs, one per timestep
    "transition_potentials",  # A list of Potentials, one per transition
    "observation_potentials",  # A list of Potentials, one per timestep
    "initial_distribution"  # A single Potential
])


def condition_potential(potential, condition):
  """Given a potential and an assignment of one or more RVs,
    create a new potential representing the conditional.

  Example usage:
    A, B = RV('A', ["x", "y", "z"]), RV('B', [0, 1])
    pot = Potential([A, B],
      np.array([
        [1, 0],
        [0, 1],
        [0, 0],
    ]))
    condition = {A: "y"}
    result = condition_potential(pot, condition)
    assert result.rvs == [B]
    assert result.get_by_rvs({B: 0}) == 0
    assert result.get_by_rvs({B: 1}) == 1

  Args:
    potential: A Potential.
    condition: A dict from RVs to values.

  Returns:
    conditional_potential: A Potential.
  """
  new_rvs = [rv for rv in potential.rvs if rv not in condition]
  idxs = []
  for rv in potential.rvs:
    if rv in condition:
      value = condition[rv]
      idx = rv.domain.index(value)
    else:
      idx = slice(None)
    idxs.append(idx)
  new_table = potential.table[tuple(idxs)].copy()
  return Potential(new_rvs, new_table)


def normalize_potential(potential):
  """Normalize the values of a potential so they sum to 1.

  Args:
    potential: A Potential.

  Returns:
    new_potential: A Potential.
  """
  denom = potential.table.sum()
  if denom == 0:
    raise Exception("WARNING: tried to normalize an all-zeros potential.")
  return Potential(potential.rvs, potential.table / denom)


def max_potential(potential, rv=None):
  """Create a new potential where rv has been maximized out.
  
  If rv is None, then this function returns the max
  over all assignments in the potential.

  Example usage:
    A, B = RV('A', ["x", "y", "z"]), RV('B', [0, 1])
    pot = Potential([A, B],
      np.array([
        [1., 5.],
        [2, 6.],
        [3., 4.],
    ]))
    assert max_potential(pot) == 6.
    pot1 = max_potential(pot, rv=A)
    assert pot1.rvs == [B]
    assert pot1.get_by_rvs({B: 0}) == 3
    assert pot1.get_by_rvs({B: 1}) == 6
    pot2 = max_potential(pot, rv=B)
    assert pot2.rvs == [A]
    assert pot2.get_by_rvs({A: "x"}) == 5
    assert pot2.get_by_rvs({A: "y"}) == 6
    assert pot2.get_by_rvs({A: "z"}) == 4

  Args:
    potential: A Potential.
    rv: A random variables in the potential or None.
  
  Returns:
    new_potential: A Potential.
  """
  if rv is None:
    return potential.table.max()
  axis = potential.rvs.index(rv)
  new_table = potential.table.max(axis=axis)
  new_rvs = list(potential.rvs)
  new_rvs.remove(rv)
  return Potential(new_rvs, new_table)


def argmax_potential(potential, rv=None):
  """Create a new "potential" where the values are domain assignments
  of the given rv that lead to maximum values.

  If rv is None, then this function returns the argmax assignment
  over all assignments in the potential.

  Example usage:
    A, B = RV('A', ["x", "y", "z"]), RV('B', [0, 1])
    pot = Potential([A, B],
      np.array([
        [1., 5.],
        [2, 0.],
        [3., 4.],
    ]))
    assert argmax_potential(pot) == ("x", 1)
    pot1 = argmax_potential(pot, rv=A)
    assert pot1.rvs == [B]
    assert pot1.get_by_rvs({B: 0}) == "z"
    assert pot1.get_by_rvs({B: 1}) == "x"
    pot2 = argmax_potential(pot, rv=B)
    assert pot2.rvs == [A]
    assert pot2.get_by_rvs({A: "x"}) == 1
    assert pot2.get_by_rvs({A: "y"}) == 0
    assert pot2.get_by_rvs({A: "z"}) == 1  
  
  Args:
    potential: A Potential.
    rv: A random variable in the potential or None.
  
  Returns:
    new_potential: A Potential.
  """
  if rv is None:
    idxs = np.unravel_index(potential.table.argmax(), potential.table.shape)
    return tuple(v.domain[idx] for v, idx in zip(potential.rvs, idxs))
  axis = potential.rvs.index(rv)
  new_table_idxs = potential.table.argmax(axis=axis)
  new_table = np.empty(new_table_idxs.shape, dtype=object)
  for idx, val in np.ndenumerate(new_table_idxs):
    new_table[idx] = rv.domain[val]
  new_rvs = list(potential.rvs)
  new_rvs.remove(rv)
  return Potential(new_rvs, new_table)

def create_hmm(obstacle_map, num_timesteps):
  """Converts a map into an HMM.
  
  Creates the state RVs, the observation RVs, the transition potentials,
  and the observation potentials.

  You should not need to use this; it's used in tests.

  Args:
    obstacle_map: A list of lists of ints; see example and
        description in `create_state_variable`.
    num_timesteps: An int number of timesteps, must be >= 1.

  Returns:
    problem: An HMM.
  """
  assert num_timesteps >= 1

  # Create variables
  state_rvs = []
  observation_rvs = []
  for t in range(num_timesteps):
    state_t = create_state_variable(obstacle_map, f"state{t}")
    obs_t = create_observation_variable(f"observation{t}")
    state_rvs.append(state_t)
    observation_rvs.append(obs_t)

  # Create observation potentials
  observation_potentials = []
  for state, obs in zip(state_rvs, observation_rvs):
    obs_pot_t = create_observation_potential(obstacle_map, state, obs)
    observation_potentials.append(obs_pot_t)

  # Create transition potentials
  transition_potentials = []
  if num_timesteps > 1:
    for state_t, state_t1 in zip(state_rvs[:-1], state_rvs[1:]):
      trans_pot_t = create_transition_potential(obstacle_map, state_t, state_t1)
      transition_potentials.append(trans_pot_t)

  # Create (uniform) initial state distribution
  init_state_rv = state_rvs[0]
  table = np.ones((init_state_rv.dim,)) / init_state_rv.dim
  initial_distribution = Potential([init_state_rv], table)
  
  return HMM(state_rvs, observation_rvs, transition_potentials,
             observation_potentials, initial_distribution)



## Problems

### State variable creation
Write a function that creates a random variable for a state at a given time step in an obstacle HMM. See docstring for description.

For reference, our solution is **3** lines of code.

In [None]:
def create_state_variable(obstacle_map, name):
  """Creates a RV for the HMM state.

  The state can be any position in the map.

  Example map:
    obstacle_map = [
      [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0],
      [1, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 1, 0, 1, 1, 1],
      [1, 0, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 1, 1, 0],
      [0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0]
    ]

  Ones are obstacles and zeros are free positions.

  The domain of the state variable should be a list of (row, col)
  indices into the map. Only free positions (not obstacles) should
  be included in the domain of the state variable.

  Args:
    obstacle_map: A list of lists of ints, see example above.
    name: A str name for the state variable.

  Returns:
    state_var: A RV as described above.
  """
  raise NotImplementedError("Implement me!")

Tests

In [None]:
  def create_state_variable_test():
    map1 = [[0, 1, 0],
            [1, 0, 0]]
    rv = create_state_variable(map1, "current_state")
    assert rv.name == "current_state"
    assert rv.dim == 4
    assert set(rv.domain) == {(0, 0), (0, 2), (1, 1), (1, 2)}

create_state_variable_test()
print('Tests passed.')

### Transition potential creation
Write a function that creates a potential for the transition distribution between states $s_{t}$ and $s_{t+1}$.

For reference, our solution is **12** lines of code.

In [None]:
def create_transition_potential(obstacle_map, st, st1):
  """Write a function to create a potential for the transition from state s_t
  to s_{t+1}, in an HMM that corresponds to the map.

  Transitions are uniformly distributed amongst the robot's current
  location and the neighboring free (not obstacle) locations, where
  neighboring = 4 cardinal directions (up, down, left, right).

  Args:
    st: An RV representing the state at time t.
    st1: An RV representing the state at time t+1.
    obstacle_map: A list of lists of ints;
        see example and description in `create_state_variable`.

  Returns:
    potential: A Potential for the transition between st and st1.
  """
  raise NotImplementedError("Implement me!")

Tests

In [None]:
  def create_transition_potential_test():
    map1 = [[0, 0, 0],
            [1, 0, 1]]
    s0 = create_state_variable(map1, "state_0")
    s1 = create_state_variable(map1, "state_1")
    pot = create_transition_potential(map1, s0, s1)
    assert set(pot.rvs) == {s0, s1}
    assert pot.get_by_names({"state_0": (0, 0), "state_1": (0, 0)}) == 0.5
    assert pot.get_by_names({"state_0": (0, 0), "state_1": (0, 1)}) == 0.5
    assert pot.get_by_names({"state_0": (0, 0), "state_1": (1, 1)}) == 0.
    assert pot.get_by_names({"state_0": (0, 1), "state_1": (1, 1)}) == 0.25
    assert pot.get_by_names({"state_0": (0, 1), "state_1": (0, 1)}) == 0.25
    assert pot.get_by_names({"state_0": (0, 1), "state_1": (0, 2)}) == 0.25
    assert pot.get_by_names({"state_0": (0, 1), "state_1": (0, 0)}) == 0.25
    assert pot.get_by_names({"state_0": (1, 1), "state_1": (0, 0)}) == 0.
    assert pot.get_by_names({"state_0": (1, 1), "state_1": (0, 1)}) == 0.5

create_transition_potential_test()
print('Tests passed.')

### Observation variable creation
Write a function that creates a random variable for an observation at a given time step in an obstacle HMM. See docstring for description.

For reference, our solution is **2** lines of code.

In [None]:
def create_observation_variable(name):
  """Creates a RV for the HMM observation with the given name.

  Observations are a 4-tuple that list which directions have obstacles,
  in order [N E S W], with a 1 for an obstacle and 0 for no
  obstacle. The observations are perfect and error-free. For instance,
  in the following map:

    obstacle_map = [
      [0, 1],
      [0, 0],
    ]

  the observation for the top left location would always be (1, 0, 1, 1).

  The domain of the observation variable should be a list of 4-tuples.

  Args:
    name: A str name for the variable.

  Returns:
    zt: A RV as described above.
  """
  raise NotImplementedError("Implement me!")

Tests

In [None]:
  def create_observation_variable_test():
    rv = create_observation_variable("obs")
    assert rv.name == "obs"
    assert rv.dim == 2**4
    assert set(rv.domain) == {
      (0, 0, 0, 0), (1, 0, 0, 0), (0, 1, 0, 0), (1, 1, 0, 0),
      (0, 0, 1, 0), (1, 0, 1, 0), (0, 1, 1, 0), (1, 1, 1, 0),
      (0, 0, 0, 1), (1, 0, 0, 1), (0, 1, 0, 1), (1, 1, 0, 1),
      (0, 0, 1, 1), (1, 0, 1, 1), (0, 1, 1, 1), (1, 1, 1, 1),
    }

create_observation_variable_test()
print('Tests passed.')

### Observation potential creation
Write a function that creates a potential for the observation distribution between $s_{t}$ and $z_{t}$.

For reference, our solution is **17** lines of code.

In [None]:
def create_observation_potential(obstacle_map, state_rv, observation_rv):
  """Write a function to create a potential between state_rv
  and observation_rv in an HMM that corresponds to the map.

  You can assume that state_rv was created by `create_state_variable`
  and observation_rv was created by `create_observation_variable`.

  See `create_observation_variable` for a description of the
  observation model. Recall the order is [N E S W].

  Args:
    obstacle_map: A list of lists of ints;
        see example and description in `create_state_variable`.
    state_rv: An RV representing the state at time t.
    observation_rv: An RV representing the observation at time t.

  Returns:
    potential: A Potential for the distribution between st and zt.
  """
  raise NotImplementedError("Implement me!")

Tests

In [None]:
  def create_observation_potential_test():
    map1 = [[0, 0, 0],
            [1, 0, 1]]
    s0 = create_state_variable(map1, "state_0")
    z0 = create_observation_variable("obs_0")
    pot = create_observation_potential(map1, s0, z0)
    assert set(pot.rvs) == {s0, z0}
    assert pot.get_by_names({"state_0": (0, 0), "obs_0": (1, 0, 1, 1)}) == 1.
    assert pot.get_by_names({"state_0": (0, 0), "obs_0": (1, 1, 0, 1)}) == 0.
    assert pot.get_by_names({"state_0": (0, 1), "obs_0": (1, 0, 0, 0)}) == 1.
    assert pot.get_by_names({"state_0": (0, 2), "obs_0": (1, 1, 1, 0)}) == 1.
    assert pot.get_by_names({"state_0": (1, 1), "obs_0": (0, 1, 1, 1)}) == 1.

create_observation_potential_test()
print('Tests passed.')

### Viterbi Decoding
Complete the implementation of Viterbi decoding.

For reference, our solution is **37** lines of code.

In [None]:
def run_viterbi(hmm, observations):
  """Run the Viterbi algorithm to compute the most likely state
  assignments given observations.

  Normalize the messages after each step to avoid numerical instability.

  Args:
    hmm: An HMM.
    observations: A list of observation values.

  Returns:
    most_likely_states: A list of state values.
  """
  raise NotImplementedError("Implement me!")

Tests

In [None]:
  def viterbi_test1():
    map1 = [[0, 0, 0],
            [1, 0, 1]]
    observations1 = [(1, 0, 1, 1), (1, 0, 0, 0), (1, 0, 0, 0)]
    hmm1 = create_hmm(map1, len(observations1))
    most_likely_states1 = run_viterbi(hmm1, observations1)
    assert most_likely_states1 == [(0, 0), (0, 1), (0, 1)]

viterbi_test1()
  def viterbi_test2():
    map2 = [[0, 0, 0, 0, 0]]
    observations2 = [(1, 0, 1, 0), (1, 0, 1, 1), (1, 0, 1, 0),
                     (1, 0, 1, 0), (1, 0, 1, 0), (1, 1, 1, 0)]
    hmm2 = create_hmm(map2, len(observations2))
    most_likely_states2 = run_viterbi(hmm2, observations2)
    assert most_likely_states2 == [(0, 1), (0, 0), (0, 1), (0, 2), (0, 3), (0, 4)]

viterbi_test2()
print('Tests passed.')