In [476]:
import numpy as np
import pandas as pd
import scipy.special

In [477]:
class AutoEncoder:
	"""Fully-connected autoencoder built from a layer spec and trained
	one instance at a time.

	The spec is a dict with a "name" and a "layers" list. Each layer dict
	holds a "type" ("input", "hidden" or "output"), a "nodes" count and,
	for every non-input layer, an "activation_func" callable.

	Note: the model keeps a reference to the spec it is given and stores
	its state directly on the layer dicts ("weights" linking a layer to
	the next one, plus "activations" and "errors" from the most recent
	pass), so the caller's spec is modified in place.
	"""

	def __init__(self, spec:dict, learning_rate:float=0.05):
		"""Validates the spec, initialises weights and prints a summary.

		Args:
			spec (dict): model specification (see class docstring).
			learning_rate (float, optional): step size applied to each
			weight update. Defaults to 0.05.

		Raises:
			ValueError: if the spec lacks an input, hidden or output
			layer, or if any layer has fewer than one node.
		"""
		layers = spec["layers"]
		layer_types = [layer["type"] for layer in layers]
		for required in ("input", "hidden", "output"):
			if required not in layer_types:
				raise ValueError(f"Models must contain at least one {required} layer.")
		# Reject negative node counts too, not only zero.
		if any(layer["nodes"] < 1 for layer in layers):
			raise ValueError("All layers must contain at least one node.")

		self.spec = spec
		self.learning_rate = learning_rate

		# Every layer except the last owns the weights feeding the next
		# layer, shaped (next nodes, own nodes) and drawn from a normal
		# distribution with standard deviation nodes ** -0.5.
		for layer, next_layer in zip(layers, layers[1:]):
			layer["weights"] = np.random.normal(
				loc=0.0,
				scale=pow(layer["nodes"], -0.5),
				size=(next_layer["nodes"], layer["nodes"])
			)
		self.model_summary()

	def train(self, instances:list, epochs:int=1) -> None:
		"""Trains the model to reproduce each input instance.

		Args:
			instances (list): tensor of input instances.
			epochs (int, optional): number of epochs for which to
			train the model. Defaults to 1.
		"""
		print("\n".join([
			"========================================",
			"Training Model",
			"========================================",
		]))
		total = len(instances)
		for epoch in range(epochs):
			print(f"Epoch: {epoch}/{epochs}")
			for i, instance in enumerate(instances):
				# Report only when the completed share is a whole
				# multiple of 10%.
				progress = ((i+1)/total) * 100
				if progress % 10 == 0:
					print(f"Progress: {progress}%")

				activations = self.forward_pass(instance)
				self.backpropogate(instance, activations)

	def forward_pass(self, instance:list) -> list:
		"""Conducts a forward pass through the model,
		generating activations for hidden and output
		layer(s).

		Args:
			instance (list): instance for which to conduct forward
			pass.

		Returns:
			list: the model's own layer dictionaries (not a copy),
			each now carrying an "activations" entry.
		"""
		# Deliberately the live layer list: results are stored on the
		# spec itself and read back by backpropogate.
		activations = self.spec["layers"]
		for i, layer in enumerate(activations):
			if layer["type"] == "input":
				# ndmin=2 promotes a flat instance to a single row.
				layer["activations"] = np.array(instance, ndmin=2)
			else:
				previous = activations[i-1]
				inputs = np.dot(previous["weights"], previous["activations"].T)
				layer["activations"] = layer["activation_func"](inputs).T
		return activations

	def backpropogate(self, instance:list, activations:list) -> None:
		"""Adjusts model weights through backpropogation.
		Implemented backpropogation assumes sigmoid activation
		functions. The instance itself is the training target.

		Errors for every layer are computed before any weight is
		changed, so each error is propagated through the same weights
		that produced the forward pass.

		Args:
			instance (list): instance for which to backpropogate.
			activations (list): list of dictionaries containing activations
			at each layer. Output by self.forward_pass.
		"""
		target = np.array(instance, ndmin=2).T
		# Last layer down to the first non-input layer.
		backward_order = range(len(activations) - 1, 0, -1)

		# Pass 1: propagate the error backwards with the weights untouched.
		# `activations` normally aliases self.spec["layers"], so updating
		# weights inside this loop would feed modified weights into the
		# error of the layer below.
		for i in backward_order:
			layer = activations[i]
			if layer["type"] == "output":
				layer["errors"] = target - layer["activations"].T
			else:
				layer["errors"] = np.dot(layer["weights"].T, activations[i+1]["errors"])

		# Pass 2: apply the updates; a * (1 - a) is the sigmoid derivative
		# expressed through the layer's activation a.
		for i in backward_order:
			outputs = activations[i]["activations"].T
			gradient = activations[i]["errors"] * outputs * (1 - outputs)
			self.spec["layers"][i-1]["weights"] += self.learning_rate * np.dot(
				gradient,
				activations[i-1]["activations"]
			)

	def query(self, instance):
		"""Placeholder: not implemented yet, returns None."""
		pass

	def evaluate(self, instances, labels):
		"""Placeholder: not implemented yet, returns None."""
		pass

	def model_summary(self):
		"""Prints the model name followed by the type and node count
		of every layer."""
		print("\n".join([
			"========================================",
			"Model summary: {}".format(self.spec["name"]),
			"========================================",
			*["{} layer:	{} nodes".format(layer["type"].title(), layer["nodes"]) for layer in self.spec["layers"]],
			"========================================",
		]))

In [478]:
# Symmetric 784 -> 32 -> 10 -> 32 -> 784 network. Every layer after the
# input applies the logistic sigmoid (scipy.special.expit).
spec = {
	"name": "MNIST_autoencoder",
	"layers": [
		{ "type": "input", "nodes": 784 },
		*[
			{ "type": kind, "nodes": width, "activation_func": lambda x: scipy.special.expit(x) }
			for kind, width in (
				("hidden", 32),
				("hidden", 10),
				("hidden", 32),
				("output", 784),
			)
		],
	],
}

# Constructing the model initialises its weights and prints the summary.
model = AutoEncoder(spec=spec, learning_rate=0.05)

(32, 784)
(10, 32)
(32, 10)
(784, 32)
Model summary: MNIST_autoencoder
Input layer:	784 nodes
Hidden layer:	32 nodes
Hidden layer:	10 nodes
Hidden layer:	32 nodes
Output layer:	784 nodes


In [479]:
def load_mnist_data(file_path:str, nrows:int=0) -> tuple[np.ndarray, np.ndarray]:
	"""Reads in MNIST dataset, acquired from:
	https://www.kaggle.com/datasets/oddrationale/mnist-in-csv.

	The file must have a "label" column first, followed by the pixel
	columns.

	Args:
		file_path (str): file path to the dataset.
		nrows (int, optional): number of rows to read in.
		Defaults to all rows.

	Returns:
		tuple[np.ndarray, np.ndarray]: image instances with pixels
		rescaled from [0, 255] to [0.01, 1.0], and the corresponding
		one-hot encoded labels, always 10 columns wide (digits 0-9).
	"""
	# nrows=None makes pandas read the whole file; 0 means "all rows" here.
	df = pd.read_csv(filepath_or_buffer=file_path, nrows=nrows or None)
	x = ((df[df.columns[1:]]/255 * 0.99) + 0.01).to_numpy()
	# Fix the categories to the ten digits so the encoding width does not
	# depend on which labels happen to appear in the rows that were read.
	digits = pd.Categorical(df["label"], categories=list(range(10)))
	y = pd.get_dummies(digits).to_numpy()
	return x, y

# Load small subsets of each split: the first 1000 training rows and the
# first 100 test rows.
x_train, y_train = load_mnist_data("data/mnist_train.csv", nrows=1000)
x_test, y_test = load_mnist_data("data/mnist_test.csv", nrows=100)

In [480]:
# One pass over the training images; the inputs double as the targets.
model.train(x_train, epochs=1)

Training Model
Epoch: 0/1
Progress: 10.0%
Progress: 20.0%
Progress: 30.0%
Progress: 40.0%
Progress: 50.0%
Progress: 60.0%
Progress: 70.0%
Progress: 80.0%
Progress: 90.0%
Progress: 100.0%
