In [None]:
import numpy as np
import math
from copy import deepcopy

In [None]:
def glorot_init(weights):
	range = np.sqrt(6. / (weights[0].shape[0] + weights[0].shape[1]))
	weights[0] = np.random.uniform(-range, range, size = weights[0].shape)
	weights[1] = np.zeros(weights[1].shape)

def kaiming_init(weights):
	range = np.sqrt(6. / weights[0].shape[1])
	weights[0] = np.random.uniform(-range, range, size = weights[0].shape)
	weights[1] = np.zeros(weights[1].shape) + 0.01

def shuffle(x, y):
	index_list = np.array([i for i in range(x.shape[0])])
	np.random.shuffle(index_list)
	x = x[index_list]
	y = y[index_list]

def flatten(x):
	return x.reshape(x.shape[0], -1)

In [None]:
class Layer:

	def create(self, input_size):
		pass

	def forward(self, input):
		pass

	def backward(self, gradient):
		pass


class ReLU(Layer):

	def create(self, input_size):
		self.input_size = input_size
		self.output_size = input_size

	def forward(self, input):
		self.input = input
		return np.maximum(0, input)

	def backward(self, gradient):
		return gradient * (self.input > 0)


class Tanh(Layer):

	def create(self, input_size):
		self.input_size = input_size
		self.output_size = input_size

	def forward(self, input):
		self.input = input
		return np.tanh(input)

	def backward(self, gradient):
		return gradient * (1 - np.tanh(self.input) ** 2)


class Softmax(Layer):

	def create(self, input_size):
		self.input_size = input_size
		self.output_size = input_size

	def forward(self, input):
		self.input = input
		b = input.max()
		e = np.exp(input - b)
		return e / e.sum()

	def backward(self, gradient):
		return gradient.copy()


class Input(Layer):

	def __init__(self, input_size):
		self.input_size = input_size
		self.output_size = input_size

	def forward(self, input):
		return input.copy()

	def backward(self, gradient):
		return gradient.copy()

In [None]:
class Parameter(Layer):

	def init(self, next_layer):
		pass


class Linear(Parameter):

	def __init__(self, nb_neurons):
		self.output_size = nb_neurons

	def create(self, input_size):
		self.input_size = input_size
		self.weights = [np.zeros((self.output_size, input_size)), np.zeros((self.output_size))]
		self.velocities = [np.zeros((self.output_size, input_size)), np.zeros((self.output_size))]
		self.m = [np.zeros((self.output_size, input_size)), np.zeros((self.output_size))]
		self.v = [np.zeros((self.output_size, input_size)), np.zeros((self.output_size))]
		self.gradients = [np.zeros((self.output_size, input_size)), np.zeros((self.output_size))]

	def init(self, next_layer):
		if type(next_layer) == ReLU:
			kaiming_init(self.weights)
		else:
			glorot_init(self.weights)

	def forward(self, input):
		self.input = input
		return self.weights[0] @ input + self.weights[1]

	def backward(self, gradient):
		self.gradients[0] += np.outer(gradient, self.input)
		self.gradients[1] += gradient
		return self.weights[0].T @ gradient

In [None]:
class Optimizer:

	def init(self):
		pass

	def update(self, layers):
		pass

	def iteration(self):
		pass


class SGD(Optimizer):

	def __init__(self, learning_rate = 0.01, momentum = 0.9):
		self.learning_rate = learning_rate
		self.momentum = momentum

	def init(self):
		pass

	def update(self, layers):
		for layer in layers:
			if isinstance(layer, Parameter):
				for i in range(len(layer.weights)):
					layer.velocities[i] = (self.momentum * layer.velocities[i]) + ((1. - self.momentum) * layer.gradients[i])
					layer.weights[i] -= layer.velocities[i] * self.learning_rate

	def iteration(self):
		pass


class Adam(Optimizer):

	def __init__(self, learning_rate = 0.001, beta_1 = 0.9, beta_2 = 0.999, epsilon = 1e-8):
		self.learning_rate = learning_rate
		self.beta_1 = beta_1
		self.beta_2 = beta_2
		self.epsilon = epsilon
		self.t = 1

	def init(self):
		self.t = 1

	def update(self, layers):
		for layer in layers:
			if isinstance(layer, Parameter):
				for i in range(len(layer.weights)):
					layer.m[i] = self.beta_1 * layer.m[i] + (1. - self.beta_1) * layer.gradients[i]
					layer.v[i] = self.beta_2 * layer.v[i] + (1. - self.beta_2) * layer.gradients[i] ** 2
					m_hat = layer.m[i] / (1. - (self.beta_1 ** self.t))
					v_hat = layer.v[i] / (1. - (self.beta_2 ** self.t))
					layer.weights[i] -= self.learning_rate * m_hat / (np.sqrt(v_hat) + self.epsilon)

	def iteration(self):
		self.t += 1

In [None]:
class Loss:

	def __init__(self):
		pass

	def forward(self, output, target):
		pass

	def backward(self, output, target):
		pass


class NegativeLogLikelihood(Loss):

	def forward(self, output, target):
		return -np.log(output[target])

	def backward(self, output, target):
		gradient = output.copy()
		gradient[target] -= 1
		return gradient

In [None]:
class Model:

	def __init__(self):
		self.layers = []

	def add(self, layer):

		if not isinstance(layer, Layer):
			raise TypeError("Layer must be an instance of Layer")

		if len(self.layers) > 0:
			if type(layer) == Input:
				raise Exception("Input layer cannot be added after other layers")
			layer.create(self.layers[-1].output_size)
		elif type(layer) != Input:
			raise Exception("First layer must be an input layer")
		else:
			self.input_size = layer.input_size

		self.layers.append(layer)

	def compile(self, loss, optimizer):

		if not isinstance(optimizer, Optimizer):
			raise TypeError("Optimizer must be an instance of Optimizer")
		if not isinstance(loss, Loss):
			raise TypeError("Loss must be an instance of Loss")

		self.optimizer = optimizer
		self.loss = loss

		for i in range(len(self.layers) - 1):
			if type(self.layers[i]) == Softmax:
				raise Exception("Softmax layer must be the last layer")
			elif isinstance(self.layers[i], Parameter):
				self.layers[i].init(self.layers[i + 1])

		if isinstance(self.layers[-1], Parameter):
			self.layers[-1].init(None)

		self.output_size = self.layers[-1].output_size

	def forward(self, input):

		for layer in self.layers:
			input = layer.forward(input)

		return input

	def backward(self, gradient):

		for layer in reversed(self.layers):
			gradient = layer.backward(gradient)

	def clear_gradients(self):

		for layer in self.layers:
			if isinstance(layer, Parameter):
				for gradient in layer.gradients:
					gradient[:] = 0

	def average_gradients(self, batch_size):

		for layer in self.layers:
			if isinstance(layer, Parameter):
				for gradient in layer.gradients:
					gradient /= batch_size

	def check_input(self, x, y):

		nb_data = x.shape[0]

		if y.shape[0] != nb_data:
			raise Exception("Number of labels must be equal to the number of data")

		x_copy = flatten(x.copy())
		y_copy = flatten(y.copy())

		if x_copy.shape[1] != self.input_size:
			raise Exception("Features size must be equal to the input size of the model")

		if type(self.loss) == NegativeLogLikelihood:
			if y_copy.shape[1] != self.output_size and y_copy.shape[1] != 1:
				raise Exception("Labels size must equal to 1 or the output size of the model")
			if y_copy.shape[1] == self.output_size:
				y_copy = y_copy.argmax(axis = 1)
		else:
			if y_copy.shape[1] != self.output_size:
				raise Exception("Labels size must be equal to the output size of the model")

		return x_copy, y_copy

	def train(self, x_train, y_train, epochs, batch_size, x_val = None, y_val = None, print_frequency = 1):

		x_train_copy, y_train_copy = self.check_input(x_train, y_train)

		if x_val is not None:
			x_val_copy, y_val_copy = self.check_input(x_val, y_val)

		nb_data = x_train_copy.shape[0]
		best_dev = 0
		best_model = deepcopy(self)
		self.optimizer.init()

		for epoch in range(epochs):

			shuffle(x_train_copy, y_train_copy)

			for batch in range(math.floor(nb_data / batch_size)):

				loss = 0
				accuracy = 0
				self.clear_gradients()

				for i in range(batch * batch_size, (batch + 1) * batch_size):

					# Forward
					output = self.forward(x_train_copy[i])
					loss += self.loss.forward(output, y_train_copy[i])

					if output.argmax() == y_train_copy[i]:
						accuracy += 1

					# Backward
					gradient = self.loss.backward(output, y_train_copy[i])
					self.backward(gradient)

				# Update
				self.average_gradients(batch_size)
				self.optimizer.update(self.layers)

				# Tests
				if (batch + 1) % print_frequency == 0:

					loss /= batch_size
					accuracy /= batch_size

					if x_val_copy is not None:

						val_accuracy, val_loss = self.test(x_val_copy, y_val_copy, False, False)

						# Save the best model
						if val_accuracy > best_dev :
							best_model = deepcopy(self)
							best_dev = val_accuracy

						msg = "Epoch %i | batch %i | train loss: %.2f | train accuracy: %.1f%% | dev loss: %.2f | dev accuracy: %.1f%%" % (epoch + 1, batch + 1, loss, accuracy * 100., val_loss, val_accuracy * 100.)

					else:
						msg = "Epoch %i | batch %i | train loss: %.2f | train accuracy: %.1f%%" % (epoch + 1, batch + 1, loss, accuracy * 100.)

					if batch == int(nb_data / batch_size) - 1: print(msg)
					else: print(msg, end = "\r")

			self.optimizer.iteration()

		if x_val_copy is not None:
			self.layers = best_model.layers

	def predict(self, x):
		output = self.forward(x)
		return output.argmax()

	def test(self, x, y, check_data = True, print_results = True):

		if check_data:
			x_copy, y_copy = self.check_input(x, y)
		else:
			x_copy, y_copy = x, y

		nb_data = x.shape[0]
		loss = 0
		accuracy = 0

		for i in range(nb_data):

			output = self.forward(x_copy[i])
			loss += self.loss.forward(output, y_copy[i])

			if output.argmax() == y_copy[i]:
				accuracy += 1

		if print_results:
			print("Test loss: %.2f | test accuracy: %.1f%%" % (loss / nb_data, (accuracy / nb_data) * 100.))

		return accuracy / nb_data, loss / nb_data

In [None]:
import os
import dataset_loader

# Download mnist dataset
if("mnist.pkl.gz" not in os.listdir(".")):
	# this link doesn't work any more,
	# seach on google for the file "mnist.pkl.gz"
	# and download it
	!wget http://deeplearning.net/data/mnist/mnist.pkl.gz

# if you have it somewhere else, you can comment the lines above
# and overwrite the path below
mnist_path = "./mnist.pkl.gz"

# load the 3 splits
train_data, dev_data, test_data = dataset_loader.load_mnist(mnist_path)

In [None]:
x_train = np.array(train_data[0])
y_train = np.array(train_data[1])
x_dev = np.array(dev_data[0])
y_dev = np.array(dev_data[1])
x_test = np.array(test_data[0])
y_test = np.array(test_data[1])

In [None]:
model = Model()
model.add(Input(28 * 28))
model.add(Linear(128))
model.add(ReLU())
model.add(Linear(10))
model.add(ReLU())
model.add(Softmax())
model.compile(NegativeLogLikelihood(), Adam())

In [None]:
model.train(x_train, y_train, 10, 100, x_dev, y_dev, 100)

In [None]:
model.test(x_test, y_test)