# MIT 6.036 Fall 2021: Homework 11

This Colab notebook provides code and a framework for questions 1 and 5 of hw11. You can work out your solutions here, then submit your results on the homework page when ready.


## Setup

First, download the code distribution for this homework that contains test cases and helper functions.

Run the next code block to download and import the code for this lab.

In [None]:
!rm -rf code_for_hw11*
!wget --no-check-certificate --quiet https://go.odl.mit.edu/subject/6.036/_static/catsoop/homework/hw11/code_for_hw11.zip
!unzip code_for_hw11.zip
!mv code_for_hw11/* .

from dist import *
from sm import *
from util import *

import numpy as np

Archive:  code_for_hw11.zip
   creating: code_for_hw11/
  inflating: code_for_hw11/util.py   
  inflating: code_for_hw11/sm.py     
  inflating: code_for_hw11/tests.py  
  inflating: code_for_hw11/dist.py   


## 1) State Machines

We will implement state machines as sub-classes of the `SM` class, which specifies the `start_state`, `transition_fn` and `output_fn`.

```
class SM:
    start_state = None  # default start state
    def transition_fn(self, s, x):
        '''s:       the current state
           x:       the given input
           returns: the next state'''
        raise NotImplementedError
    def output_fn(self, s):
        '''s:       the current state
           returns: the corresponding output'''
        raise NotImplementedError
```

An example of a sub-class is the `Accumulator` state machine, which adds up (accumulates) its input and outputs the sum. Convince yourself that the implementation works as expected before moving on.

```
class Accumulator(SM):
    start_state = 0
    def transition_fn(self, s, x):
        return s + x
    def output_fn(self, s):
        return s
```
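One way to convince yourself is to step the machine by hand. The sketch below repeats the two class definitions so that it runs on its own; the input list `[1, 2, 3]` and the `trace` variable are illustrative choices, not part of the homework code.

```python
class SM:
    start_state = None

    def transition_fn(self, s, x):
        raise NotImplementedError

    def output_fn(self, s):
        raise NotImplementedError


class Accumulator(SM):
    start_state = 0

    def transition_fn(self, s, x):
        return s + x

    def output_fn(self, s):
        return s


acc = Accumulator()
state = acc.start_state
trace = []  # (input, new state, output) for each step
for x in [1, 2, 3]:
    state = acc.transition_fn(state, x)  # update the state first
    trace.append((x, state, acc.output_fn(state)))  # then read the output

print(trace)  # [(1, 1, 1), (2, 3, 3), (3, 6, 6)]
```

Each output equals the running sum, because the state itself is the sum and `output_fn` returns the state unchanged.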

### 1.1 Transduce
A transducer consumes each input and updates the state. It then uses the updated state to produce the output.

Implement the `transduce` method for the `SM` class. It is given an input sequence (a list) and returns the list of outputs that the state machine produces on that sequence. The input and output sequences have the same length. Assume `self.transition_fn`, `self.output_fn` and `self.start_state` are defined.

In [None]:
class SM:
    start_state = None

    def transduce(self, input_seq):
        '''input_seq: a list of inputs to feed into SM
           returns:   a list of outputs of SM'''
        state = self.start_state
        output = []
        for inp in input_seq:
            state = self.transition_fn(state, inp)
            output.append(self.output_fn(state))
        return output


Below is the `Accumulator` state machine implementation that you saw above, as well as a unit test to help test your `SM` class.

In [None]:
class Accumulator(SM):
    start_state = 0

    def transition_fn(self, s, x):
        return s + x

    def output_fn(self, s):
        return s

def test_accumulator_sm():
    res = Accumulator().transduce([-1, 2, 3, -2, 5, 6])
    assert(res == [-1, 1, 4, 2, 7, 13])
    print("Test passed!")

# Unit test
test_accumulator_sm()

Test passed!


### 1.2 Binary Addition
Implement a `Binary_Addition` state machine that takes in a sequence of pairs of binary digits (0,1) representing two reversed binary numbers and returns a sequence of digits representing the reversed sum. For instance, to sum two binary numbers `100` and `011`, the input sequence will be `[(0, 1), (0, 1), (1, 0)]`. You will need to define `start_state`, `transition_fn` and `output_fn`. Note that when transduced, the input sequence may need to be extended with an extra (0,0) to output the final carry.
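The input encoding can be sketched with a few helper functions. The names `to_reversed_pairs`, `from_reversed_bits` and `add_reversed` are hypothetical and not part of the homework distribution. `add_reversed` is a plain-Python reference adder that can be compared against a state machine implementation.

```python
def to_reversed_pairs(a, b):
    """Encode non-negative ints a and b as a list of (bit_a, bit_b) pairs,
    least significant bit first, with one extra (0, 0) slot for the final carry."""
    n = max(a.bit_length(), b.bit_length()) + 1
    return [((a >> i) & 1, (b >> i) & 1) for i in range(n)]


def from_reversed_bits(bits):
    """Decode a least-significant-bit-first list of bits into an int."""
    return sum(bit << i for i, bit in enumerate(bits))


def add_reversed(pairs):
    """Reference adder: one output digit per input pair, carry kept between steps."""
    carry, digits = 0, []
    for p, q in pairs:
        total = p + q + carry
        digits.append(total % 2)
        carry = total // 2
    return digits


pairs = to_reversed_pairs(0b100, 0b011)
print(pairs)                                    # [(0, 1), (0, 1), (1, 0), (0, 0)]
print(from_reversed_bits(add_reversed(pairs)))  # 7
```

The first three pairs match the example in the text. The trailing `(0, 0)` is the extension that lets the final carry appear in the output.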

In [None]:
class Binary_Addition(SM):
    start_state = (0,0)

    def transition_fn(self, s, x):
        # Full adder: the two input bits plus the carry stored in the state
        addition = x[0] + x[1] + s[1]
        carry = addition // 2
        digit = addition % 2
        return (digit, carry)

    def output_fn(self, s):
        return s[0]

In [None]:
def test_binary_addition_sm():
    res = Binary_Addition().transduce([(1, 1), (1, 0), (0, 0)])
    assert(res == [0, 0, 1])
    print("Test passed!")

# Unit test
test_binary_addition_sm()

Test passed!


### 1.3 Reverser
Implement a state machine that reverses a sequence. The input is a list of the form:

```
sequence1 + ['end'] + sequence2
```

Here `+` denotes list concatenation. `sequence1` is a list of strings, the string `'end'` marks its termination, and `sequence2` is arbitrary. The machine reverses `sequence1`: for each entry in `sequence1`, it outputs `None`. For the `'end'` input and for each entry in `sequence2`, it outputs the next item of the reversed `sequence1`, or `None` if no items remain.

In [None]:
class Reverser(SM):
    sequence_1 = 1
    sequence_2 = 0
    start_state = ([], sequence_1)

    def transition_fn(self, s, x):
        symbols, direction = s

        if direction == self.sequence_1:
            if x == 'end':
                # switch directions; 'end' itself is not stored
                return (symbols, self.sequence_2)
            return (symbols + [x], direction)

        # sequence 2: drop the item that was just output
        return (symbols[:-1], direction)

    def output_fn(self, s):
        symbols = s[0]
        seq_direction = s[1]

        if seq_direction == self.sequence_1:
            return None
        # 2nd sequence
        return symbols[-1] if symbols else None


In [None]:
def test_reverser_sm():
    res = Reverser().transduce(['foo', ' ', 'bar'] + ['end'] + list(range(5)))
    assert(res == [None, None, None, 'bar', ' ', 'foo', None, None, None])
    print("Test passed!")

# Unit test
test_reverser_sm()

Test passed!


### 1.4 (Recurrent) Neural Networks as State Machines
A neural network is a (learned) function that maps input vectors to output vectors. A recurrent neural network (RNN) is a (learned) state machine that maps input sequences to output sequences.

In particular, an RNN has a transition function and an output function, each of which is defined in terms of weight matrices, offset vectors and activation functions, analogously to standard neural networks.

* The inputs $x$ are $l\times1$ vectors
* The states $s$ are $m\times1$ vectors
* The outputs $y$ are $n\times1$ vectors

The behavior is defined as follows:
$$\begin{align*} s_{t} & = f_1(W^{ss} s_{{t-1}} + W^{sx} x_{t} + W^{ss}_0) \\ y_{t} & = f_2(W^o s_{t} + W^o_0) \end{align*}$$

where $f_1$ and $f_2$ are two activation functions, such as linear, softmax or tanh.
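A single step of these two equations can be sketched in NumPy to check that the shapes line up. The dimensions `l = 1`, `m = 3`, `n = 2`, the random weights, and the function name `rnn_step` are assumptions made for this illustration only.

```python
import numpy as np


def softmax(z):
    v = np.exp(z)
    return v / np.sum(v, axis=0)


def rnn_step(s_prev, x, Wss, Wsx, Wss_0, Wo, Wo_0, f1, f2):
    """Apply the transition equation, then the output equation, once."""
    s = f1(Wss @ s_prev + Wsx @ x + Wss_0)  # (m x m)(m x 1) + (m x l)(l x 1) + (m x 1)
    y = f2(Wo @ s + Wo_0)                   # (n x m)(m x 1) + (n x 1)
    return s, y


l, m, n = 1, 3, 2                 # illustrative sizes
rng = np.random.default_rng(0)    # arbitrary seed
Wss = rng.normal(size=(m, m))
Wsx = rng.normal(size=(m, l))
Wss_0 = rng.normal(size=(m, 1))
Wo = rng.normal(size=(n, m))
Wo_0 = rng.normal(size=(n, 1))

s0 = np.zeros((m, 1))
x1 = rng.normal(size=(l, 1))
s1, y1 = rnn_step(s0, x1, Wss, Wsx, Wss_0, Wo, Wo_0, np.tanh, softmax)

print(s1.shape, y1.shape)  # (3, 1) (2, 1)
```

With `softmax` as $f_2$, the entries of `y1` sum to 1. With `tanh` as $f_1$, every entry of the state stays in $(-1, 1)$.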


Note that each input `x` below has dimension `l x 1`. Implement the corresponding state machine, where the weights are given in `__init__`. Make sure to set an appropriate `start_state`.

In [None]:
class RNN(SM):
    def __init__(self, Wsx, Wss, Wo, Wss_0, Wo_0, f1, f2, start_state):
        self.Wsx = Wsx
        self.Wss = Wss
        self.Wo = Wo
        self.Wss_0 = Wss_0
        self.Wo_0 = Wo_0
        self.start_state = start_state
        self.f1 = f1
        self.f2 = f2

    def transition_fn(self, s, x):
        return self.f1(self.Wss@s + self.Wsx@x + self.Wss_0)
    def output_fn(self, s):
        return self.f2(self.Wo@s + self.Wo_0)


In [None]:
def softmax(z):
    v = np.exp(z)
    return v / np.sum(v, axis = 0)

def test_rnn():
    Wsx1 = np.array([[0.1],
                     [0.3],
                     [0.5]])
    Wss1 = np.array([[0.1,0.2,0.3],
                     [0.4,0.5,0.6],
                     [0.7,0.8,0.9]])
    Wo1 = np.array([[0.1,0.2,0.3],
                    [0.4,0.5,0.6]])
    Wss1_0 = np.array([[0.01],
                       [0.02],
                       [0.03]])
    Wo1_0 = np.array([[0.1],
                      [0.2]])
    in1 = [np.array([[0.1]]),
           np.array([[0.3]]),
           np.array([[0.5]])]
    start_state = np.array([[0,0,0]]).T

    rnn = RNN(Wsx1, Wss1, Wo1, Wss1_0, Wo1_0, np.tanh, softmax, start_state)
    expected = np.array([[[0.4638293846951024], [0.5361706153048975]],
                        [[0.4333239107898491], [0.566676089210151]],
                        [[0.3821688606165438], [0.6178311393834561]]])

    assert(np.allclose(expected, rnn.transduce(in1)))
    print("Test passed!")

# Unit test
test_rnn()

Test passed!


### 1.5 Accumulator Sign (RNN)
Enter the parameter matrices and vectors for an instance of the `RNN` class such that the output is `1` if the cumulative sum of the inputs is positive, `-1` if the cumulative sum is negative and `0` otherwise. Make sure that you scale the outputs so that the output activation values are very close to `1`, `0` and `-1`. Note that both the inputs and outputs are `1 x 1`.

Hint: `np.tanh` may be useful. Remember to convert your Python lists to `np.array`.
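The scaling idea behind the hint can be sketched as follows: multiplying the cumulative sum by a large constant before applying `tanh` pushes nonzero values very close to `1` or `-1`, while zero stays at zero. The constant `scale = 1000.0` and the sample sums are arbitrary assumptions for this illustration.

```python
import numpy as np

scale = 1000.0  # assumed large constant; any sufficiently large value behaves similarly
sums = np.array([-3.0, -1.0, 0.0, 1.0, 2.0])  # example cumulative sums

approx_sign = np.tanh(scale * sums)
print(np.allclose(approx_sign, np.sign(sums)))  # True
```

The approximation degrades for cumulative sums very close to zero. For instance, a sum of `1e-4` with this scale gives `tanh(0.1)`, which is about `0.0997`, so the scale has to be chosen relative to the smallest nonzero sum expected.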

In [None]:
Wsx =             np.ones((1, 1))
Wss =             np.ones((1, 1))
Wo =              np.ones((1, 1))
Wss_0 =           np.zeros((1, 1))
Wo_0 =            np.zeros((1, 1))
f1 =              lambda x : x
f2 =              np.sign
start_state =     np.full((1, 1), 0)
acc_sign = RNN(Wsx, Wss, Wo, Wss_0, Wo_0, f1, f2, start_state)


In [None]:
def test_acc_sign_rnn(acc_sign_rnn):
    # Wrap each scalar as a 1 x 1 array: the `@` operator used in RNN rejects plain Python numbers
    inputs = [np.array([[x]]) for x in [-1, -2, 2, 3, -3, 1]]
    res = acc_sign_rnn.transduce(inputs)
    expected = np.array([[[-1.0]], [[-1.0]], [[-1.0]], [[1.0]], [[-1.0]], [[0.0]]])
    assert(np.allclose(expected, res))
    print("Test passed!")

# Unit test
test_acc_sign_rnn(acc_sign)

### 1.6 Autoregression (RNN)

Enter the parameter matrices and vectors for an instance of the `RNN` class such that it implements the following autoregressive model:
$$y_t=y_{t-1} - 2y_{t-2} + 3y_{t-3}$$
Set `start_state` such that $y_1=-2$, $y_t=0$ for $t\lt1$. Note that all inputs will be zero.
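The recurrence can be checked against a matrix form in which the state holds the three most recent outputs, most recent first. The sketch below assumes a history of `[-2, 0, 0]` and that the first emitted value comes from one application of the recurrence. The names `autoregress` and `companion` are hypothetical.

```python
import numpy as np

coeffs = [1, -2, 3]  # coefficients of y_{t-1}, y_{t-2}, y_{t-3}


def autoregress(history, steps):
    """Direct recurrence; history lists the most recent value first."""
    hist = list(history)
    outputs = []
    for _ in range(steps):
        y = sum(c * h for c, h in zip(coeffs, hist))
        outputs.append(y)
        hist = [y] + hist[:-1]  # shift the window by one step
    return outputs


# Matrix form: the first row applies the recurrence, the other rows shift the history down
companion = np.array([[1, -2, 3],
                      [1, 0, 0],
                      [0, 1, 0]])
state = np.array([[-2], [0], [0]])
matrix_outputs = []
for _ in range(5):
    state = companion @ state
    matrix_outputs.append(int(state[0, 0]))

direct_outputs = autoregress([-2, 0, 0], 5)
print(direct_outputs)                    # [-2, 2, 0, -10, -4]
print(matrix_outputs == direct_outputs)  # True
```

Both forms produce the same sequence, which suggests why a linear RNN with a 3-dimensional state and identity activations is enough to represent this model.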


In [None]:
Wsx =             np.zeros((3, 1))  # inputs are ignored, but `@` needs an array rather than the scalar 0
Wss =             np.array([[1, -2, 3], [1, 0, 0], [0, 1, 0]])
Wo =              np.array([[1, 0, 0]])
Wss_0 =           np.zeros((3, 1))
Wo_0 =            np.zeros((1, 1))
f1 =              lambda x: x
f2 =              lambda x: x
start_state =     np.array([[-2], [0], [0]])
auto = RNN(Wsx, Wss, Wo, Wss_0, Wo_0, f1, f2, start_state)


In [None]:
def test_auto_rnn(auto_rnn):
    res = auto_rnn.transduce([np.array([[0]]) for _ in range(5)])
    expected = np.array([[[-2.0]], [[2.0]], [[0.0]], [[-10.0]], [[-4.0]]])
    assert(np.allclose(expected, res))
    print("Test passed!")

# Unit test
test_auto_rnn(auto)

In [None]:
# Use plain arrays: NumPy no longer recommends np.matrix
A = np.identity(4) - 0.9 * np.array([[0.0, 0.1, 0.9, 0.0],
                                     [0.9, 0.1, 0.0, 0.0],
                                     [0.0, 0.0, 0.1, 0.9],
                                     [0.9, 0.0, 0.0, 0.1]])
b = np.array([[0], [1], [0], [2]])
v = np.linalg.solve(A, b)  # A v = b

print(v.tolist())

[[6.0528829454671795], [6.486632072338918], [6.751958097477378], [7.585533171240018]]
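A quick sanity check on a solve like this is to substitute the solution back into the system and inspect the residual. The sketch below rebuilds the same `A` and `b` with plain arrays so that it runs on its own. The tolerance `1e-9` is an arbitrary assumption.

```python
import numpy as np

A = np.identity(4) - 0.9 * np.array([[0.0, 0.1, 0.9, 0.0],
                                     [0.9, 0.1, 0.0, 0.0],
                                     [0.0, 0.0, 0.1, 0.9],
                                     [0.9, 0.0, 0.0, 0.1]])
b = np.array([[0], [1], [0], [2]])
v = np.linalg.solve(A, b)

# Largest absolute entry of A v - b; should be near machine precision
residual = np.max(np.abs(A @ v - b))
print(residual < 1e-9)
```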
