<a href="https://colab.research.google.com/github/ToniRV/MIT_6.862_Applied_Machine_Learning/blob/master/MIT_6_036_HW09.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#MIT 6.036 Spring 2020: Homework 9#

This colab notebook provides code and a framework for question 1 and 5 of the [homework](https://lms.mitx.mit.edu/courses/course-v1:MITx+6.036+2020_Spring/courseware/Week9/week9_homework/).  You can work out your solutions here, then submit your results back on the homework page when ready.


## Setup

First, download the code distribution for this homework that contains test cases and helper functions.

Run the next code block to download and import the code for this lab.

In [5]:
!rm -rf code_for_hw9*
!wget --quiet https://introml.odl.mit.edu/cat-soop/_static/6.036/homework/hw09/code_for_hw9.zip
!unzip code_for_hw9.zip
!mv code_for_hw9/* .

from dist import *
from sm import *
from util import *
from mdp import *

import mdp
import numpy as np

Archive:  code_for_hw9.zip
   creating: code_for_hw9/
  inflating: code_for_hw9/util.py    
   creating: __MACOSX/
   creating: __MACOSX/code_for_hw9/
  inflating: __MACOSX/code_for_hw9/._util.py  
  inflating: code_for_hw9/sm.py      
  inflating: __MACOSX/code_for_hw9/._sm.py  
  inflating: code_for_hw9/mdp.py     
  inflating: __MACOSX/code_for_hw9/._mdp.py  
  inflating: code_for_hw9/tests.py   
  inflating: __MACOSX/code_for_hw9/._tests.py  
  inflating: code_for_hw9/dist.py    
  inflating: __MACOSX/code_for_hw9/._dist.py  
  inflating: __MACOSX/._code_for_hw9  
   creating: code_for_hw9/__pycache__/
  inflating: code_for_hw9/__pycache__/mdp.cpython-38.pyc  
  inflating: code_for_hw9/__pycache__/sm.cpython-38.pyc  
  inflating: code_for_hw9/__pycache__/dist.cpython-38.pyc  
  inflating: code_for_hw9/__pycache__/util.cpython-38.pyc  


## 1) State Machines

We will implement state machines as sub-classes of the `SM` class, which specifies the `start_state`, `transition_fn` and `output_fn`.

```
class SM:
    start_state = None  # default start state
    def transition_fn(self, s, x):
        '''s:       the current state
           i:       the given input
           returns: the next state'''
        raise NotImplementedError
    def output_fn(self, s):
        '''s:       the current state
           returns: the corresponding output'''
        raise NotImplementedError
```

An example of a sub-class is the `Accumulator` state machine, which adds up (accumulates) its input and outputs the sum. Convince yourself that the implementation works as expected before moving on.

```
class Accumulator(SM):
    start_state = 0
    def transition_fn(self, s, x):
        return s + x
    def output_fn(self, s):
        return s
```

### 1.1 Transduce
Implement the `transduce` method for the `SM` class. It is given an input sequence (a list) and returns an output sequence (a list) of the outputs of the state machine on the input sequence. Assume `self.transition_fn` and `self.output_fn` are defined.

In [0]:
class SM:
    start_state = None

    def transduce(self, input_seq):
        '''input_seq: a list of inputs to feed into SM
           returns:   a list of outputs of SM'''
        state = self.start_state
        output = []
        for inp in input_seq:
            state = self.transition_fn(state, inp)
            output.append(self.output_fn(state))
        return output 

Below is the `Accumulator` state machine implementation that you saw above as well as an unit test to help test your `SM` class.

In [7]:
class Accumulator(SM):
    start_state = 0

    def transition_fn(self, s, x):
        return s + x

    def output_fn(self, s):
        return s
    
def test_accumulator_sm():
    res = Accumulator().transduce([-1, 2, 3, -2, 5, 6])
    assert(res == [-1, 1, 4, 2, 7, 13])
    print("Test passed!")

# Unit test
test_accumulator_sm()

Test passed!


### 1.2 Binary Addition
Implement a `Binary_Addition` state machine that takes in a sequence of pairs of binary digits (0,1) representing two reversed binary numbers and returns a sequence of digits representing the reversed sum. For instance, to sum two binary numbers `100` and `011`, the input sequence will be `[(0, 1), (0, 1), (1, 0)]`. You will need to define `start_state`, `transition_fn` and `output_fn`. Note that when transduced, the input sequence may need to be extended with an extra (0,0) to output the final carry.

In [0]:
class Binary_Addition(SM):
    start_state = (0, 0)

    def transition_fn(self, s, x):
        (carry, digit) = s
        (i0, i1) = x
        total = i0 + i1 + carry
        return 1 if total > 1 else 0, total % 2

    def output_fn(self, s):
        (carry, digit) = s
        return digit

In [12]:
def test_binary_addition_sm():
    res = Binary_Addition().transduce([(1, 1), (1, 0), (0, 0)])
    print(res)
    assert(res == [0, 0, 1])
    print("Test passed!")

# Unit test
test_binary_addition_sm()

[0, 0, 1]
Test passed!


### 1.3 Reverser
Implement a state machine that reverses a sequence. The input is a list of the form:

```
 sequence1 + ['end'] + sequence2
 ```
 
`+` refers to concatenation. `sequence1` is a list of strings, the `'end'` string indicates termination, and `sequence2` is arbitrary. The machine reverses `sequence1`: for each entry in the `sequence1`, the machine outputs `None`. For the `'end'` input and each entry in the second sequence, an item from the reversed `sequence1` is output, or `None` if no characters remain.

In [0]:
class Reverser(SM):
    start_state = ([],False,[]) # Change

    def transition_fn(self, s, x):
        # Your code here
        if (x is 'end'):
          # Signal the end
          s = (s[0], True, s[2]) 
        
        if (s[1] is True):
          # We already found 'end' string
          # Output pop or None if empty
          new_s0 = [*s[0], None] if len(s[2]) == 0 else [*s[0], s[2].pop()]
          s = (new_s0, s[1], s[2])
        else:
          # We still didn't find 'end' string
          # Keep recording string
          # Output None
          print(x)
          s = ([*s[0], None], s[1], [*s[2], x])
        return s

    def output_fn(self, s):
        # Your code here
        (a, b, c) = s
        return a[-1]


In [14]:
def test_reverser_sm():
    res = Reverser().transduce(['foo', ' ', 'bar'] + ['end'] + list(range(5)))
    assert(res == [None, None, None, 'bar', ' ', 'foo', None, None, None])
    print("Test passed!")

# Unit test
test_reverser_sm()

foo
 
bar
Test passed!


### 1.4 RNN
An RNN has a transition function and an output function, each of which is defined in terms of weight matrices, offset vectors and activation functions, analogously to standard neural networks.

* The inputs $x$ are $l\times1$ vectors
* The states $s$ are $m\times1$ vectors
* The outputs $y$ are $n\times1$ vectors

The behavior is defined as follows:
$$\begin{align*} s_{t} & = f_1(W^{ss} s_{{t-1}} + W^{sx} x_{t} + W^{ss}_0) \\ y_{t} & = f_2(W^o s_{t} + W^o_0) \end{align*}$$

where $f_1$ and $f_2$ are two activation functions, such as linear, softmax or tanh.


Note that each input `i` below has dimension `l x 1`. Implement the corresponding state machine, where the weights are given in `__init__`. Make sure to set an appropriate `start_state`.

In [0]:
class RNN(SM):
    def __init__(self, Wsx, Wss, Wo, Wss_0, Wo_0, f1, f2, start_state):
        # Your code here
        self.Wsx = Wsx
        self.Wss = Wss
        self.Wo = Wo
        self.Wss_0 = Wss_0
        self.Wo_0 = Wo_0
        self.f1 = f1
        self.f2 = f2
        self.start_state = start_state
        
    def transition_fn(self, s, x):
        # Your code here
        return self.f1(self.Wss @ s + self.Wsx @ x + self.Wss_0)
    
    def output_fn(self, s):
        # Your code here
        return self.f2(self.Wo @ s + self.Wo_0)

In [16]:
def softmax(z):
    v = np.exp(z)
    return v / np.sum(v, axis = 0)

def test_rnn():
    Wsx1 = np.array([[0.1],
                     [0.3],
                     [0.5]])
    Wss1 = np.array([[0.1,0.2,0.3],
                     [0.4,0.5,0.6],
                     [0.7,0.8,0.9]])
    Wo1 = np.array([[0.1,0.2,0.3],
                    [0.4,0.5,0.6]])
    Wss1_0 = np.array([[0.01],
                       [0.02],
                       [0.03]])
    Wo1_0 = np.array([[0.1],
                      [0.2]])
    in1 = [np.array([[0.1]]),
           np.array([[0.3]]),
           np.array([[0.5]])]
    start_state = np.array([[0,0,0]]).T
    
    rnn = RNN(Wsx1, Wss1, Wo1, Wss1_0, Wo1_0, np.tanh, softmax, start_state)
    expected = np.array([[[0.4638293846951024], [0.5361706153048975]],
                        [[0.4333239107898491], [0.566676089210151]],
                        [[0.3821688606165438], [0.6178311393834561]]])

    assert(np.allclose(expected, rnn.transduce(in1)))
    print("Test passed!")

# Unit test
test_rnn()

Test passed!


### 1.5 Accumulator Sign RNN
Enter the parameter matrices and vectors for an instance of the `RNN` class such that the output is `1` if the cumulative sum of the inputs is positive, `-1` if the cumulative sum is negative and `0` if otherwise. Make sure that you scale the outputs so that the output activation values are very close to `1`, `0` and `-1`. Note that both the inputs and outputs are `1 x 1`.

Hint: `np.tanh` may be useful. Remember to convert your Python lists to `np.array`.

In [0]:
Wsx =             # Your code here                                                                                                                                  
Wss =             # Your code here                                                                                                                                                 
Wo =              # Your code here                                                                                                                                                 
Wss_0 =           # Your code here                                                                                                                                                 
Wo_0 =            # Your code here                                                                                                                                                 
f1 =              # Your code here, e.g. lambda x : x                                                                                                                              
f2 =              # Your code here                                                                                                                                                 
start_state =     # Your code here                                                                                                                                                 
acc_sign = RNN(Wsx, Wss, Wo, Wss_0, Wo_0, f1, f2, start_state)

In [0]:
def test_acc_sign_rnn(acc_sign_rnn):
    res = acc_sign_rnn.transduce([-1, -2, 2, 3, -3, 1])
    expected = np.array([[[-1.0]], [[-1.0]], [[-1.0]], [[1.0]], [[-1.0]], [[0.0]]])
    assert(np.allclose(expected, res))
    print("Test passed!")

# Unit test
test_acc_sign_rnn(acc_sign)

### 1.6 Autoregression RNN

Enter the parameter matrices and vectors for an instance of the `RNN` class such that it implements the following autoregressive model:
$$y_t=y_{t-1} - 2y_{t-2} + 3y_{t-3}$$
Set `start_state` such that $y_1=-2$, $y_t=0$ for $t\lt1$. Note that all inputs will be zero.


In [0]:
Wsx =             # Your code here                                                                                                                                  
Wss =             # Your code here                                                                                                                                                 
Wo =              # Your code here                                                                                                                                                 
Wss_0 =           # Your code here                                                                                                                                                 
Wo_0 =            # Your code here                                                                                                                                                 
f1 =              # Your code here, e.g. lambda x : x                                                                                                                              
f2 =              # Your code here                                                                                                                                                 
start_state =     # Your code here                                                                                                                                                 
auto = RNN(Wsx, Wss, Wo, Wss_0, Wo_0, f1, f2, start_state)

In [0]:
def test_auto_rnn(auto_rnn):
    res = auto_rnn.transduce([np.array([[0]]) for x in range(5)])
    expected = np.array([[[-2.0]], [[2.0]], [[0.0]], [[-10.0]], [[-4.0]]])
    assert(np.allclose(expected, res))
    print("Test passed!")
    
# Unit test
test_auto_rnn(auto)

## 5) MDP Implementations

We'll be using a couple of simple classes to represent MDPs and probability distributions.

###5.1 Working with MDPs

Recall that given a $Q_\pi$ for any policy $\pi$, then $V_\pi(s)$ = $\max_a Q_\pi(s, a)$.

1. Write the `value` method, which takes a $Q$ function (an instance of `TabularQ`) and a state and returns the value `V` of an action that maximizes $Q$ function stored in `q`.



In [0]:
def value(q, s):
    """ Return Q*(s,a) based on current Q

    >>> q = TabularQ([0,1,2,3],['b','c'])
    >>> q.set(0, 'b', 5)
    >>> q.set(0, 'c', 10)
    >>> q_star = value(q,0)
    >>> q_star
    10
    """
    # Your code here
    pass

In [0]:
def test_value():
    q = TabularQ([0,1,2,3], ['b','c'])
    q.set(0, 'b', 5)
    q.set(0, 'c', 10)
    assert(value(q, 0) == 10)
    print("Test passed!")
    
test_value()

2. Write the `greedy` method, which takes a $Q$ function (an instance of `TabularQ`) and a state and returns the action `a` determined by the policy that acts greedily with respect to the current value of `q`.

In [0]:
def greedy(q, s):
    """ Return pi*(s) based on a greedy strategy.

    >>> q = TabularQ([0,1,2,3],['b','c'])
    >>> q.set(0, 'b', 5)
    >>> q.set(0, 'c', 10)
    >>> q.set(1, 'b', 2)
    >>> greedy(q, 0)
    'c'
    >>> greedy(q, 1)
    'b'
    """
    # Your code here
    pass

In [0]:
def test_greedy():
    q = TabularQ([0, 1, 2, 3],['b', 'c'])
    q.set(0, 'b', 5)
    q.set(0, 'c', 10)
    q.set(1, 'b', 2)
    assert(greedy(q, 0) == 'c')
    assert(greedy(q, 1) == 'b')
    print("Test passed!")

test_greedy()

3. Write the `epsilon_greedy` method, which takes a state `s` and a parameter `epsilon`, and returns an action. With probability `1 - epsilon` it should select the greedy action and with probability `epsilon` it should select an action uniformly from the set of possible actions.

    - You should use `random.random()` to generate a random number to test againts eps.
    - You should use the `draw` method of `uniform_dist` to generate a random action.
    - You can use the `greedy` function defined earlier.

In [0]:
def epsilon_greedy(q, s, eps = 0.5):
    """ Returns an action.

    >>> q = TabularQ([0,1,2,3],['b','c'])
    >>> q.set(0, 'b', 5)
    >>> q.set(0, 'c', 10)
    >>> q.set(1, 'b', 2)
    >>> eps = 0.
    >>> epsilon_greedy(q, 0, eps) #greedy
    'c'
    >>> epsilon_greedy(q, 1, eps) #greedy
    'b'
    """
    if random.random() < eps:  # True with prob eps, random action
        # Your code here
    else:
        # Your code here

In [0]:
def test_epsilon_greedy():
    q = TabularQ([0, 1, 2, 3],['b', 'c'])
    q.set(0, 'b', 5)
    q.set(0, 'c', 10)
    q.set(1, 'b', 2)
    eps = 0.0
    assert(epsilon_greedy(q, 0, eps) == 'c')
    assert(epsilon_greedy(q, 1, eps) == 'b')
    print("Test passed!")
    
test_epsilon_greedy()

### 5.2 Implement Q-Value Iteration
Provide the definition of the `value_iteration` function. It takes an MDP instance and a `TabularQ` instance. It should terminate when

$$\max_{(s, a)}\left|Q_t(s, a) - Q_{t-1}(s, a)\right| < \epsilon$$

that is, the biggest difference between the value functions on successive iterations is less than input parameter `eps`. This function should return the final `TabularQ` instance. It should do no more that `max_iters` iterations.

* Make sure to copy the Q function between iterations, e.g. `new_q = q.copy()`.
* The `q` parameter contains the initialization of the Q function.
* The `value` function is already defined.

In [0]:
def value_iteration(mdp, q, eps=0.01, max_iters=1000):
    # Your code here
    pass

Below is the implementation of the "tiny" MDP detailed in Problem 2 and Problem 5.3. We will be using it to test `value_iteration`.

In [0]:
def tiny_reward(s, a):
    # Reward function
    if s == 1: return 1
    elif s == 3: return 2
    else: return 0

def tiny_transition(s, a):
    # Transition function
    if s == 0:
        if a == 'b':
            return DDist({1 : 0.9, 2 : 0.1})
        else:
            return DDist({1 : 0.1, 2 : 0.9})
    elif s == 1:
        return DDist({1 : 0.1, 0 : 0.9})
    elif s == 2:
        return DDist({2 : 0.1, 3 : 0.9})
    elif s == 3:
        return DDist({3 : 0.1, 0 : 0.9})
    
def test_value_iteration():
    tiny = MDP([0, 1, 2, 3], ['b', 'c'], tiny_transition, tiny_reward, 0.9)
    q = TabularQ(tiny.states, tiny.actions)
    qvi = value_iteration(tiny, q, eps=0.1, max_iters=100)
    expected = dict([((2, 'b'), 5.962924188028282),
                     ((1, 'c'), 5.6957634856549095),
                     ((1, 'b'), 5.6957634856549095),
                     ((0, 'b'), 5.072814297918393),
                     ((0, 'c'), 5.262109602844769),
                     ((3, 'b'), 6.794664584556008),
                     ((3, 'c'), 6.794664584556008),
                     ((2, 'c'), 5.962924188028282)])
    for k in qvi.q:
        print("k=%s, expected=%s, got=%s" % (k, expected[k], qvi.q[k]))      
        assert(abs(qvi.q[k] - expected[k]) < 1.0e-5)
    print("Test passed!")

test_value_iteration()

### 5.3 Receding-horizon control and online search
Write a procedure `q_em(mdp, s, a, h)` that computes the horizon-h Q value for state `s` and action `a` by using the definition of the finite-horizon Q function in the notes (but including a discount factor). 

This can be written as a relatively simple recursive procedure with a base case (what is the Q value when horizon is 0?) and a recursive case that computes the horizon `h` values assuming we can (recursively) get horizon `h-1` values.

In [0]:
def q_em(mdp, s, a, h):
    # Your code here
    pass

We will be using the "tiny" MDP again to test `q_em`.

In [0]:
def test_q_em():
    tiny = MDP([0, 1, 2, 3], ['b', 'c'], tiny_transition, tiny_reward, 0.9)
    assert(np.allclose([q_em(tiny, 0, 'b', 1)], [0.0]))
    assert(np.allclose([q_em(tiny, 0, 'b', 2)], [0.81]))
    assert(np.allclose([q_em(tiny, 0, 'b', 3)], [1.0287000000000002]))
    assert(np.allclose([q_em(tiny, 0, 'c', 3)], [1.4103]))
    assert(np.allclose([q_em(tiny, 2, 'b', 3)], [1.9116000000000002]))
    print("Tests passed!")

test_q_em()