In [196]:
import pickle
import tarfile
from random import choices, choice
from math import inf
from itertools import product, cycle
import numpy as np
import matplotlib.pyplot as plt
import requests
from tqdm import tqdm

In [197]:
np.random.seed(321)

# Downloading the dataset

In [198]:
def unpickle(file):
    """
    source: https://www.cs.toronto.edu/~kriz/cifar.html
    """
    with open(file, 'rb') as fo:
        data = pickle.load(fo, encoding='bytes')
    return data

In [199]:
dataset = requests.get('https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz')

with open('cifar-10-python.tar.gz', 'wb') as dataset_file:
    dataset_file.write(dataset.content)

In [200]:
with tarfile.open('cifar-10-python.tar.gz') as tar:
    tar.extractall()

# "Training" the model or preprocessing the dataset

In [201]:
def process_image(data: np.ndarray) -> np.ndarray:
    """
    
    takes image as a row of 3072 elements, 
    where the first 1024 entries contain the red channel values,
    the next 1024 the green, and the final 1024 the blue.
    
    returns image as an array of shape (32, 32, 3)
    
    """
    red_channel, green_channel, blue_channel = np.split(data, 3)
    pixels = np.stack((red_channel, green_channel, blue_channel), axis=-1)
    return np.reshape(pixels, (32, 32, 3))

In [202]:
def process_batch(batch: np.ndarray) -> tuple[list[np.ndarray], list[int]]:
    """
    
    processes a batch and returns a tuple of a list of processed images with shape (32, 32, 3) and list of labels
    
    """
    batch_images = list(map(process_image, batch[b'data']))
    batch_labels = batch[b'labels']
    
    return batch_images, batch_labels

In [251]:
batch_filenames = ['cifar-10-batches-py/data_batch_1',
                   'cifar-10-batches-py/data_batch_2',
                   'cifar-10-batches-py/data_batch_3', 
                   'cifar-10-batches-py/data_batch_4',
                   'cifar-10-batches-py/data_batch_5']

train_images, raw_train_labels = [], []
for filename in batch_filenames:
    current_batch = unpickle(filename)
    images, labels = process_batch(current_batch)
    train_images.extend(images)
    raw_train_labels.extend(labels)

In [252]:
def one_hot_encoder(labels: list[int]) -> list[np.ndarray]:
    encoded: list[np.ndarray] = []
    for label in labels:
        encoded.append(np.zeros(10))
        encoded[-1][label] = 1.0
    return encoded

In [253]:
train_labels = one_hot_encoder(raw_train_labels)

In [254]:
test_batch = unpickle('cifar-10-batches-py/test_batch')
test_images, raw_test_labels = process_batch(test_batch)

In [255]:
test_labels = one_hot_encoder(raw_test_labels)

In [256]:
meta_data = unpickle('cifar-10-batches-py/batches.meta')
label_names = list(map(bytes.decode, meta_data[b'label_names']))

In [257]:
from abc import ABC, abstractmethod


class Layer(ABC):
    @abstractmethod
    def __call__(self, x: np.ndarray, train=False) -> np.ndarray:
        pass
    
    @abstractmethod
    def backward(self, error: np.ndarray, learning_rate: float) -> np.ndarray:
        pass


class Linear(Layer):
    def __init__(self, input_size: int, output_size: int):
        self.weights = np.random.randn(input_size, output_size)
        self.bias = np.random.randn(1, output_size)
        
        self.input: np.ndarray = np.array([])
        self.output: np.ndarray = np.array([])
    
    def __call__(self, x: np.ndarray, train=False) -> np.ndarray:
        if train:
            self.input = x
            self.output = x @ self.weights + self.bias
            return self.output
        else:
            return x @ self.weights + self.bias
    
    def backward(self, error: np.ndarray, learning_rate: float) -> np.ndarray:
        input_error = error @ self.weights.T
        weights_error = self.input.T @ error
        
        self.weights -= learning_rate * weights_error
        self.bias -= learning_rate * np.sum(error, axis=0)
        return input_error

In [258]:
class Activation(ABC):
    @abstractmethod
    def __call__(self, x: np.ndarray, train=False) -> np.ndarray:
        pass
    
    @abstractmethod
    def backward(self, error: np.ndarray, learning_rate: float) -> np.ndarray:
        pass


class Sigmoid(Activation):
    def __call__(self, x: np.ndarray, train=False) -> np.ndarray:
        if train:
            self.output = np.piecewise(
                x, 
                [x < 0, x >= 0], 
                [lambda a: np.exp(a) / (1 + np.exp(a)), lambda a: 1 / (1 + np.exp(-a))]
            )
            return self.output
        else:
            return np.piecewise(
                x, 
                [x < 0, x >= 0], 
                [lambda a: np.exp(a) / (1 + np.exp(a)), lambda a: 1 / (1 + np.exp(-a))]
            )
    
    def backward(self, error: np.ndarray, learning_rate: float) -> np.ndarray:
        return error * self.output * (1 - self.output)


class Softmax(Activation):
    def __call__(self, x: np.ndarray, train=False, normalized=True) -> np.ndarray:
        # print(f'{x=}')
        if normalized:
            exponents = np.exp(x - np.max(x, axis=1, keepdims=True))
        else:
            exponents = np.exp(x)
        
        probabilities = exponents / np.sum(exponents, axis=1, keepdims=True)
        # print(f'{probabilities=}')
        
        return probabilities
    
    def backward(self, error: np.ndarray, learning_rate: float) -> np.ndarray:
        return error  # only if it comes before cross entropy


class ReLU(Activation):
    def __call__(self, x: np.ndarray, train=False):
        self.output = np.maximum(0, x)
        return self.output
    
    def backward(self, error: np.ndarray, learning_rate: float) -> np.ndarray:
        return error * (self.output > 0)

In [259]:
class Loss(ABC):
    @abstractmethod
    def __call__(self, predicted: np.ndarray, actual: np.ndarray):
        pass
    
    @abstractmethod
    def get_gradient(self):
        pass


class CrossEntropyLoss(Loss):
    def __init__(self):
        self.gradient = None
    
    def __call__(self, predicted: np.ndarray, actual: np.ndarray):
        log_predicted = np.log(np.clip(predicted, 1e-7, 1 - 1e-7))
        self.gradient = (predicted - actual) / len(actual)
        return -np.sum(actual * log_predicted, axis=1)
    
    def get_gradient(self):
        return self.gradient

In [260]:
class Model:
    def __init__(self, 
                 layers: list[Layer | Activation],
                 loss: Loss):
        self.layers = layers
        self.loss = loss
    
    def __call__(self, x: np.ndarray, train=False) -> np.ndarray:
        result = x
        for layer in self.layers:
            result = layer(result, train=train)
        return result
    
    def backpropagation(self, learning_rate=0.001):
        error = self.loss.get_gradient()
        for layer in self.layers[::-1]:
            error = layer.backward(error, learning_rate)
    
    def fit(self, x, y, epochs, learning_rate=0.001):
        for i in range(epochs):
            loss = 0
            for batch_x, batch_y in zip(x, y):
                output = self(batch_x, train=True)
                loss += np.mean(self.loss(output, batch_y))
                self.backpropagation(learning_rate)
            print(f'epoch: {i} loss: {loss / len(x)}')
            

In [261]:
train_images = np.array(list(map(np.ndarray.flatten, train_images))) / 255.0
train_labels = np.array(train_labels)

In [262]:
train_images_batched = np.array_split(train_images, train_images.shape[0] // 64 + 1)
train_labels_batched = np.array_split(train_labels, train_images.shape[0] // 64 + 1)

In [267]:
model = Model([
    Linear(3072, 128),
    ReLU(),
    Linear(128, 128),
    ReLU(),
    Linear(128, 10),
    Softmax()],
    loss=CrossEntropyLoss()
)

In [271]:
model.fit(train_images_batched, train_labels_batched, 25, learning_rate=0.001)

epoch: 0 loss: 2.1544491036726323
epoch: 1 loss: 2.1510033462018185
epoch: 2 loss: 2.147736602008187
epoch: 3 loss: 2.144678413754983
epoch: 4 loss: 2.1417571416277084
epoch: 5 loss: 2.1389054254002704
epoch: 6 loss: 2.1360601023126167
epoch: 7 loss: 2.1334167874725827
epoch: 8 loss: 2.1308846204156953
epoch: 9 loss: 2.1283952281940914
epoch: 10 loss: 2.125969176847191
epoch: 11 loss: 2.123508346778144
epoch: 12 loss: 2.1211029618030426
epoch: 13 loss: 2.1187537281843336
epoch: 14 loss: 2.1164548457271746
epoch: 15 loss: 2.1141815139913294
epoch: 16 loss: 2.111978672629273
epoch: 17 loss: 2.1097852353855813
epoch: 18 loss: 2.1077111636249977
epoch: 19 loss: 2.1056874639701686
epoch: 20 loss: 2.1037209749013988
epoch: 21 loss: 2.101715520330901
epoch: 22 loss: 2.0996572458895866
epoch: 23 loss: 2.0976644901365966
epoch: 24 loss: 2.0958503939032007


In [272]:
test_images = list(map(np.ndarray.flatten, test_images))

In [273]:
correct = 0
for image, label in zip(test_images, test_labels):
    prediction = model(np.array([image]) / 255.)
    if prediction.argmax() == label.argmax():
        correct += 1
print(f'correct_predictions: {correct}/{len(test_images)}\naccuracy: {correct / len(test_images)}')

correct_predictions: 2436/10000
accuracy: 0.2436


In [249]:
for image in test_images[:10]:
    print(model(np.array([image]) / 255.).argmax())

3
3
3
3
3
3
3
3
3
3


In [250]:
for label in test_labels[:10]:
    print(label.argmax())

3
8
8
0
6
6
1
6
3
1
