# Learning Representations by Recirculation

In [1]:
from typing import Union
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import seaborn as sns

In [2]:
% matplotlib notebook

UsageError: Line magic function `%` not found.


In [None]:
np.random.seed(0)

### Activation Functions
In the paper the only activation function described is the logistic function. They said that other smooth monotonic functions will work too. Here I implemented the logistic function, and also other smooth monotonic functions.
* Logistic: $$f(x)=\frac{1}{1+e^{-x}}$$
* Linear:$$f(x)=x$$
* Tanh: $$f(x)=\frac{e^{2x}-1}{e^{2x}+1}$$
* Rectified Linear Unit (ReLU): $$f(x)=x^+=max(0,x)$$
* Leaky Rectified Linear Unit (Leak ReLU):
$$
f(x)=
    \begin{cases}
    x  &\quad \text{if } x>0\\
    ax & \quad \text{if } x<0
    \end{cases}
$$

In [None]:
def logistic(x: np.ndarray):
    return 1 / (1 + np.exp(-x))


def linear(x: np.ndarray):
    return x


def tanh(x: np.ndarray):
    return (np.exp(2 * x) - 1) / (np.exp(2 * x) + 1)


def relu(x: np.ndarray):
    return np.maximum(x, 0)


def leaky_relu(x: np.ndarray, a: float = 0.01):
    return np.where(x > 0, x, x * a)


### Weight Initializers

In [None]:
def uniform_initializer(low, high, shape):
    return np.random.uniform(low, high, shape)

### Error Metrics

In [None]:
def squared_reconstruction_error(y1, y2):
    return 1 / 2 * np.sum((y1 - y2) ** 2)

### Util Functions

In [None]:
def add_bias(x):
    shape = list(x.shape)
    shape[-1] = 1
    bias = np.ones(shape)
    return np.hstack((x, bias))

## Recirculation Implementations

In [None]:
class Recirculation:
    ACTIVATIONS = {
        'logistic': logistic
    }

    INITIALIZERS = {
        'uniform': uniform_initializer
    }

    def __init__(self, input_len: int, hidden_units: Union[int, list], regression_rate: float, learning_rate: float,
                 activation_function: str, weight_initializer: str, low_weight_range: float = -0.5,
                 high_weight_range: float = 0.5):
        if type(hidden_units) is int:
            hidden_units = [hidden_units]

        self.activation_function = self.ACTIVATIONS[activation_function]
        self.weight_initializer = self.INITIALIZERS[weight_initializer]
        self.regression_rate = regression_rate
        self.learning_rate = learning_rate

        self.layer_shapes = [input_len] * 2
        for unit in hidden_units:
            self.layer_shapes.insert(-1, unit)

        self.layer_count = len(self.layer_shapes)

        self.weights = []
        for i in range(self.layer_count - 1):
            self.weights.append(
                self.weight_initializer(
                    low_weight_range, high_weight_range,
                    # adding one to the first shape is because of the bias
                    (self.layer_shapes[i] + 1, self.layer_shapes[i + 1])
                )
            )

    def fit(self, x: np.ndarray, epoch: int = 100, loging: str = 'low') -> dict:
        if loging not in ['low', 'high']:
            raise ValueError('Incorrect loging value. It can be either low or high.')

        history = {
            'error': [],
        }
        if loging == 'high':
            history['weights'] = [self.weights]
            history['prediction_history'] = [self.predict(x)]
            history['predicted_classes_history'] = [self.predict_class(x)]

        for _ in range(epoch):
            layer_outputs = [x]
            # first pass
            # calculate hidden layer outputs (based on equation 1, 2)
            for i in range(self.layer_count - 2):
                layer_outputs.append(
                    self.activation_function(
                        add_bias(layer_outputs[-1]) @ self.weights[i]
                    )
                )
            # calculate output with regression (based on equation 5)
            layer_outputs.append(
                self.regression_rate * layer_outputs[0] + (1 - self.regression_rate) *
                self.activation_function(
                    add_bias(layer_outputs[-1]) @ self.weights[-1]
                )
            )
            # note: the above statement can be inside the for loop below but this is more readable

            # second pass
            # calculate hidden layer outputs (based on equation 6)
            for i in range(1, self.layer_count - 1):
                layer_outputs.append(
                    self.regression_rate * layer_outputs[i] + (1 - self.regression_rate) *
                    self.activation_function(
                        add_bias(layer_outputs[-1]) @ self.weights[i - 1]
                    )
                )

            # compute reconstruction error
            error = squared_reconstruction_error(layer_outputs[0], layer_outputs[self.layer_count - 1])

            # updating weights (based on equations 3, 4)
            for i in range(self.layer_count - 1):
                delta_w = self.learning_rate * add_bias(layer_outputs[i + 1]).T @ (
                        layer_outputs[i] - layer_outputs[i + 2])
                self.weights[len(self.weights) - 1 - i] += delta_w

            # update history
            history['error'].append(error)
            if loging == 'high':
                history['weights'].append(self.weights)
                history['prediction_history'].append(self.predict(x))
                history['predicted_classes_history'].append(self.predict_class(x))

        return history

    def predict(self, x: np.ndarray) -> np.ndarray:
        layer_output = [x]
        for i in range(self.layer_count - 1):
            layer_output.append(
                self.activation_function(
                    add_bias(layer_output[-1]) @ self.weights[i]
                )
            )
        return layer_output[-1]

    def predict_class(self, x: np.ndarray) -> np.ndarray:
        pred = self.predict(x)
        classes = np.zeros_like(pred)
        classes[np.arange(len(classes)), pred.argmax(axis=1)] = 1
        return classes

    def encode(self, x):
        layer_output = [x]
        for i in range(self.layer_count - 2):
            layer_output.append(
                self.activation_function(
                    add_bias(layer_output[-1]) @ self.weights[i]
                )
            )
        return layer_output[-1]


In [None]:
x = np.array([
    [1, 0, 0, 0],
    [0, 1, 0, 0],
    [0, 0, 1, 0],
    [0, 0, 0, 1],
])

In [None]:
parameters = {
    'input_len': 4,
    'hidden_units': 2,
    'regression_rate': 0.75,
    'learning_rate': 20,
    'activation_function': 'logistic',
    'weight_initializer': 'uniform',
    'low_weight_range': -0.5,
    'high_weight_range': 0.5,
}

In [None]:
r = Recirculation(**parameters)

In [None]:
history = r.fit(x, loging='high')

In [None]:
plt.plot(np.arange(100), np.array(history['error']))

view predictions

In [None]:
r.predict_class(x)

In [None]:
r.predict(x)

In [None]:
r.encode(x)

Let's do the test for 100 times to see average number of weight updates to get an error bellow 0.05. The paper tested the average for the error bellow 0.1 but looking at the error graph above, we see that the model achieves 0.1 error in the first few epochs but because of the high regression rate, it's not stable. However, it is more stable around 0.05 error.

In [None]:
indexes = []
for _ in range(100):
    r = Recirculation(**parameters)
    history = r.fit(x)
    error_array = np.array(history['error'])
    indexes.append(np.argmax(error_array < 0.05))
print(np.average(indexes))