In [21]:
from abc import ABCMeta, abstractmethod
import numpy as np
import scipy
import scipy.stats as stats
import tensorflow as tf
import time

import os
import math

import skimage as ski
import skimage.io

from im2col_cython import col2im_cython, im2col_cython

## Layer implementations

In [18]:
zero_init = np.zeros

def variance_scaling_initializer(shape, fan_in, factor=2.0, seed=None):
    sigma = np.sqrt(factor / fan_in)
    return stats.truncnorm(-2, 2, loc=0, scale=sigma).rvs(shape)


# -- ABSTRACT CLASS DEFINITION --
class Layer(metaclass = ABCMeta):
    "Interface for layers"
    # See documentation of abstract base classes (ABC): https://docs.python.org/3/library/abc.html

    @abstractmethod
    def forward(self, inputs):
        """
        Args:
          inputs: ndarray tensor.
        Returns:
          ndarray tensor, result of the forward pass.
        """
        pass

    @abstractmethod
    def backward_inputs(self, grads):
        """
        Args:
          grads: gradient of the loss with respect to the output of the layer.
        Returns:
          Gradient of the loss with respect to the input of the layer.
        """
        pass

    def backward_params(self, grads):
        """
        Args:
          grads: gradient of the loss with respect to the output of the layer.
        Returns:
          Gradient of the loss with respect to all the parameters of the layer as a list
          [[w0, g0], ..., [wk, gk], self.name] where w are parameter weights and g their gradient.
          Note that wk and gk must have the same shape.
        """
        pass


# -- CONVOLUTION LAYER --
class Convolution(Layer):
    "N-dimensional convolution layer"

    def __init__(self, input_layer, num_filters, kernel_size, name, padding='SAME',
               weights_initializer_fn=variance_scaling_initializer,
               bias_initializer_fn=zero_init):
        self.input_shape = input_layer.shape
        N, C, H, W = input_layer.shape
        self.C = C
        self.N = N
        self.num_filters = num_filters
        self.kernel_size = kernel_size

        assert kernel_size % 2 == 1

        self.padding = padding

        if padding == 'SAME':
          # with zero padding
          self.shape = (N, num_filters, H, W)
          self.pad = (kernel_size - 1) // 2
        else:
          # without padding
          self.shape = (N, num_filters, H - kernel_size + 1, W - kernel_size + 1)
          self.pad = 0

        fan_in = C * kernel_size**2
        self.weights = weights_initializer_fn([num_filters, kernel_size**2 * C], fan_in)
        self.bias = bias_initializer_fn([num_filters])
        # this implementation doesn't support strided convolutions
        self.stride = 1
        self.name = name
        self.has_params = True

    def forward(self, x):
        k = self.kernel_size
        self.x_cols = im2col_cython(x, k, k, self.pad, self.stride)
        res = self.weights.dot(self.x_cols) + self.bias.reshape(-1, 1)
        N, C, H, W = x.shape
        out = res.reshape(self.num_filters, self.shape[2], self.shape[3], N)
        return out.transpose(3, 0, 1, 2)

    def backward_inputs(self, grad_out):
        # nice trick from CS231n, backward pass can be done with just matrix mul and col2im
        grad_out = grad_out.transpose(1, 2, 3, 0).reshape(self.num_filters, -1)
        grad_x_cols = self.weights.T.dot(grad_out)
        N, C, H, W = self.input_shape
        k = self.kernel_size
        grad_x = col2im_cython(grad_x_cols, N, C, H, W, k, k, self.pad, self.stride)
        return grad_x

    def backward_params(self, grad_out):
        grad_bias = np.sum(grad_out, axis=(0, 2, 3))
        grad_out = grad_out.transpose(1, 2, 3, 0).reshape(self.num_filters, -1)
        grad_weights = grad_out.dot(self.x_cols.T).reshape(self.weights.shape)
        return [[self.weights, grad_weights], [self.bias, grad_bias], self.name]


class MaxPooling(Layer):
    def __init__(self, input_layer, name, pool_size=2, stride=2):
        self.name = name
        self.input_shape = input_layer.shape
        N, C, H, W = self.input_shape
        self.stride = stride
        self.shape = (N, C, H // stride, W // stride)
        self.pool_size = pool_size
        assert pool_size == stride, 'Invalid pooling params'
        assert H % pool_size == 0
        assert W % pool_size == 0
        self.has_params = False

    def forward(self, x):
        N, C, H, W = x.shape
        self.input_shape = x.shape
        # with this clever reshaping we can implement pooling where pool_size == stride
        self.x = x.reshape(N, C, H // self.pool_size, self.pool_size,
                           W // self.pool_size, self.pool_size)
        self.out = self.x.max(axis=3).max(axis=4)
        # if you are returning class member be sure to return a copy
        return self.out.copy()

    def backward_inputs(self, grad_out):
        grad_x = np.zeros_like(self.x)
        out_newaxis = self.out[:, :, :, np.newaxis, :, np.newaxis]
        mask = (self.x == out_newaxis)
        dout_newaxis = grad_out[:, :, :, np.newaxis, :, np.newaxis]
        dout_broadcast, _ = np.broadcast_arrays(dout_newaxis, grad_x)
        # this is almost the same as the real backward pass
        grad_x[mask] = dout_broadcast[mask]
        # in the very rare case that more then one input have the same max value
        # we can aprox the real gradient routing by evenly distributing across multiple inputs
        # but in almost all cases this sum will be 1
        grad_x /= np.sum(mask, axis=(3, 5), keepdims=True)
        grad_x = grad_x.reshape(self.input_shape)
        return grad_x


class Flatten(Layer):
    def __init__(self, input_layer, name):
        self.input_shape = input_layer.shape
        self.N = self.input_shape[0]
        self.num_outputs = 1
        for i in range(1, len(self.input_shape)):
            self.num_outputs *= self.input_shape[i]
        self.shape = (self.N, self.num_outputs)
        self.has_params = False
        self.name = name

    def forward(self, inputs):
        self.input_shape = inputs.shape
        inputs_flat = inputs.reshape(self.input_shape[0], -1)
        self.shape = inputs_flat.shape
        return inputs_flat

    def backward_inputs(self, grads):
        return grads.reshape(self.input_shape)


class FC(Layer):
    def __init__(self, input_layer, num_outputs, name,
               weights_initializer_fn=variance_scaling_initializer,
               bias_initializer_fn=zero_init):
        """
        Args:
          input_layer: layer below
          num_outputs: number of neurons in this layer
          weights_initializer_fn: initializer function for weights,
          bias_initializer_fn: initializer function for biases
        """

        self.input_shape = input_layer.shape
        self.N = self.input_shape[0]
        self.shape = (self.N, num_outputs)
        self.num_outputs = num_outputs

        self.num_inputs = 1
        for i in range(1, len(self.input_shape)):
            self.num_inputs *= self.input_shape[i]

        self.weights = weights_initializer_fn([num_outputs, self.num_inputs], fan_in=self.num_inputs)
        self.bias = bias_initializer_fn([num_outputs])
        self.name = name
        self.has_params = True

    def forward(self, inputs):
        """
        Args:
          inputs: ndarray of shape (N, num_inputs)
        Returns:
          An ndarray of shape (N, num_outputs)
        """
        # TODO
        pass

    def backward_inputs(self, grads):
        """
        Args:
          grads: ndarray of shape (N, num_outputs)
        Returns:
          An ndarray of shape (N, num_inputs)
        """
        # TODO
        pass

    def backward_params(self, grads):
        """
        Args:
          grads: ndarray of shape (N, num_outputs)
        Returns:
          List of params and gradient pairs.
        """
        # TODO
        grad_weights = ...
        grad_bias = ...
        return [[self.weights, grad_weights], [self.bias, grad_bias], self.name]



class ReLU(Layer):
    def __init__(self, input_layer, name):
        self.shape = input_layer.shape
        self.name = name
        self.has_params = False

    def forward(self, inputs):
        """
        Args:
          inputs: ndarray of shape (N, C, H, W).
        Returns:
          ndarray of shape (N, C, H, W).
        """
        # TODO
        pass

    def backward_inputs(self, grads):
        """
        Args:
          grads: ndarray of shape (N, C, H, W).
        Returns:
          ndarray of shape (N, C, H, W).
        """
        # TODO
        pass


class SoftmaxCrossEntropyWithLogits():
    def __init__(self):
        self.has_params = False

    def forward(self, x, y):
        """
        Args:
          x: ndarray of shape (N, num_classes).
          y: ndarray of shape (N, num_classes).
        Returns:
          Scalar, average loss over N examples.
          It is better to compute average loss here instead of just sum
          because then learning rate and weight decay won't depend on batch size.

        """
        # TODO
        pass

    def backward_inputs(self, x, y):
        """
        Args:
          x: ndarray of shape (N, num_classes).
          y: ndarray of shape (N, num_classes).
        Returns:
          Gradient with respect to the x, ndarray of shape (N, num_classes).
        """
        # Hint: don't forget that we took the average in the forward pass
        # TODO
        pass


class L2Regularizer():
    def __init__(self, weights, weight_decay, name):
        """
        Args:
          weights: parameters which will be regularizerized
          weight_decay: lambda, regularization strength
          name: layer name
        """
        # this is still a reference to original tensor so don't change self.weights
        self.weights = weights
        self.weight_decay = weight_decay
        self.name = name

    def forward(self):
        """
         Returns:
          Scalar, loss due to the L2 regularization.
        """
        # TODO
        pass

    def backward_params(self):
        """
        Returns:
          Gradient of the L2 loss with respect to the regularized weights.
        """
        # TODO
        grad_weights = ...
        return [[self.weights, grad_weights], self.name]


class RegularizedLoss():
    def __init__(self, data_loss, regularizer_losses):
        self.data_loss = data_loss
        self.regularizer_losses = regularizer_losses
        self.has_params = True
        self.name = 'RegularizedLoss'

    def forward(self, x, y):
        loss_val = self.data_loss.forward(x, y)
        for loss in self.regularizer_losses:
            loss_val += loss.forward()
        return loss_val

    def backward_inputs(self, x, y):
        return self.data_loss.backward_inputs(x, y)

    def backward_params(self):
        grads = []
        for loss in self.regularizer_losses:
            grads += [loss.backward_params()]
        return grads



## Gradient checker

In [None]:
def rel_error(x, y):
    """ returns relative error """
    return np.max(np.abs(x - y) / (np.maximum(1e-8, np.abs(x) + np.abs(y))))

def eval_numerical_gradient(f, x, df, h=1e-5):
    """
    Evaluate a numeric gradient for a function that accepts a numpy
    array and returns a numpy array.
    - f should be a function that takes a single argument
    - x is the point (numpy array) to evaluate the gradient at
    """
    grad = np.zeros_like(x)
    it = np.nditer(x, flags=['multi_index'], op_flags=['readwrite'])
    while not it.finished:
        ix = it.multi_index

        oldval = x[ix]
        x[ix] = oldval + h
        # evaluate f(x + h)
        pos = f(x.copy()).copy()
        x[ix] = oldval - h
        # evaluate f(x - h)
        neg = f(x.copy()).copy()
        x[ix] = oldval

        # compute the partial derivative with centered formula
        grad[ix] = np.sum((pos - neg) * df) / (2 * h)
        # step to next dimension
        it.iternext()
    return grad

def check_grad_inputs(layer, x, grad_out):
    """
    Args:
    layer: Layer object
    x: ndarray tensor input data
    grad_out: ndarray tensor gradient from the next layer
    """
    grad_x_num = eval_numerical_gradient(layer.forward, x, grad_out)
    grad_x = layer.backward_inputs(grad_out)
    print("Relative error = ", rel_error(grad_x_num, grad_x))
    print("Error norm = ", np.linalg.norm(grad_x_num - grad_x))

def check_grad_params(layer, x, w, b, grad_out):
    """
    Args:
    layer: Layer object
    x: ndarray tensor input data
    w: ndarray tensor layer weights
    b: ndarray tensor layer biases
    grad_out: ndarray tensor gradient from the next layer
    """
    func = lambda params: layer.forward(x)
    grad_w_num = eval_numerical_gradient(func, w, grad_out)
    grad_b_num = eval_numerical_gradient(func, b, grad_out)
    grads = layer.backward_params(grad_out)
    grad_w = grads[0][1]
    grad_b = grads[1][1]
    print("Check weights:")
    print("Relative error = ", rel_error(grad_w_num, grad_w))
    print("Error norm = ", np.linalg.norm(grad_w_num - grad_w))
    print("Check biases:")
    print("Relative error = ", rel_error(grad_b_num, grad_b))
    print("Error norm = ", np.linalg.norm(grad_b_num - grad_b))

print("Convolution")
x = np.random.randn(4, 3, 5, 5)
grad_out = np.random.randn(4, 2, 5, 5)
conv = Convolution(x, 2, 3, "conv1")
print("Check grad wrt input")
check_grad_inputs(conv, x, grad_out)
print("Check grad wrt params")
check_grad_params(conv, x, conv.weights, conv.bias, grad_out)

print("\nMaxPooling")
x = np.random.randn(5, 4, 8, 8)
grad_out = np.random.randn(5, 4, 4, 4)
pool = MaxPooling(x, "pool", 2, 2)
print("Check grad wrt input")
check_grad_inputs(pool, x, grad_out)

print("\nReLU")
x = np.random.randn(4, 3, 5, 5)
grad_out = np.random.randn(4, 3, 5, 5)
relu = ReLU(x, "relu")
print("Check grad wrt input")
check_grad_inputs(relu, x, grad_out)

print("\nFC")
x = np.random.randn(20, 40)
grad_out = np.random.randn(20, 30)
fc = FC(x, 30, "fc")
print("Check grad wrt input")
check_grad_inputs(fc, x, grad_out)
print("Check grad wrt params")
check_grad_params(fc, x, fc.weights, fc.bias, grad_out)

print("\nSoftmaxCrossEntropyWithLogits")
x = np.random.randn(50, 20)
y = np.zeros([50, 20])
y[:,0] = 1
loss = SoftmaxCrossEntropyWithLogits()
grad_x_num = eval_numerical_gradient(lambda x: loss.forward(x, y), x, 1)
out = loss.forward(x, y)
grad_x = loss.backward_inputs(x, y)
print("Relative error = ", rel_error(grad_x_num, grad_x))
print("Error norm = ", np.linalg.norm(grad_x_num - grad_x))

print("\nL2Regularizer")
x = np.random.randn(5, 4, 8, 8)
grad_out = np.random.randn(5, 4, 4, 4)
l2reg = L2Regularizer(x, 1e-2, 'L2reg')
print("Check grad wrt params")
func = lambda params: l2reg.forward()
grad_num = eval_numerical_gradient(func, l2reg.weights, 1)
grads = l2reg.backward_params()
grad = grads[0][1]
print("Relative error = ", rel_error(grad_num, grad))
print("Error norm = ", np.linalg.norm(grad_num - grad))


In [22]:
def forward_pass(net, inputs):
    output = inputs
    for layer in net:
        output = layer.forward(output)
    return output


def backward_pass(net, loss, x, y):
    grads = []
    grad_out = loss.backward_inputs(x, y)
    if loss.has_params:
        grads += loss.backward_params()
    for layer in reversed(net):
        grad_inputs = layer.backward_inputs(grad_out)
        if layer.has_params:
            grads += [layer.backward_params(grad_out)]
        grad_out = grad_inputs
    return grads

def sgd_update_params(grads, config):
    lr = config['lr']
    for layer_grads in grads:
        for i in range(len(layer_grads) - 1):
            params = layer_grads[i][0]
            grads = layer_grads[i][1]
            #print(layer_grads[-1], " -> ", grads.sum())
            params -= lr * grads


def draw_conv_filters(epoch, step, layer, save_dir):
    C = layer.C
    w = layer.weights.copy()
    num_filters = w.shape[0]
    k = int(np.sqrt(w.shape[1] / C))
    w = w.reshape(num_filters, C, k, k)
    w -= w.min()
    w /= w.max()
    border = 1
    cols = 8
    rows = math.ceil(num_filters / cols)
    width = cols * k + (cols-1) * border
    height = rows * k + (rows-1) * border
    #for i in range(C):
    for i in range(1):
        img = np.zeros([height, width])
        
        for j in range(num_filters):
            r = int(j / cols) * (k + border)
            c = int(j % cols) * (k + border)
            img[r:r+k,c:c+k] = w[j,i]
            
        filename = '%s_epoch_%02d_step_%06d_input_%03d.png' % (layer.name, epoch, step, i)
        ski.io.imsave(os.path.join(save_dir, filename), img)


def train(train_x, train_y, valid_x, valid_y, net, loss, config):
    lr_policy = config['lr_policy']
    batch_size = config['batch_size']
    max_epochs = config['max_epochs']
    save_dir = config['save_dir']
    num_examples = train_x.shape[0]
    assert num_examples % batch_size == 0
    num_batches = num_examples // batch_size
    for epoch in range(1, max_epochs+1):
        if epoch in lr_policy:
            solver_config = lr_policy[epoch]
            
        cnt_correct = 0
        #for i in range(num_batches):
        # shuffle the data at the beggining of each epoch
        permutation_idx = np.random.permutation(num_examples)
        train_x = train_x[permutation_idx]
        train_y = train_y[permutation_idx]
        #for i in range(100):
        for i in range(num_batches):
            # store mini-batch to ndarray
            batch_x = train_x[i*batch_size:(i+1)*batch_size, :]
            batch_y = train_y[i*batch_size:(i+1)*batch_size, :]
            logits = forward_pass(net, batch_x)
            loss_val = loss.forward(logits, batch_y)
            # compute classification accuracy
            yp = np.argmax(logits, 1)
            yt = np.argmax(batch_y, 1)
            cnt_correct += (yp == yt).sum()
            grads = backward_pass(net, loss, logits, batch_y)
            sgd_update_params(grads, solver_config)
            
            if i % 5 == 0:
                print("epoch %d, step %d/%d, batch loss = %.2f" % (epoch, i*batch_size, num_examples, loss_val))
                
            if i % 100 == 0:
                draw_conv_filters(epoch, i*batch_size, net[0], save_dir)
                #draw_conv_filters(epoch, i*batch_size, net[3])
            if i > 0 and i % 50 == 0:
                print("Train accuracy = %.2f" % (cnt_correct / ((i+1)*batch_size) * 100))
        print("Train accuracy = %.2f" % (cnt_correct / num_examples * 100))
        evaluate("Validation", valid_x, valid_y, net, loss, config)
    return net


def evaluate(name, x, y, net, loss, config):
    print("\nRunning evaluation: ", name)
    batch_size = config['batch_size']
    num_examples = x.shape[0]
    assert num_examples % batch_size == 0
    num_batches = num_examples // batch_size
    cnt_correct = 0
    loss_avg = 0
    for i in range(num_batches):
        batch_x = x[i*batch_size:(i+1)*batch_size, :]
        batch_y = y[i*batch_size:(i+1)*batch_size, :]
        logits = forward_pass(net, batch_x)
        yp = np.argmax(logits, 1)
        yt = np.argmax(batch_y, 1)
        cnt_correct += (yp == yt).sum()
        loss_val = loss.forward(logits, batch_y)
        loss_avg += loss_val
        #print("step %d / %d, loss = %.2f" % (i*batch_size, num_examples, loss_val / batch_size))
    valid_acc = cnt_correct / num_examples * 100
    loss_avg /= num_batches
    print(name + " accuracy = %.2f" % valid_acc)
    print(name + " avg loss = %.2f\n" % loss_avg)



## Training

In [None]:
from tensorflow.examples.tutorials.mnist import input_data

import nn

DATA_DIR = '/home/kivan/datasets/MNIST/'
SAVE_DIR = "/home/kivan/source/fer/out/"

config = {}
config['max_epochs'] = 8
config['batch_size'] = 50
config['save_dir'] = SAVE_DIR
config['lr_policy'] = {1:{'lr':1e-1}, 3:{'lr':1e-2}, 5:{'lr':1e-3}, 7:{'lr':1e-4}}

#np.random.seed(100) 
np.random.seed(int(time.time() * 1e6) % 2**31)
dataset = input_data.read_data_sets(DATA_DIR, one_hot=True)
train_x = dataset.train.images
train_x = train_x.reshape([-1, 1, 28, 28])
train_y = dataset.train.labels
valid_x = dataset.validation.images
valid_x = valid_x.reshape([-1, 1, 28, 28])
valid_y = dataset.validation.labels
test_x = dataset.test.images
test_x = test_x.reshape([-1, 1, 28, 28])
test_y = dataset.test.labels
train_mean = train_x.mean()
train_x -= train_mean
valid_x -= train_mean
test_x -= train_mean


net = []
inputs = np.random.randn(config['batch_size'], 1, 28, 28)
net += [layers.Convolution(inputs, 16, 5, "conv1")]
net += [layers.MaxPooling(net[-1], "pool1")]
net += [layers.ReLU(net[-1], "relu1")]
net += [layers.Convolution(net[-1], 32, 5, "conv2")]
net += [layers.MaxPooling(net[-1], "pool2")]
net += [layers.ReLU(net[-1], "relu2")]
# out = 7x7
net += [layers.Flatten(net[-1], "flatten3")]
net += [layers.FC(net[-1], 512, "fc3")]
net += [layers.ReLU(net[-1], "relu3")]
net += [layers.FC(net[-1], 10, "logits")]

loss = layers.SoftmaxCrossEntropyWithLogits()

nn.train(train_x, train_y, valid_x, valid_y, net, loss, config)
nn.evaluate("Test", test_x, test_y, net, loss, config)

