In [26]:
# All imports and variables you will need

import numpy as np
OBSERVATIONS = [i for i in range(10)]  # all possible vertical coordinates
STATES = [(i, j) for i in range(10) for j in range(10)]  # all 100 states
ACTIONS = ["Left", "Right", "Up", "Down"]  # the four possible actions

def generate_random_float():
  """ Returns a random float between 0 and 1 
  """
  return np.random.rand()

## Calculating Probabilities

### 1. `action_prior(action)` [2 points]

This function will take in one of the actions in `[Left, Right, Up, Down]`, and return the probability of that action occurring. You will be returning a `float` between 0 and 1 which is the probability of that action. You will be using the following table as the probability mass function for that action.

$a_{i}$ | $P$$($$A$$=$$a_{i}$$)$
--- | ---
Left | 0.2
Right | 0.6
Up | 0.1
Down | 0.1

In [27]:
def action_prior(action):
  """Return prior probability distribution on an action.
    arguments:
      action (string): "Left", "Right", "Up", "Down"
    returns:
      probability of given action (float).
  """
  dist = {
    "Left": 0.2,
    "Right": 0.6,
    "Up": 0.1,
    "Down": 0.1
  }
  # assert action in dist, 'action "{}" not in dist'.format(action)
  return dist[action]

In [28]:
# Testing 1
action_prior("Right")

0.6

### 2. `state_prior(state)` [2 points]

This function will take in one of the states of the grid world and return the probability of the robot being in that state. You will be returning a `float` between 0 and 1 which is the probability of that action. A state is represented by $(i,j)$, where $i$ is the row number in the grid, between 0 and 9 and $j$ is the column number in the grid, between 0 and 9. You will be using the following grid world diagram to calculate the probability of a state. You should use a numpy array to represent the grid world. The origin of this grid world (0,0) is in the top left corner. $i$ increases as you go down in the grid, and $j$ increases as you go right in the grid.

<img src="https://i.ibb.co/TqfCLCw/grid-world.png" style="width: 400px"/>


In [29]:
def state_prior(state):
  """Return prior probability distribution on a state.
  arguments:
      state (tuple): (i: int, j: int)
  returns:
      probability of given state (float).
  """
  # Use np.array to create a 2D array that represents the prior on the state
  # (i,j) where i is the row index and j is the column index of the agent's
  # position.
  states_dist = np.array([
    [.0, .0, .0, .0, .1, .0, .0, .0, .0, .0,],
    [.0, .1, .0, .0, .0, .0, .0, .0, .0, .0,],
    [.0, .0, .0, .0, .0, .0, .0, .0, .0, .0,],
    [.0, .0, .0, .1, .0, .0, .0, .0, .0, .0,],
    [.1, .0, .0, .0, .0, .1, .0, .0, .1, .0,],
    [.0, .0, .0, .0, .0, .0, .0, .0, .0, .0,],
    [.0, .0, .0, .0, .0, .0, .0, .0, .0, .0,],
    [.0, .0, .1, .0, .0, .0, .1, .0, .0, .0,],
    [.0, .0, .0, .0, .0, .0, .0, .1, .0, .0,],
    [.0, .0, .0, .0, .0, .0, .0, .0, .0, .1,]
  ])
  # assert type(state) is tuple and len(state) == 2 and type(state[0]) is int and type(state[1]) is int, 'invalid argument state'
  # assert state[0] >= 0 and state[1] >= 0 and state[0] < states_dist.shape[0] and state[1] < states_dist.shape[1], 'state out of bounds'
  return float(states_dist[[state[0]], [state[1]]])

In [30]:
# Test
state_prior([9, 9])

0.1

### 3. `sensor_model(k, S)` [3 points]

This function will take in a value of the sensor $k$, and the state $(i,j)$ that the robot is at. You will need to calculate $P(O=k|S)$. The sensor reports the row that the robot is at in the grid world. You can use the following equations to implement this function.

![](https://i.ibb.co/qxF09cW/sensor-model.png)


In [31]:
# Define the sensor model conditional distribution, P(O|S)
def sensor_model(k, S):
  """Give value of Conditional Probability Table (CPT) for sensor model P(O|S).
  arguments:
      k: the value of the sensor, corresponding to a row in the grid world (i).
      S (int,int): a tuple (i,j), representing the position in the grid.
  returns:
      P(k|i,j) (float).
  notes:
      Sensor reports noisy observation of vertical coordinate i.
  """
  if k == S[0]:
    return 0.91
  else:
    return 0.01

In [32]:
# Test
sensor_model(8, (9, 1))

0.01

### 4.`transition_model(T, S, A)` [5 points]

This function will take in the next state the robot is in, $T = (i,j)$, the current state the robot is in, $S = (i,j)$, and $A$, an action from `[Left, Right, Up, Down]`. You will be calculating $P(T|S, A)$. You will be using the following statements to calculate the probability.


$T$ is the correct state when you take action $A$ from $S$; $P(T|S,A) = 0.85$ \\
$T$ is a possible state when you take an action other than $A$ from $S$; $P(T|S,A) = 0.05$ \\
Otherwise; $P(T|S,A) = 0$

This model accounts for the robot possibly taking the wrong action.

**NOTE:** If the robot is at the edge of the grid world, and tries to perform an action that will take it out of the grid world, it will stay in the same state (location). 


In [33]:
# Define the state transition model P(T|S,A)
def transition_model(T, S, A):
  """Give value of CPT for transition model P(T|S,A).
  arguments:
      T (int,int): a tuple (i,j), representing the next position in the grid
      S (int,int): a tuple (i,j), representing the position in the grid.
      A (str): a string representing the action to be taken
  returns:
      P(T|S,A) (float).
  """
  def next_state(S, A):
    transition_offset = {
      'Left': (0, -1),
      'Right': (0, 1),
      'Up': (-1, 0),
      'Down': (1, 0)
    }
    temp = (S[0] + transition_offset[A][0], S[1] + transition_offset[A][1])
    if temp in STATES:
      return temp
    else:
      return S
   
  next_states_prob = {}
  for action in ACTIONS:
    if action == A:
      this_prob = 0.85
    else:
      this_prob = 0.05
    this_next_state = next_state(S, action)
    if this_next_state not in next_states_prob:
      next_states_prob[this_next_state] = 0.0
    next_states_prob[this_next_state] = next_states_prob[this_next_state] + this_prob
  
  if T in next_states_prob:
    return next_states_prob[T]
  else:
    return 0.0

In [34]:
# Test
transition_model((0, 0), (0, 0), "Down")

0.1

### Extra Credit: `transition_model_portal(T,S,A)` [5 points]

You will be implementing the same transition model as above; however, there are portals in the grid world now. The portals are as such:

1. Going Right from (3,4) takes you to (6,3)
2. Going Up from (0,1) takes you to (5,5)
3. Going Left from (3,8) takes you to (0,0)

Once again, you will be using the following statements to calculate the probability:

$T$ is the correct state when you take action $A$ from $S$; $P(T|S,A) = 0.85$ \\
$T$ is a possible state when you take an action other than $A$ from $S$; $P(T|S,A) = 0.05$ \\
Otherwise; $P(T|S,A) = 0$

**NOTE**: If the robot is at the edge of the grid world, and tries to perform an action that will take it out of the grid world, it will stay in the same state (location), unless a portal dictates otherwise.

In [35]:
def transition_model_portal(T, S, A):
  """Give value of CPT for transition model P(T|S,A) with portals.
    arguments:
        T (int,int): a tuple (i,j), representing the next position in the grid
        S (int,int): a tuple (i,j), representing the position in the grid.
        A (str): a string representing the action to be taken
    returns:
        P(T|S,A) (float).
  """
  def next_state(S, A):
    transition_offset = {
      'Left': (0, -1),
      'Right': (0, 1),
      'Up': (-1, 0),
      'Down': (1, 0)
    }
    portals = {
        ((3, 4), 'Right'): (6, 3),
        ((0, 1), 'Up'): (5, 5),
        ((3, 8), 'Left'): (0, 0)
    }
    if (S, A) in portals:
      return portals[(S, A)]
    temp = (S[0] + transition_offset[A][0], S[1] + transition_offset[A][1])
    if temp in STATES:
      return temp
    else:
      return S
   
  next_states_prob = {}
  for action in ACTIONS:
    if action == A:
      this_prob = 0.85
    else:
      this_prob = 0.05
    this_next_state = next_state(S, action)
    if this_next_state not in next_states_prob:
      next_states_prob[this_next_state] = 0.0
    next_states_prob[this_next_state] = next_states_prob[this_next_state] + this_prob
  
  if T in next_states_prob:
    return next_states_prob[T]
  else:
    return 0.0

In [36]:
transition_model((6, 3), (3, 4), 'Right')

0.0

## Sampling

### 1. `calculate_cdf(pmf, ordered_outcomes)` [5 points]

This function will take in a probability mass function (pmf) and all the possible outcomes the pmf can take. For example, if we consider the cdf for actions, the variable `ordered_outcomes` will be `[Left, Right, Up, Down]`. You will be returning a function `cdf(outcome)`, which returns the cumulative probability of that outcome. For example, in the table below, that would be $F(a_i)$. **HINT**: Numpy will be useful here!

$a_{i}$ | $P$$($$A$$=$$a_{i}$$)$ | $F$$($$a_{i}$$)$
--- | --- | ---
Left | 0.2 | 0.2
Right | 0.6 | 0.8
Up | 0.1 | 0.9
Down | 0.1 | 1.0


In [37]:
# Create the function cdf
def calculate_cdf(pmf, ordered_outcomes):
  """Calculate cumulative distribution function.
  arguments:
      pmf (function): probability mass function.
      ordered_outcomes (list): all outcomes in an ordered list.
  returns:
      a function that represents the CDF
  note:
      The ordered_outcomes defines how the CDF behaves.
  """
  
  def cdf(outcome):
    pmf_result = np.array([pmf(outcome) for outcome in ordered_outcomes])
    cdf_list = np.cumsum(pmf_result)
    look_up = dict(zip(ordered_outcomes, cdf_list))
    return look_up[outcome]
    

  return cdf

In [38]:
# Test
def pmf(action):
  look_up = {
    'Left': 0.2,
    'Right': 0.6,
    'Up': 0.1,
    'Down': 0.1
  }
  return look_up[action]

cdf = calculate_cdf(pmf, ACTIONS)

In [39]:
# Test
cdf('Down')

1.0

### 2. `sample_from_pmf(pmf, ordered_outcomes)` [6 points]
 
This function will take in a pmf and all the possible outcomes the pmf can take, and the goal is to sample from this pmf, using inverse transform sampling. You will be returning either an action or state, depending on the pmf and ordered outcomes.

**HINT**: Numpy will be useful here! 

Please use `generate_random_float` to get a random float between 0 and 1.

In [40]:
# Fill in the body of the following function to sample from an arbitrary PMF.
def sample_from_pmf(pmf, ordered_outcomes):
  """Sample from a PMF
    arguments:
      pmf (function): a probability mass function
      ordered_outcomes (list): ordered list of all outcomes
    returns:
      a state or an action, depending on the pmf and outcomes
  """
  cdf = calculate_cdf(pmf, ordered_outcomes)
  rand_float = generate_random_float()
  for outcome in ordered_outcomes:
    if cdf(outcome) > rand_float:
      return outcome

In [41]:
# Test
for i in range(0, 20):
  print(sample_from_pmf(pmf, ACTIONS))

Right
Left
Left
Right
Right
Left
Right
Up
Right
Right
Right
Right
Right
Left
Right
Left
Up
Up
Right
Right


### 3. `sample_from_sensor_model(state)` [3 points]

This function is similar to `sample_from_pmf`. The only difference is that this is specific to the sensor model, and hence, it has the additional parameter `state`, referring to the $S$ in $P(O|S)$. So, in this case, your pmf will be `sensor_model`. You will be returning an observation, which is a value of the sensor. 

**HINT**: Numpy will be useful here!

Please use `generate_random_float` to get a random float between 0 and 1.

In [42]:
def sample_from_sensor_model(state):
  """Sample from sensor model
  arguments: 
      state (int, int): a pair (i,j), representing the position in the grid.
  returns:
      observation (int): the sensor reading that has been sampled
  """
  def pmf(k):
    return sensor_model(k, state)

  k_list = list(np.arange(STATES[0][0], STATES[-1][0] + 1))
  
  return sample_from_pmf(pmf, k_list)

### 4. `sample_from_transition_model(state, action)` [3 points]
This function is also similar to `sample_from_pmf`. The difference is that this is specific to the transition model, and hence, it has additional parameters `state` and `action`, referring to the $S$ and $A$ in $P(T|S,A)$. So, in this case, your pmf will be `transition_model`. You will be returning a state $T$.

**HINT**: Numpy will be useful here!

Please use `generate_random_float` to get a random float between 0 and 1.
``

In [43]:
def sample_from_transition_model(state, action):
  """Sample from transition model
  arguments:
      state (int, int): a pair (i,j), representing the position in the grid.
      action (str): the action taken
  returns:
      next_state (int, int): a pair (i,j), representing the robot's next position in the grid
  """
  def pmf(T):
    return transition_model(T, state, action)
  
  def next_state(S, A):
    transition_offset = {
      'Left': (0, -1),
      'Right': (0, 1),
      'Up': (-1, 0),
      'Down': (1, 0)
    }
    temp = (S[0] + transition_offset[A][0], S[1] + transition_offset[A][1])
    if temp in STATES:
      return temp
    else:
      return S
  
  
  return sample_from_pmf(pmf, list(set([next_state(state, some_action) for some_action in ACTIONS])))

In [44]:
sample_from_transition_model((1, 0), 'Left')

(1, 0)

## Inference

<img src="https://i.ibb.co/QmdLWvh/Screen-Shot-2020-01-21-at-7-26-56-AM.png" style="width: 300px;" />

### 1. `sample_from_dbn()` [9 points]

In this function, you will be sampling from a dynamic Bayes net. You will need to perform the sampling for the Bayes net shown above, which has three observations, three states and two actions. You will be returning all the sampled states, observations and actions in a dictionary `samples`. The sampling functions you have written earlier will be helpful in this. Make sure you go through the slides, as this will be helpful in creating your algorithm.


In [45]:
def sample_from_dbn():
  """Sample from a dynamic bayes network
  returns:
      samples (dict): A dictionary with the states, observations and actions sampled
  """
  samples = {'states': [], 'observations': [], 'actions': []}
  # Your code below

  s1 = sample_from_pmf(state_prior, STATES)
  o1 = sample_from_sensor_model(s1)
  a1 = sample_from_pmf(action_prior, ACTIONS)
  s2 = sample_from_transition_model(s1, a1)
  o2 = sample_from_sensor_model(s2)
  a2 = sample_from_pmf(action_prior, ACTIONS)
  s3 = sample_from_transition_model(s2, a2)
  o3 = sample_from_sensor_model(s3)
  samples['states'].append(s1)
  samples['states'].append(s2)
  samples['states'].append(s3)
  samples['observations'].append(o1)
  samples['observations'].append(o2)
  samples['observations'].append(o3)
  samples['actions'].append(a1)
  samples['actions'].append(a2)

  return samples

In [46]:
# Test
sample_from_dbn()

{'states': [(4, 5), (4, 6), (5, 6)],
 'observations': [4, 1, 4],
 'actions': ['Right', 'Right']}

### 2. `maximum_probable_explanation(actions, observations)` [12 points]
In this function, you will implement MPE, which infers the most probable values of the states, given the actions and observations. You will be implementing MPE for the Bayes net above. You are asked to maximize the posterior probability
$$P(S_1, S_2, S_3|O_1=o_1, A_1=a_1, O_2=o_2, A_2=a_2, O_3=o_3)$$
To do this, you will implement a highly specialized version of the max-product algorithm. You will be returning the three states $S_1, S_2, S_3$ in a list.

**Note:** there are three helper functions below. They are partly implemented and is there to aid you through the algorithm, but our way of implementation is not required as long as the main function `maximum_probable_explanation` works correctly.

In [47]:
def create_lookup_table_for_s1(o1, a1):
  """Create a lookup table for state 1
    arguments:
      o1 (int): observation 1
      a1 (str): action 1
    returns:
      g1 (dict): lookup table for s1
      v1 (dict): value table for s1
  """
  def product1(s1, s2):
    """Inner function to calculate the factor value of state 1 and 2
      arguments:
        s1 (int, int): state 1
        s2 (int, int): state 2
      returns:
        value of the factor (float)
    """
    return state_prior(s1) * sensor_model(o1, s1) * transition_model(s2, s1, a1)
    
  # Your code below for create_lookup_table_for_s1
  g1 = {}
  v1 = {}
  for s2 in STATES:
    max_s1 = None
    max_prd = None
    for s1 in STATES:
      prd = product1(s1, s2)
      if max_prd is None or prd > max_prd:
        max_s1 = s1
        max_prd = prd
    g1[s2] = max_s1
    v1[s2] = max_prd

  # Your code above
  return g1, v1

def create_lookup_table_for_s2(o2, a2, v1):
  """Create a lookup table for state 2
    arguments:
      o2 (int): observation 2
      a2 (str): action 2
      v1 (dict): value table for state 1
    returns:
      g2 (dict): lookup table for s2
      v2 (dict): value table for s2
  """
  def product2(s2, s3):
    """Inner function to calculate the factor value of state 2 and 3
      arguments:
        s2 (int, int): state 2
        s3 (int, int): state 3
      returns:
        value of the factor (float)
    """
    return v1[s2] * sensor_model(o2, s2) * transition_model(s3, s2, a2)

  g2 = {}
  v2 = {}
  # Your code below for create_lookup_table_for_s2
  for s3 in STATES:
    max_s2 = None
    max_prd = None
    for s2 in STATES:
      prd = product2(s2, s3)
      if max_prd is None or prd > max_prd:
        max_s2 = s2
        max_prd = prd
    g2[s3] = max_s2
    v2[s3] = max_prd

  # Your code above
  return g2, v2


def create_lookup_table_for_s3(o3, v2):
  """Create a lookup table for state 3
    arguments:
      o3 (int): observation 3
      v2 (dict): value table for s2
    returns:
      g3 (state): lookup table for s3
      v3 (float): value table for s3
  """
  def product3(s3):
    """Inner function to calculate the factor value of state 2 and 3
      arguments:
        s3 (int, int): state 3
      returns:
        value of the factor (float)
    """
    return v2[s3] * sensor_model(o3, s3)

  g3 = None
  v3 = None
  # Your code below for create_lookup_table_for_s3
  for s3 in STATES:
    prd = product3(s3)
    if v3 is None or prd > v3:
      g3 = s3
      v3 = prd
  
  # Your code above
  return g3, v3

def maximum_probable_explanation(actions, observations):
  """Calculate the most probable states given the actions and observations
      arguments:
        actions ([str]): Array of all actions taken
        observations ([int]): Sensor measurements
      returns:
        list of most probable states ([(int, int)])
  """
  # Known values
  o1, o2, o3 = observations
  a1, a2 = actions

  # eliminate S1 -> lookup table S1=g1(S2)
  g1, v1 = create_lookup_table_for_s1(o1, a1)

  # eliminate S2 -> lookup table S2=g2(S3)
  g2, v2 = create_lookup_table_for_s2(o2, a2, v1)

  # eliminate S3 -> lookup table S3=g3()
  g3, v3 = create_lookup_table_for_s3(o3, v2)

  # back-substitute (use the lookup tables to find the most probable states)
  # Your code below

  return[g1[g2[g3]], g2[g3], g3]


In [48]:
actions = ['Down', 'Down']
observations = [3, 4, 5]
correct_states = [(3,3), (4,3), (5,3)]
assert maximum_probable_explanation(actions, observations) == correct_states

In [49]:
[g1[g2[g3]], g2[g3], g3]

NameError: name 'g1' is not defined

### EXTRA CREDIT: `general_maximum_probable_explanation(actions, observations)` [5 points]

You can receive extra credit if your implementation of MPE is for a Bayes Net having the same structure as above, but having any number of states. So, if you're given $N$ observations and $N - 1$ actions, you will be returning $N$ states.

In [89]:
def general_maximum_probable_explanation(actions, observations):
  """Calculate the most probable states given the actions and observations
      arguments:
        actions ([str]): Array of all actions taken
        observations ([int]): Sensor measurements
      returns:
        list of most probable states.
  """
  if len(actions) == 0:
    # apply simple bayes rule here
    likelihood = {}
    max_state = None
    max_val = None
    for state in STATES:
      likelihood[state] = sensor_model(observations[0], state) * state_prior(state)
      if max_val is None or likelihood[state] > max_val:
        max_val = likelihood[state]
        max_state = state
    return [state]
    
  actions = actions.copy()
  observations = observations.copy()
  lookup_tables = [];
  start = True
  while True:
    if start:
      o1 = observations.pop(0)
      a1 = actions.pop(0)
      g1, curr_v = create_lookup_table_for_s1(o1, a1)
      lookup_tables.append(g1)
      start = False
    else:
      if len(actions) != 0:
        curr_o = observations.pop(0)
        curr_a = actions.pop(0)
        curr_g, curr_v = create_lookup_table_for_s2(curr_o, curr_a, curr_v)
        lookup_tables.append(curr_g)
      else:
        curr_o = observations.pop(0)
        curr_g, curr_v = create_lookup_table_for_s3(curr_o, curr_v)
        lookup_tables.append(curr_g)
        break
  
  states = []
  curr_state = lookup_tables.pop()
  states.append(curr_state)
  while len(lookup_tables) != 0:
    curr_table = lookup_tables.pop()
    curr_state = curr_table[curr_state]
    states.insert(0, curr_state)
    
  return states

In [None]:
general_maximum_probable_explanation(actions, observations)

In [None]:
observations

In [None]:
def test_action_prior():
    ret = []
    for test_param in ACTIONS:
        ret.append({
            'param': test_param,
            'result': action_prior(test_param)
        })
    return ret

In [None]:
def test_state_prior():
    ret = []
    for state in STATES:
        ret.append({
            'param': state,
            'result': state_prior(state)
        })
    return ret

In [None]:
def test_sensor_model():
    ret = []
    for state in STATES:
        for k in range(10):
            ret.append({
                'param': (k, state),
                'result': sensor_model(k, state)
            })
    return ret

In [None]:
def test_transition_model():
    ret = []
    for S in STATES:
        for T in STATES:
            for a in ACTIONS:
                ret.append({
                    'param': (T, S, a),
                    'result': transition_model(T, S, a)
                })
    return ret

In [None]:
def test_transition_model_portal():
    ret = []
    for S in STATES:
        for T in STATES:
            for a in ACTIONS:
                ret.append({
                    'param': (T, S, a),
                    'result': transition_model_portal(T, S, a)
                })
    return ret

In [None]:
np.random.seed(66)
_N = 100000
_pseudorandom_list = np.random.rand(_N)
_rand_idx = 0

def generate_random_float():
    global _rand_idx
    global _N
    ret = _pseudorandom_list[_rand_idx % _N]
    _rand_idx = _rand_idx + 1
    return ret

def normalize(nparr):
    return nparr / np.sum(np.array(nparr))

In [None]:
def test_calculate_cdf():
    ret = []
    num_outcomes = [0, 1, 2, 5, 10, 20, 50, 100]
    for num_outcome in num_outcomes:
        ordered_outcome = [str(i) for i in range(num_outcome)]
        ordered_values = normalize([generate_random_float() for _ in range(num_outcome)])
        table = dict(zip(ordered_outcome, ordered_values))
        def pmf(outcome):
            return table[outcome]
        cdf = calculate_cdf(pmf, ordered_outcome)
        curr_test_result = {}
        curr_test_result['size'] = num_outcome
        curr_test_result['function'] = {}
        for outcome in ordered_outcome:
            curr_test_result['function'][outcome] = cdf(outcome)
        ret.append(curr_test_result)
    return ret

In [None]:
def test_sample_from_pmf():
    ret = []
    num_outcomes = [0, 1, 2, 5, 10, 20, 50, 100]
    sample_size = 100
    for num_outcome in num_outcomes:
        ordered_outcome = [str(i) for i in range(num_outcome)]
        ordered_values = normalize([generate_random_float() for _ in range(num_outcome)])
        table = dict(zip(ordered_outcome, ordered_values))
        def pmf(outcome):
            return table[outcome]
        cdf = calculate_cdf(pmf, ordered_outcome)
        curr_test_result = {}
        curr_test_result['size'] = num_outcome
        curr_test_result['samples'] = []
        for _ in range(sample_size):
            curr_test_result['samples'].append(sample_from_pmf(pmf, ordered_outcome))
        ret.append(curr_test_result)
    return ret

In [None]:
def test_sample_from_sensor_model():
    sample_size_each_state = 100
    ret = []
    for state in STATES:
        for _ in range(sample_size_each_state):
            ret.append({
                'state': state,
                'result': sample_from_sensor_model(state)
            })
    return ret

In [None]:
def test_sample_from_transition_model():
    sample_size_each_state_action = 100
    ret = []
    for state in STATES:
        for action in ACTIONS:
            for _ in range(sample_size_each_state_action):
                ret.append({
                    'state_action': (state, action),
                    'result': sample_from_transition_model(state, action)
                })
    return ret

In [None]:
def test_sample_from_dbn():
    sample_size = 100
    ret = []
    for _ in range(sample_size):
        ret.append(sample_from_dbn())
    return ret

In [53]:
def test_maximum_probable_explanation():
    sample_size = 50
    ret = []
    for _ in range(sample_size):
        sampled = sample_from_dbn()
        sampled['states'] = maximum_probable_explanation(sampled['actions'], sampled['observations'])
        ret.append(sampled)
    return ret

In [81]:
def sample_from_dbn_general(n):
  """Sample from a dynamic bayes network
  returns:
      samples (dict): A dictionary with the states, observations and actions sampled
  """
  samples = {'states': [], 'observations': [], 'actions': []}
  # Your code below

  s1 = sample_from_pmf(state_prior, STATES)
  o1 = sample_from_sensor_model(s1)
  samples['states'].append(s1)
  samples['observations'].append(o1)
  if n == 1:
    return samples
  
  s = s1
  for _ in range(n - 1):
    a = sample_from_pmf(action_prior, ACTIONS)
    s = sample_from_transition_model(s, a)
    o = sample_from_sensor_model(s)
    samples['actions'].append(a)
    samples['states'].append(s)
    samples['observations'].append(o)
    
  print(samples)
  return samples

In [92]:
def test_general_maximum_probable_explanation():
    sample_size = 50
    ret = []
    for i in range(sample_size):
        sampled = sample_from_dbn_general(i + 1)
        print(sampled)
        sampled['states'] = general_maximum_probable_explanation(sampled['actions'], sampled['observations'])
        ret.append(sampled)
    return ret

In [96]:
ret = test_general_maximum_probable_explanation()

{'states': [(7, 6)], 'observations': [7], 'actions': []}
{'states': [(4, 5), (4, 6)], 'observations': [4, 4], 'actions': ['Right']}
{'states': [(4, 5), (4, 6)], 'observations': [4, 4], 'actions': ['Right']}
{'states': [(4, 5), (3, 5), (3, 6)], 'observations': [4, 9, 3], 'actions': ['Up', 'Right']}
{'states': [(4, 5), (3, 5), (3, 6)], 'observations': [4, 9, 3], 'actions': ['Up', 'Right']}
{'states': [(4, 8), (4, 7), (5, 7), (5, 8)], 'observations': [4, 4, 5, 5], 'actions': ['Right', 'Right', 'Right']}
{'states': [(4, 8), (4, 7), (5, 7), (5, 8)], 'observations': [4, 4, 5, 5], 'actions': ['Right', 'Right', 'Right']}
{'states': [(3, 3), (3, 4), (2, 4), (2, 3), (2, 2)], 'observations': [8, 9, 0, 2, 2], 'actions': ['Right', 'Right', 'Left', 'Right']}
{'states': [(3, 3), (3, 4), (2, 4), (2, 3), (2, 2)], 'observations': [8, 9, 0, 2, 2], 'actions': ['Right', 'Right', 'Left', 'Right']}
