<a href="https://colab.research.google.com/github/TangJiahui/6.036_Machine_Learning/blob/main/MIT_6_036_HW09_State_Machine_and_Markov_Decision_Process.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#MIT 6.036 Fall 2020: Homework 9#

This colab notebook provides code and a framework for questions 1 and 5 of the [homework](https://lms.mitx.mit.edu/courses/course-v1:MITx+6.036+2020_Fall/courseware/Week9/week9_homework).  You can work out your solutions here, then submit your results back on the homework page when ready.


## Setup

First, download the code distribution for this homework that contains test cases and helper functions.

Run the next code block to download and import the code for this lab.

In [1]:
!rm -rf code_for_hw9*
!rm -rf *.py
!rm -rf __*
!wget --no-check-certificate --quiet https://introml.odl.mit.edu/cat-soop/_static/6.036/homework/hw09/code_for_hw9.zip
!unzip code_for_hw9.zip
!mv code_for_hw9/* .

from dist import *
from sm import *
from util import *
from mdp import *

import mdp
import numpy as np

Archive:  code_for_hw9.zip
   creating: code_for_hw9/
  inflating: code_for_hw9/dist.py    
  inflating: code_for_hw9/mdp.py     
  inflating: code_for_hw9/sm.py      
  inflating: code_for_hw9/tests.py   
  inflating: code_for_hw9/util.py    


## 1) State Machines

We will implement state machines as sub-classes of the `SM` class, which specifies the `start_state`, `transition_fn` and `output_fn`.

```
class SM:
    start_state = None  # default start state
    def transition_fn(self, s, i):
        '''s:       the current state
           i:       the given input
           returns: the next state'''
        raise NotImplementedError
    def output_fn(self, s):
        '''s:       the current state
           returns: the corresponding output'''
        raise NotImplementedError
```

An example of a sub-class is the `Accumulator` state machine, which adds up (accumulates) its input and outputs the sum. Convince yourself that the implementation works as expected before moving on.

```
class Accumulator(SM):
    start_state = 0
    def transition_fn(self, s, i):
        return s + i
    def output_fn(self, s):
        return s
```

### 1.1 Transduce
Implement the `transduce` method for the `SM` class. It is given an input sequence (a list) and returns an output sequence (a list) of the outputs of the state machine on the input sequence. Assume `self.transition_fn` and `self.output_fn` are defined.

In [29]:
class SM:
    start_state = None
    def transduce(self, input_seq):
        '''input_seq: a list of inputs to feed into SM
           returns:   a list of outputs of SM'''
        state = []
        prev_state = self.start_state
        for i in input_seq:
            new_state = self.transition_fn(prev_state, i)
            state.append(new_state)
            prev_state = new_state
        return self.output_fn(state)

Below is the `Accumulator` state machine implementation that you saw above as well as an unit test to help test your `SM` class.

In [31]:
class Accumulator(SM):
    start_state = 0

    def transition_fn(self, s, i):
        return s + i

    def output_fn(self, s):
        return s
    
def test_accumulator_sm():
    res = Accumulator().transduce([-1, 2, 3, -2, 5, 6])
    assert(res == [-1, 1, 4, 2, 7, 13])
    print("Test passed!")

# Unit test
test_accumulator_sm()

Test passed!


### 1.2 Binary Addition
Implement a `Binary_Addition` state machine that takes in a sequence of pairs of binary digits (0,1) representing two reversed binary numbers and returns a sequence of digits representing the reversed sum. For instance, to sum two binary numbers `100` and `011`, the input sequence will be `[(0, 1), (0, 1), (1, 0)]`. You will need to define `start_state`, `transition_fn` and `output_fn`. Note that when transduced, the input sequence may need to be extended with an extra (0,0) to output the final carry.

In [64]:
class Binary_Addition(SM):
    # first index: sum at this position
    # second index: carryover from last position
    start_state = (0,0)

    def transition_fn(self, s, x):
        return ((s[1]+sum(x))%2, (s[1]+sum(x))//2)

    def output_fn(self, s):
        return [i[0] for i in s]

In [65]:
def test_binary_addition_sm():
    res = Binary_Addition().transduce([(1, 1), (1, 0), (0, 0)])
    print(res)
    assert(res == [0, 0, 1])
    print("Test passed!")

# Unit test
test_binary_addition_sm()

[0, 0, 1]
Test passed!


### 1.3 Reverser
Implement a state machine that reverses a sequence. The input is a list of the form:

```
 sequence1 + ['end'] + sequence2
 ```
 
`+` refers to concatenation. `sequence1` is a list of strings, the `'end'` string indicates termination, and `sequence2` is arbitrary. The machine reverses `sequence1`: for each entry in the `sequence1`, the machine outputs `None`. For the `'end'` input and each entry in the second sequence, an item from the reversed `sequence1` is output, or `None` if no characters remain.

In [108]:
class Reverser(SM):
    start_state = None
    end = False
    index = 0
    sequence1 = []

    def transition_fn(self, s, x):
        if x == "end" or self.end:
            self.end = True
            self.index -= 1
            try:
                return self.sequence1[self.index]
            except:
                return None
        elif not self.end:
            self.sequence1.append(x)
            return None

    def output_fn(self, s):
        # Your code here
        return s

In [109]:
def test_reverser_sm():
    res = Reverser().transduce(['foo', ' ', 'bar'] + ['end'] + list(range(5)))
    print(res)
    assert(res == [None, None, None, 'bar', ' ', 'foo', None, None, None])
    print("Test passed!")

# Unit test
test_reverser_sm()

[None, None, None, 'bar', ' ', 'foo', None, None, None]
Test passed!


In [110]:
class Reverser(SM):
    start_state = ([], 'input')

    def transition_fn(self, s, x):
        (symbols, mode) = s
        if mode == 'input':
            if x == 'end':
                return symbols, 'output'
            return symbols + [x], mode
        else:
            return symbols[:-1], mode

    def output_fn(self, s):
        (symbols, mode) = s
        if mode == 'output' and len(symbols) > 0:
            return symbols[-1]
        else:
            return None
        return symbols

# 2) MDP

In [114]:
A = np.matrix([[1.0,-0.09,-0.81,0],[-0.81,0.91, 0.0,0.0],[0,0,0.91,-0.81],[-0.81,0.0,0,0.91]])
b = np.matrix([[0],[1],[0],[2]])
v = np.linalg.solve(A,b)  # A v = b
v

matrix([[6.05288295],
        [6.48663207],
        [6.7519581 ],
        [7.58553317]])

## 5) MDP Implementations

We'll be using a couple of simple classes to represent MDPs and probability distributions.

###5.1 Working with MDPs

Recall that given a $Q_\pi$ for any policy $\pi$, then $V_\pi(s)$ = $\max_a Q_\pi(s, a)$.

1. Write the `value` method, which takes a $Q$ function (an instance of `TabularQ`) and a state and returns the value `V` of an action that maximizes $Q$ function stored in `q`.



In [115]:
def value(q, s):
    """ Return Q*(s,a) based on current Q

    >>> q = TabularQ([0,1,2,3],['b','c'])
    >>> q.set(0, 'b', 5)
    >>> q.set(0, 'c', 10)
    >>> q_star = value(q,0)
    >>> q_star
    10
    """
    q_star = None
    for a in q.actions:
        if q_star is None:
            q_star = q.q[(s,a)]
        elif q.q[(s,a)] > q_star:
            q_star = q.q[(s,a)]
    return q_star

In [116]:
def test_value():
    q = TabularQ([0,1,2,3], ['b','c'])
    q.set(0, 'b', 5)
    q.set(0, 'c', 10)
    assert(value(q, 0) == 10)
    print("Test passed!")
    
test_value()

Test passed!


2. Write the `greedy` method, which takes a $Q$ function (an instance of `TabularQ`) and a state and returns the action `a` determined by the policy that acts greedily with respect to the current value of `q`.

In [119]:
def greedy(q, s):
    """ Return pi*(s) based on a greedy strategy.

    >>> q = TabularQ([0,1,2,3],['b','c'])
    >>> q.set(0, 'b', 5)
    >>> q.set(0, 'c', 10)
    >>> q.set(1, 'b', 2)
    >>> greedy(q, 0)
    'c'
    >>> greedy(q, 1)
    'b'
    """
    q_star = value(q, s)
    for a in q.actions:
        if q.get(s, a) == q_star:
            return a

In [120]:
def test_greedy():
    q = TabularQ([0, 1, 2, 3],['b', 'c'])
    q.set(0, 'b', 5)
    q.set(0, 'c', 10)
    q.set(1, 'b', 2)
    assert(greedy(q, 0) == 'c')
    assert(greedy(q, 1) == 'b')
    print("Test passed!")

test_greedy()

Test passed!


3. Write the `epsilon_greedy` method, which takes a state `s` and a parameter `epsilon`, and returns an action. With probability `1 - epsilon` it should select the greedy action and with probability `epsilon` it should select an action uniformly from the set of possible actions.

    - You should use `random.random()` to generate a random number to test againts eps.
    - You should use the `draw` method of `uniform_dist` to generate a random action.
    - You can use the `greedy` function defined earlier.

In [125]:
def epsilon_greedy(q, s, eps = 0.5):
    """ Returns an action.

    >>> q = TabularQ([0,1,2,3],['b','c'])
    >>> q.set(0, 'b', 5)
    >>> q.set(0, 'c', 10)
    >>> q.set(1, 'b', 2)
    >>> eps = 0.
    >>> epsilon_greedy(q, 0, eps) #greedy
    'c'
    >>> epsilon_greedy(q, 1, eps) #greedy
    'b'
    """
    action = greedy(q, s)
    if random.random() < eps:  # True with prob eps, random action
        return uniform_dist(q.actions).draw()
    else:
        return action

In [126]:
def test_epsilon_greedy():
    q = TabularQ([0, 1, 2, 3],['b', 'c'])
    q.set(0, 'b', 5)
    q.set(0, 'c', 10)
    q.set(1, 'b', 2)
    eps = 0.0
    assert(epsilon_greedy(q, 0, eps) == 'c')
    assert(epsilon_greedy(q, 1, eps) == 'b')
    print("Test passed!")
    
test_epsilon_greedy()

Test passed!


### 5.2 Implement Q-Value Iteration
Provide the definition of the `value_iteration` function. It takes an MDP instance and a `TabularQ` instance. It should terminate when

$$\max_{(s, a)}\left|Q_t(s, a) - Q_{t-1}(s, a)\right| < \epsilon$$

that is, the biggest difference between the value functions on successive iterations is less than input parameter `eps`. This function should return the final `TabularQ` instance. It should do no more that `max_iters` iterations.

* Make sure to copy the Q function between iterations, e.g. `new_q = q.copy()`.
* The `q` parameter contains the initialization of the Q function.
* The `value` function is already defined.
* Use `mdp` class definitions to get 
  * the reward function: `reward_fn`,
  * the discount factor: `discount_factor`,
  * transition model: `transition_model`,
  * expectation of the Q-values over a distribution: `transition_model(s,a).expectation`.

All attributes of `mdp` can be accessed through `mdp.<attribute>`, e.g. `mdp.reward_fn`.

In [134]:
def value_iteration(mdp, q, eps=0.01, max_iters=1000):
    for i in range(max_iters):
        max_value = 0
        max_a, max_s = None,None
        new_q = q.copy()
        for pairs in q.q:
            s,a = pairs[0], pairs[1]
            v = mdp.reward_fn(s,a)
            v += mdp.discount_factor * mdp.transition_model(s,a).expectation(lambda s: value(q,s))
            new_q.set(s,a,v)
            if v>max_value:
                max_value = v
                max_a,max_s = a,s
        if new_q.q[(max_s,max_a)] - q.q[(max_s,max_a)]<eps:
            return new_q
        q = new_q.copy()

In [136]:
# alternatively
def value_iteration(mdp, q, eps = 0.01, max_iters = 1000):
    def v(s):
        return value(q,s)
    for it in range(max_iters):
        new_q = q.copy()
        delta = 0
        for s in mdp.states:
            for a in mdp.actions:
                new_q.set(s, a, mdp.reward_fn(s, a) + mdp.discount_factor * \
                          mdp.transition_model(s, a).expectation(v))
                delta = max(delta, abs(new_q.get(s, a) - q.get(s, a)))
        if delta < eps:
            return new_q
        q = new_q
    return q

Below is the implementation of the "tiny" MDP detailed in Problem 2 and Problem 5.3. We will be using it to test `value_iteration`.

In [138]:
def tiny_reward(s, a):
    # Reward function
    if s == 1: return 1
    elif s == 3: return 2
    else: return 0

def tiny_transition(s, a):
    # Transition function
    if s == 0:
        if a == 'b':
            return DDist({1 : 0.9, 2 : 0.1})
        else:
            return DDist({1 : 0.1, 2 : 0.9})
    elif s == 1:
        return DDist({1 : 0.1, 0 : 0.9})
    elif s == 2:
        return DDist({2 : 0.1, 3 : 0.9})
    elif s == 3:
        return DDist({3 : 0.1, 0 : 0.9})
    
def test_value_iteration():
    tiny = MDP([0, 1, 2, 3], ['b', 'c'], tiny_transition, tiny_reward, 0.9)
    q = TabularQ(tiny.states, tiny.actions)
    qvi = value_iteration(tiny, q, eps=0.1, max_iters=100)
    expected = dict([((2, 'b'), 5.962924188028282),
                     ((1, 'c'), 5.6957634856549095),
                     ((1, 'b'), 5.6957634856549095),
                     ((0, 'b'), 5.072814297918393),
                     ((0, 'c'), 5.262109602844769),
                     ((3, 'b'), 6.794664584556008),
                     ((3, 'c'), 6.794664584556008),
                     ((2, 'c'), 5.962924188028282)])
    for k in qvi.q:
        print("k=%s, expected=%s, got=%s" % (k, expected[k], qvi.q[k]))      
        assert(abs(qvi.q[k] - expected[k]) < 1.0e-5)
    print("Test passed!")

test_value_iteration()

k=(0, 'b'), expected=5.072814297918393, got=5.072814297918393
k=(0, 'c'), expected=5.262109602844769, got=5.262109602844769
k=(1, 'b'), expected=5.6957634856549095, got=5.6957634856549095
k=(1, 'c'), expected=5.6957634856549095, got=5.6957634856549095
k=(2, 'b'), expected=5.962924188028282, got=5.962924188028282
k=(2, 'c'), expected=5.962924188028282, got=5.962924188028282
k=(3, 'b'), expected=6.794664584556008, got=6.794664584556008
k=(3, 'c'), expected=6.794664584556008, got=6.794664584556008
Test passed!


### 5.3 Receding-horizon control and online search
Write a procedure `q_em(mdp, s, a, h)` that computes the horizon-h Q value for state `s` and action `a` by using the definition of the finite-horizon Q function in the notes (but including a discount factor). 

This can be written as a relatively simple recursive procedure with a base case (what is the Q value when horizon is 0?) and a recursive case that computes the horizon `h` values assuming we can (recursively) get horizon `h-1` values.

In [143]:
def q_em(mdp, s, a, h):
    if h==0:
        return 0
    else:
        return mdp.reward_fn(s,a) + mdp.discount_factor*mdp.transition_model(s,a).expectation(lambda s:q_em(mdp, s, a, h-1))

We will be using the "tiny" MDP again to test `q_em`.

In [144]:
def test_q_em():
    tiny = MDP([0, 1, 2, 3], ['b', 'c'], tiny_transition, tiny_reward, 0.9)
    assert(np.allclose([q_em(tiny, 0, 'b', 1)], [0.0]))
    assert(np.allclose([q_em(tiny, 0, 'b', 2)], [0.81]))
    assert(np.allclose([q_em(tiny, 0, 'b', 3)], [1.0287000000000002]))
    assert(np.allclose([q_em(tiny, 0, 'c', 3)], [1.4103]))
    assert(np.allclose([q_em(tiny, 2, 'b', 3)], [1.9116000000000002]))
    print("Tests passed!")

test_q_em()

Tests passed!
