In [1]:
import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
import tensorflow as tf

In [2]:
def one_hot_encode(y, num_classes):
    return np.eye(num_classes)[y]

In [3]:
# read mnist data
def read_mnist_images(filename):
    with open(filename, 'rb') as f:
        data = f.read()
        assert int.from_bytes(data[:4], byteorder='big') == 2051
        n_images = int.from_bytes(data[4:8], byteorder='big')
        n_rows = int.from_bytes(data[8:12], byteorder='big')
        n_cols = int.from_bytes(data[12:16], byteorder='big')
        images = np.frombuffer(data, dtype=np.uint8, offset=16).reshape(n_images, n_rows, n_cols)
        return images, n_images


def read_mnist_labels(filename):
    with open(filename, 'rb') as f:
        data = f.read()
        assert int.from_bytes(data[:4], byteorder='big') == 2049
        labels = np.frombuffer(data, dtype=np.uint8, offset=8)
        return labels


x, num_inputs = read_mnist_images('mnist/train-images.idx3-ubyte')
x = x.reshape((-1, 1, 28, 28))
x = x / 255.
y = one_hot_encode(read_mnist_labels('mnist/train-labels.idx1-ubyte'), 10)

In [4]:
class LayerDense:
    def __init__(self, n_inputs, n_neurons):
        rng = np.random.default_rng()
        variance = np.sqrt(6/(n_inputs + n_neurons))
        self.weights = (rng.standard_normal(size=(n_inputs, n_neurons))) * variance # xavier initialization from tensorflow
        self.biases = np.zeros((1, n_neurons))

    def forward(self, inputs):
        self.output = np.dot(inputs, self.weights) + self.biases
        self.inputs = inputs


    def backward(self, d_values):
        self.dweights = np.dot(self.inputs.T, d_values) / len(self.inputs)
        self.dbiases = np.sum(d_values, axis=0, keepdims=True) / len(self.inputs)
        self.dinputs = np.dot(d_values, self.weights.T) / len(self.inputs)

In [5]:
class LayerConvolutional:
    def __init__(self, num_kernels, num_channels, kernel_size):
        rng = np.random.default_rng()
        variance = 1/10
        #self.weights =  0.1 * rng.standard_normal(size=(num_kernels, num_channels, kernel_size, kernel_size))
        self.weights = (2*rng.standard_normal(size=(num_kernels, num_channels, kernel_size, kernel_size))) * variance
        self.biases = np.zeros(num_kernels)
        self.kernel_size = kernel_size
        self.num_kernels = num_kernels
        self.num_channels = num_channels
    
    def forward(self, inputs):
        if hasattr(self, 'output'):
            delattr(self, 'output')
        self.pad_size_rows = inputs.shape[-2] - (inputs.shape[-2] - self.kernel_size + 1)
        self.pad_size_cols = inputs.shape[-1] - (inputs.shape[-1] - self.kernel_size + 1)
        self.inputs = inputs
        self.padded_inputs = np.pad(inputs, ((0, 0), (0, 0), (0, self.pad_size_rows), (0, self.pad_size_cols)))
        if self.num_channels != self.inputs.shape[1]:
            raise Exception('Error: number of filter and image channels does not match.')
        for kernel, bias in zip(self.weights, self.biases):
            if not hasattr(self, 'output'):
                self.output = sp.signal.convolve(self.padded_inputs, [kernel], mode='valid') + bias
            else:
                self.output = np.append(self.output, sp.signal.convolve(self.padded_inputs, [kernel], mode='valid') + bias, axis=1)
    
    def backward(self, d_values):
        self.dbiases = np.zeros_like(self.biases)
        self.dinputs = np.zeros_like(self.inputs)
        if hasattr(self, 'dweights'):
            delattr(self, 'dweights')
        for kernel_id in range(self.num_kernels):
            kernel = self.weights[kernel_id]
            rotated_kernel = np.rot90(kernel, k=2, axes=(1, 2))
            per_kernel_dvalues = d_values[:, kernel_id:(kernel_id+1), :, :]
            for channel_id in range(self.num_channels):
                channel_rot_kernel = rotated_kernel[channel_id]
                channel_inputs = self.padded_inputs[:, channel_id:channel_id+1, :, :]
                if channel_id == 0:
                    cur_dfilter = sp.signal.convolve(channel_inputs, per_kernel_dvalues, mode='valid').reshape((-1, 1, self.kernel_size, self.kernel_size))
                else:
                    cur_dfilter = np.append(cur_dfilter, sp.signal.convolve(channel_inputs, per_kernel_dvalues, mode='valid').reshape((-1, 1, self.kernel_size, self.kernel_size)), axis=1)
                self.dinputs[:, channel_id:channel_id+1, :, :] += sp.signal.convolve(per_kernel_dvalues, [[channel_rot_kernel]], mode='full')[:, :, (self.pad_size_rows//2):-(self.pad_size_rows//2), (self.pad_size_cols//2):-(self.pad_size_cols//2)]
            if not hasattr(self, 'dweights'):
                self.dweights = cur_dfilter
            else:
                self.dweights = np.append(self.dweights, cur_dfilter, axis=0)
            self.dbiases[kernel_id] += np.sum(per_kernel_dvalues)
            
            self.dbiases /= len(self.inputs)
            self.dinputs /= len(self.inputs)
            self.dweights /= len(self.inputs)

In [6]:
class LayerMaxPooling:
    def __init__(self, kernel_size=2, stride=2):
        self.kernel_size = kernel_size
        self.stride = stride
    
    def forward(self, inputs):
        self.inputs = inputs
        pad_cols = self.inputs.shape[-1] % self.stride if self.inputs.shape[-1] % self.stride != 0 else 0
        pad_rows = self.inputs.shape[-2] % self.stride if self.inputs.shape[-2] % self.stride != 0 else 0
        self.padded_inputs = np.pad(inputs, ((0, 0), (0, 0), (0, pad_rows), (0, pad_cols))) if pad_cols or pad_rows else inputs
        self.windowed_padded_inputs = np.lib.stride_tricks.sliding_window_view(self.padded_inputs, (self.kernel_size, self.kernel_size), axis=(2, 3))[:, :, ::self.stride, ::self.stride]
        self.output = self.windowed_padded_inputs.max(axis=(4, 5))
        
    def backward(self, d_values):
        self.dinputs = np.zeros_like(self.inputs)
        samples, channels, rows, cols, window_rows, window_cols = self.windowed_padded_inputs.shape
        for sample in range(samples):
            for channel_id in range(channels):
                for row in range(rows):
                    for col in range(cols):
                        window = self.windowed_padded_inputs[sample, channel_id, row, col]
                        max_id = np.argmax(window)
                        row_placement_in_block = max_id // window_cols
                        col_placement_in_block = max_id % window_cols
                        self.dinputs[sample, channel_id, window_rows*row + row_placement_in_block, window_cols*col + col_placement_in_block] = d_values[sample, channel_id, row, col]

In [7]:
class LayerFlatten:
    def forward(self, inputs):
        self.num_samples, self.num_channels, self.num_rows, self.num_cols = inputs.shape
        self.output = inputs.reshape((self.num_samples, self.num_channels * self.num_rows * self.num_cols))
    
    def backward(self, d_values):
        self.dinputs = d_values.reshape((self.num_samples, self.num_channels, self.num_rows, self.num_cols))

In [8]:
class ActivationReLU:
    def forward(self, inputs):
        self.output = np.maximum(0, inputs)
        self.inputs = inputs

    def backward(self, d_values):
        self.dinputs = d_values.copy()
        self.dinputs[self.inputs <= 0] = 0

In [9]:
class ActivationSoftmaxLossCategoricalCrossentropy:
    def forward(self, inputs, correct_labels):
        exp_values = np.exp(inputs - np.max(inputs, axis=1, keepdims=True)) # -np.max(...) for numerical stability with big number
        self.output = exp_values / np.sum(exp_values, axis=1, keepdims=True)
        predictions = np.clip(self.output, 1e-7, 1 - 1e-7)
        loss = np.sum(-np.log(predictions) * correct_labels)
        return loss

    def backward(self, dvalues, correct_labels):
        self.dinputs = dvalues - correct_labels

In [10]:
class OptimizerSGD:
    def __init__(self, learning_rate=1.0, decay=0., momentum=0.):
        self.learning_rate = learning_rate
        self.current_learning_rate = learning_rate 
        self.decay = decay 
        self.iterations = 0 
        self.momentum = momentum

    def pre_update_params(self): 
        if self.decay: 
            self.current_learning_rate = self.learning_rate * (1. / (1. + self.decay * self.iterations))

    def update_params(self, layer):
        if self.momentum: 
            if not hasattr(layer, 'weight_momentums'): 
                layer.weight_momentums = np.zeros_like(layer.weights) 
                layer.bias_momentums = np.zeros_like(layer.biases)
            
            weight_updates = self.momentum * layer.weight_momentums - self.current_learning_rate * layer.dweights 
            layer.weight_momentums = weight_updates 
            bias_updates = self.momentum * layer.bias_momentums - self.current_learning_rate * layer.dbiases 
            layer.bias_momentums = bias_updates
        else:
            weight_updates = -self.learning_rate * layer.dweights
            bias_updates = -self.learning_rate * layer.dbiases
        
        layer.weights += weight_updates
        layer.biases += bias_updates

    def post_update_params(self):
        self.iterations += 1

In [11]:
class Network:
    def __init__(self, layers=[], optimizer=None):
        self.layers = layers
        self.optimizer = optimizer
    
    def add(self, layer):
        self.layers.append(layer)
    
    def forward_propagation(self, x_batch, y_batch):
        prev_layer = None
        for layer in self.layers:
            layer_input = x_batch if prev_layer is None else prev_layer.output
            if isinstance(layer, ActivationSoftmaxLossCategoricalCrossentropy):
                loss = layer.forward(layer_input, y_batch)
            else:
                layer.forward(layer_input)
            prev_layer = layer
        return loss
    
    def back_propagation(self, y_batch):
        layers_reversed = self.layers[::-1]
        prev_layer = None
        adjustable_layers = (LayerDense, LayerConvolutional)
        for layer in layers_reversed:
            if isinstance(layer, ActivationSoftmaxLossCategoricalCrossentropy):
                layer.backward(layer.output, y_batch)
            else:
                layer.backward(prev_layer.dinputs)
            prev_layer = layer
        self.optimizer.pre_update_params()
        for layer in self.layers:
            if isinstance(layer, adjustable_layers):
                self.optimizer.update_params(layer)
        self.optimizer.post_update_params()
    
    def train(self, x, y, epochs, batch_size):
        for i in range(epochs):
            for j in range(0, len(x), batch_size):
                x_batch = x[j:j+batch_size]
                y_batch = y[j:j+batch_size]
                loss = self.forward_propagation(x_batch, y_batch)
                self.back_propagation(y_batch)
                predictions = np.argmax(self.layers[-1].output, axis=1)
                accuracy = np.mean(predictions == np.argmax(y_batch, axis=1))
            print(f'epoch: {i}, ' + f'acc: {accuracy:.3f}, ' + f'loss: {loss:.3f}')

# MNIST Model - Dense layers

In [12]:
x, num_inputs = read_mnist_images('mnist/train-images.idx3-ubyte')
x = x.reshape((-1, 1, 28, 28))
x = x.reshape(-1, 784) / 255.
y = one_hot_encode(read_mnist_labels('mnist/train-labels.idx1-ubyte'), 10)

In [13]:
network1 = Network([
    LayerDense(784, 10),
    ActivationReLU(),
    LayerDense(10, 10),
    ActivationSoftmaxLossCategoricalCrossentropy()
], OptimizerSGD())
epochs = 100
batch_size = 60
x = x.reshape(-1, 784)
network1.train(x, y, epochs, batch_size)

epoch: 0, acc: 0.933, loss: 12.840
epoch: 1, acc: 0.950, loss: 11.104
epoch: 2, acc: 0.950, loss: 10.170
epoch: 3, acc: 0.933, loss: 9.811
epoch: 4, acc: 0.933, loss: 9.641
epoch: 5, acc: 0.933, loss: 9.887
epoch: 6, acc: 0.933, loss: 10.171
epoch: 7, acc: 0.933, loss: 9.895
epoch: 8, acc: 0.950, loss: 9.220
epoch: 9, acc: 0.950, loss: 8.129
epoch: 10, acc: 0.933, loss: 7.356
epoch: 11, acc: 0.933, loss: 6.931
epoch: 12, acc: 0.967, loss: 6.530
epoch: 13, acc: 0.967, loss: 5.979
epoch: 14, acc: 0.967, loss: 5.685
epoch: 15, acc: 0.967, loss: 5.499
epoch: 16, acc: 0.967, loss: 5.319
epoch: 17, acc: 0.967, loss: 4.825
epoch: 18, acc: 0.967, loss: 4.787
epoch: 19, acc: 0.967, loss: 4.659
epoch: 20, acc: 0.967, loss: 4.545
epoch: 21, acc: 0.967, loss: 4.502
epoch: 22, acc: 0.967, loss: 4.518
epoch: 23, acc: 0.967, loss: 4.502
epoch: 24, acc: 0.967, loss: 4.429
epoch: 25, acc: 0.967, loss: 4.550
epoch: 26, acc: 0.983, loss: 4.366
epoch: 27, acc: 0.983, loss: 4.437
epoch: 28, acc: 0.983, los

KeyboardInterrupt: 

In [14]:
count = 0
for test_sample_num in range(10000):
    x_test, num_inputs = read_mnist_images('mnist/t10k-images.idx3-ubyte')
    x_test_flat = x_test.reshape(-1, 784) / 255.
    y_test = one_hot_encode(read_mnist_labels('mnist/t10k-labels.idx1-ubyte'), 10)
    network1.forward_propagation(x_test_flat[test_sample_num:test_sample_num+1, :], y_test)
    pred = np.argmax(network1.layers[-1].output)
    correct = np.argmax(y_test[test_sample_num])
    if pred != correct:
        count += 1
print(f'Accuracy on eval dataset{100 - count/10000 * 100}')


650
Network Prediction: 6
Correct Prediction: 6


# Fashion MNIST model - Dense layers

In [16]:
x_fashion, num_inputs_fashion = read_mnist_images('fashion_mnist/train-images-idx3-ubyte')
x_fashion = x_fashion.reshape(-1, 784) / 255.
y_fashion = one_hot_encode(read_mnist_labels('fashion_mnist/train-labels-idx1-ubyte'), 10)

In [19]:
network = Network([
    LayerDense(784, 10),
    ActivationReLU(),
    LayerDense(10, 10),
    ActivationReLU(),
    LayerDense(10, 10),
    ActivationSoftmaxLossCategoricalCrossentropy()
], OptimizerSGD(learning_rate=0.01))

epochs = 30
batch_size = 10

network.train(x_fashion, y_fashion, epochs, batch_size)

epoch: 0, acc: 1.000, loss: 7.281
epoch: 1, acc: 0.900, loss: 5.507
epoch: 2, acc: 0.900, loss: 4.324
epoch: 3, acc: 0.900, loss: 3.724
epoch: 4, acc: 0.900, loss: 3.437
epoch: 5, acc: 0.900, loss: 3.294
epoch: 6, acc: 0.900, loss: 3.199
epoch: 7, acc: 0.900, loss: 3.136
epoch: 8, acc: 0.900, loss: 3.095
epoch: 9, acc: 0.900, loss: 3.058
epoch: 10, acc: 0.900, loss: 3.032
epoch: 11, acc: 0.900, loss: 3.013
epoch: 12, acc: 0.900, loss: 2.995
epoch: 13, acc: 0.900, loss: 2.976
epoch: 14, acc: 0.900, loss: 2.961
epoch: 15, acc: 0.800, loss: 2.950
epoch: 16, acc: 0.800, loss: 2.936
epoch: 17, acc: 0.800, loss: 2.921
epoch: 18, acc: 0.800, loss: 2.895
epoch: 19, acc: 0.800, loss: 2.865
epoch: 20, acc: 0.900, loss: 2.839
epoch: 21, acc: 0.900, loss: 2.813
epoch: 22, acc: 0.900, loss: 2.788
epoch: 23, acc: 0.900, loss: 2.763
epoch: 24, acc: 0.900, loss: 2.743
epoch: 25, acc: 0.900, loss: 2.719
epoch: 26, acc: 0.900, loss: 2.697
epoch: 27, acc: 0.900, loss: 2.673
epoch: 28, acc: 0.900, loss: 2

In [None]:
test_sample_num = 301
x_test, num_inputs = read_mnist_images('fashion_mnist/t10k-images-idx3-ubyte')
x_test_flat = x_test.reshape(-1, 784) / 255.
y_test = one_hot_encode(read_mnist_labels('fashion_mnist/t10k-labels-idx1-ubyte'), 10)
network.forward_propagation(x_test_flat[test_sample_num:test_sample_num+1, :], y_test)
print(f'Network Prediction: {np.argmax(network.layers[-1].output)}')
print(f'Correct Prediction: {np.argmax(y_test[test_sample_num])}')
plt.imshow(x_test[test_sample_num], cmap='gray')


In [22]:
count = 0
for test_sample_num in range(10000):
    x_test, num_inputs = read_mnist_images('fashion_mnist/t10k-images-idx3-ubyte')
    x_test_flat = x_test.reshape(-1, 784) / 255.
    y_test = one_hot_encode(read_mnist_labels('fashion_mnist/t10k-labels-idx1-ubyte'), 10)
    network.forward_propagation(x_test_flat[test_sample_num:test_sample_num+1, :], y_test)
    pred = np.argmax(network.layers[-1].output)
    correct = np.argmax(y_test[test_sample_num])
    if pred != correct:
        count += 1
print(f'Accuracy on eval dataset{100 - count/10000 * 100}')


1854
Network Prediction: 5
Correct Prediction: 5


# Fashion MNIST model - CNN Architecture

In [25]:
model  = tf.keras.models.Sequential([
    tf.keras.layers.Dense(784, activation='relu'),
    tf.keras.layers.Dense(10, activation='relu'),
    tf.keras.layers.Dense(10, activation='softmax')
])

In [31]:
model.summary()

In [34]:
print(model.optimizer)

<keras.src.optimizers.sgd.SGD object at 0x15f7d7650>


In [27]:
model.compile(optimizer='SGD',
              loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

2024-04-02 08:32:40.933721: I metal_plugin/src/device/metal_device.cc:1154] Metal device set to: Apple M1
2024-04-02 08:32:40.933758: I metal_plugin/src/device/metal_device.cc:296] systemMemory: 16.00 GB
2024-04-02 08:32:40.933767: I metal_plugin/src/device/metal_device.cc:313] maxCacheSize: 5.33 GB
2024-04-02 08:32:40.934080: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2024-04-02 08:32:40.934109: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


In [18]:
x_fashion, num_inputs_fashion = read_mnist_images('fashion_mnist/train-images-idx3-ubyte')
x_fashion = x_fashion.reshape(-1, 784) / 255.
y_fashion = one_hot_encode(read_mnist_labels('fashion_mnist/train-labels-idx1-ubyte'), 10)

In [29]:
history = model.fit(x_fashion, y_fashion, epochs=20, batch_size=20)

Epoch 1/20


  output, from_logits = _get_logits(
2024-04-02 08:32:44.486004: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:117] Plugin optimizer for device_type GPU is enabled.


[1m3000/3000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m26s[0m 8ms/step - accuracy: 0.6642 - loss: 1.0228
Epoch 2/20
[1m2368/3000[0m [32m━━━━━━━━━━━━━━━[0m[37m━━━━━[0m [1m5s[0m 9ms/step - accuracy: 0.8375 - loss: 0.4724

KeyboardInterrupt: 

In [172]:
x_test, num_inputs = read_mnist_images('fashion_mnist/t10k-images-idx3-ubyte')
x_test_flat = x_test.reshape(-1, 784) / 255.
y_test = one_hot_encode(read_mnist_labels('fashion_mnist/t10k-labels-idx1-ubyte'), 10)