In [None]:
import numpy as np
from typing import Tuple

import torch
from tqdm import tqdm

To begin with, we will build a small pipeline for reading images. Because the network we are going to implement has a fairly simple architecture, the data is greatly simplified as well: every image is converted to grayscale and then thresholded into a binary mask.

The dataset folder must contain one subfolder per class, and each subfolder must hold the images of that class.

CIFAR10 will be used as a training dataset. You can download it [here](http://www.cs.toronto.edu/~kriz/cifar.html) 

In [None]:
# TODO: assign the root folder of the unpacked dataset (one subfolder per class).
# This value is used as the default `image_dir_path` of `ImageDataset`.
PATH_TO_DATASET = # The path to the unpacked dataset is here

In [None]:
from typing import Tuple, List, Union

import albumentations as A
import numpy as np
import torch
import os
import cv2
import numba


def read_images(image_dir_path: str, labels2names: dict = None) -> Tuple[List[np.ndarray], List[int], dict]:
    """Load every image below ``image_dir_path`` as a cropped binary mask.

    The directory is expected to contain one subdirectory per class. Each image
    is converted to grayscale, thresholded (pixels ``<= 98`` become 1, all
    others 0) and passed through ``crop_borders``.

    Args:
        image_dir_path: Root directory with one subdirectory per class.
        labels2names: Optional existing ``{label: class_name}`` mapping. When
            given, labels are looked up through it and it is returned
            unchanged; otherwise a new mapping is built from the (sorted)
            subdirectory names.

    Returns:
        ``(images, labels, labels2names)`` where ``images[k]`` belongs to
        class ``labels[k]``.

    Raises:
        KeyError: If ``labels2names`` is given and a class directory that
            contains images is missing from it.
    """
    if labels2names is None:
        labels2names = {}
        names2labels = None
    else:
        names2labels = {name: label for label, name in labels2names.items()}

    # os.listdir returns entries in arbitrary order; sorting makes the
    # label <-> class assignment reproducible between runs and machines.
    # Plain files in the root (e.g. ".DS_Store") are not classes and are skipped.
    class_dirs = sorted(
        entry for entry in os.listdir(image_dir_path)
        if os.path.isdir(os.path.join(image_dir_path, entry))
    )

    images = []
    labels = []
    for index, class_name in enumerate(class_dirs):
        print(class_name)
        if names2labels is None:
            labels2names[index] = class_name
        class_path = os.path.join(image_dir_path, class_name)
        for file_name in sorted(os.listdir(class_path)):
            raw = cv2.imread(os.path.join(class_path, file_name))
            if raw is None:
                # cv2.imread signals an unreadable / non-image file with None.
                continue
            gray = cv2.cvtColor(raw, cv2.COLOR_BGR2GRAY)
            # Dark pixels (<= 98) become foreground 1, everything else 0.
            binary = (gray <= 98).astype(gray.dtype)
            images.append(crop_borders(binary))
            if names2labels is not None:
                labels.append(names2labels[class_name])
            else:
                labels.append(index)

    return images, labels, labels2names


def crop_borders(image: np.ndarray) -> np.ndarray:
    """Crop ``image`` to the bounding box of its non-zero pixels plus a margin.

    The crop starts 5 pixels before the first non-zero row/column and ends at
    ``last non-zero index + 5`` (exclusive), exactly like the original slice
    bounds. The start is clamped to 0: a negative start would be interpreted
    by NumPy as "count from the end" and yield a wrong or empty crop whenever
    the content lies within 5 pixels of the top/left border.

    Args:
        image: Array with at least two dimensions; the first two axes are
            cropped.

    Returns:
        A view of the cropped region, or ``image`` itself when it has fewer
        than two dimensions or contains no non-zero pixel.
    """
    margin = 5

    if np.ndim(image) < 2:
        return image

    nonzero = np.nonzero(image)
    rows, cols = nonzero[0], nonzero[1]
    if rows.size == 0:
        # Nothing to locate: keep the image as it is.
        return image

    top = max(int(rows.min()) - margin, 0)
    left = max(int(cols.min()) - margin, 0)
    # Upper bounds may exceed the image size; slicing clips them safely.
    bottom = int(rows.max()) + margin
    right = int(cols.max()) + margin

    return image[top:bottom, left:right]


@numba.jit(nopython=True)
def compute_new_sample(image:np.ndarray) -> List[Union[int, np.ndarray]]:
    # Builds the feature vector of an image: the sum of every second row
    # followed by the sum of every second column.
    # The column loop is bounded by image.shape[1] (the number of columns);
    # bounding it by shape[0] is only correct for square images.
    cell_width, cell_height = 2, 2
    result = [np.sum(image[i * cell_height, :]) for i in range(image.shape[0] // cell_height)]
    result.extend([np.sum(image[:, j * cell_width]) for j in range(image.shape[1] // cell_width)])
    return result



class ImageDataset(torch.utils.data.Dataset):
    """Dataset of binarised images exposed as row/column-sum feature vectors.

    All images are loaded once through ``read_images``. The dataset reports
    ``dataset_len`` items regardless of how many images were read; indices
    wrap around the loaded images.
    """

    def __init__(self, image_dir_path: str = PATH_TO_DATASET, dataset_len: int = 60000, labels2names: dict = None):
        self.images, self.labels, self.labels2names = read_images(image_dir_path, labels2names)
        self.dataset_len = dataset_len
        # Every image is resized to 32x32 before features are computed.
        # (Downscale / GlassBlur / GaussianBlur augmentations are disabled.)
        self.transform = A.Compose([
            A.Resize(32, 32)
        ])

    def __len__(self):
        """Return the configured (virtual) dataset length."""
        return self.dataset_len

    def __getitem__(self, idx: int):
        """Return ``(features, label)`` for ``idx``, wrapping past the last image."""
        wrapped = idx % len(self.labels)
        resized = self.transform(image=self.images[wrapped])["image"]
        features = compute_new_sample(resized)
        return features, self.labels[wrapped]

    @property
    def labels2names_(self):
        """The ``{label: class_name}`` mapping returned by ``read_images``."""
        return self.labels2names


# Linear layer


In [None]:
def linear(X: np.ndarray, W: np.ndarray, b: np.ndarray):
    """Affine layer forward pass.

    Returns:
        ``(X . W + b, cache)`` where ``cache`` is the ``(X, W, b)`` tuple
        kept for the backward pass.
    """
    cache = (X, W, b)
    output = np.dot(X, W) + b
    return output, cache

def linear_backward(d: np.ndarray, prev: tuple):
    """Backward pass of ``linear`` (exercise stub: the gradients are still placeholders).

    Args:
        d: Upstream gradient; presumably shaped like the output of ``linear`` - TODO confirm.
        prev: The ``(X, W, b)`` cache tuple returned by ``linear``.

    Returns:
        The tuple ``(dX, dW, db)``. Until the exercise is solved every entry
        is the ``...`` placeholder.
    """
    X, W, b = prev
    # TODO(student): compute the three gradients of the backward pass below.
    # Hint: each gradient is expected to have the shape of the value it
    # belongs to (dX like X, dW like W, db like b).
    "write a backwardation here. don't forget about dimensions"
    dX = ...
    dW = ...
    db = ...
    return dX, dW, db



# Activation funcs

## ReLU

In [None]:
def relu(X: np.ndarray):
    """ReLU forward pass.

    Returns:
        ``(max(0, X), X)``: the activation and the untouched input, which is
        kept as the cache for the backward pass.
    """
    activated = np.maximum(0, X)
    return activated, X

def relu_backward(d: np.ndarray, prev: np.ndarray):
    """Backward pass of ``relu`` (exercise stub: currently returns the ``...`` placeholder).

    Args:
        d: Upstream gradient.
        prev: Cache from the forward pass; ``relu`` returns its input ``X`` as the cache.
    """
    # Work on copies so neither the cache nor the upstream gradient is modified.
    X = prev.copy()
    dx = d.copy()
    # TODO(student): implement the backward pass here and return the gradient w.r.t. X.
    "write a backwardation here"
    return ...

In [None]:
def sigmoid(X: np.ndarray):
    """Element-wise logistic function ``1 / (1 + exp(-X))``."""
    exp_neg = np.exp(-X)
    return 1 / (1 + exp_neg)


def sigmoid_backward(d: np.ndarray, prev: np.ndarray):
    """Backward pass of ``sigmoid`` (exercise stub: currently returns the ``...`` placeholder).

    Args:
        d: Upstream gradient.
        prev: Value the sigmoid was applied to; ``mse_loss`` passes its raw input ``X`` here.
    """
    # Copy so the caller's array is not modified.
    X = prev.copy()
    # TODO(student): implement the backward pass here and return the gradient w.r.t. X.
    "write a backwardation here"
    return ...

# Loss funcs

In [None]:
def mse_loss(X: np.ndarray, y: np.ndarray):
    """Mean squared error between ``sigmoid(X)`` and the one-hot encoding of ``y``.

    Args:
        X: Raw scores of shape ``(num_samples, num_classes)``.
        y: Integer class labels, one per sample.

    Returns:
        ``(loss, dX)`` where ``loss`` is the mean over all elements and ``dX``
        is whatever ``sigmoid_backward`` returns for the gradient of that
        loss with respect to the sigmoid output.
    """
    preds = sigmoid(X)
    num_samples, num_classes = X.shape[0], X.shape[1]

    # The one-hot width must follow the score matrix, not y.max() + 1:
    # a batch that happens to miss the highest class would otherwise produce
    # a narrower target matrix that cannot be subtracted from preds.
    y_encoded = np.zeros((num_samples, num_classes))
    y_encoded[np.arange(num_samples), y] = 1

    diff = preds - y_encoded
    loss = np.mean(diff ** 2)
    # np.mean averages over every element, so the matching gradient carries
    # the same 1 / (num_samples * num_classes) factor.
    dX = sigmoid_backward(2 * diff / diff.size, X)
    return loss, dX


def softmax(X: np.ndarray):
    """Numerically stable softmax.

    For inputs with more than one dimension the normalisation runs along
    axis 1 (one distribution per row); a 1-D input is treated as a single
    distribution.

    The maximum is subtracted per row rather than globally: with a global
    maximum, a row lying far below it underflows to all zeros and the
    division produces NaN.
    """
    if np.ndim(X) > 1:
        shifted = X - np.max(X, axis=1, keepdims=True)
        e_x = np.exp(shifted)
        return e_x / np.sum(e_x, axis=1, keepdims=True)

    e_x = np.exp(X - np.max(X))
    return e_x / np.sum(e_x)


def softmax_loss(X: np.ndarray, y: np.ndarray):
    """Softmax cross-entropy loss and its gradient with respect to the scores.

    Args:
        X: Score matrix; rows are samples, columns are classes.
        y: Integer class label for every row of ``X``.

    Returns:
        ``(loss, dX)``: the loss averaged over the rows and a gradient array
        shaped like ``X``.
    """
    batch_size = X.shape[0]
    row_ids = np.arange(batch_size)

    # Shift by the per-row maximum before exponentiating.
    shifted = X - X.max(axis=1, keepdims=True)
    partition = np.exp(shifted).sum(axis=1, keepdims=True)
    log_probs = shifted - np.log(partition)

    loss = -log_probs[row_ids, y].sum() / batch_size

    # Gradient: (softmax probabilities - one-hot targets) / batch size.
    dX = np.exp(log_probs)
    dX[row_ids, y] -= 1
    dX /= batch_size
    return loss, dX

# Optimizer

In [None]:
class Optimizer:
    """Parameter update rules: plain SGD, SGD with momentum, and Adam.

    Hyper-parameters and per-parameter state (velocity, moment estimates,
    step counters) live together in ``self.params``. State entries are keyed
    by the ``key`` passed to each update call. When a ``params`` dict is
    supplied it is used directly (not copied), so defaults and state are
    written into the caller's dict.
    """

    def __init__(self, optim_type: str = 'sgd', params: dict = None):
        """Select the update rule and fill in missing hyper-parameters.

        Args:
            optim_type: One of ``'sgd'``, ``'momentum'`` or ``'adam'``.
            params: Optional hyper-parameters (``lr``, ``momentum``,
                ``beta1``, ``beta2``); missing entries get defaults.

        Raises:
            ValueError: If ``optim_type`` is not a supported rule. Previously
                the instance was created without an ``optimize`` attribute
                and only failed on the first update.
        """
        if params is None:
            self.params = {}
        else:
            self.params = params
        self.params.setdefault('lr', 1e-3)
        self.params.setdefault('momentum', 0.9)
        self.params.setdefault("beta1", 0.9)
        self.params.setdefault("beta2", 0.999)

        update_rules = {
            'sgd': self.sgd,
            'momentum': self.momentum,
            'adam': self.adam,
        }
        if optim_type not in update_rules:
            raise ValueError(
                "Unknown optim_type %r; expected one of %s"
                % (optim_type, sorted(update_rules))
            )
        self.optimize = update_rules[optim_type]

    def __call__(self, W: np.ndarray, dW: np.ndarray, key: str):
        """Apply the selected update rule to ``W`` in place and return it."""
        return self.optimize(W, dW, key)

    def sgd(self, W: np.ndarray, dW: np.ndarray, key: str) -> np.ndarray:
        """Vanilla gradient descent step; ``key`` is unused."""
        W -= self.params['lr'] * dW
        return W

    def momentum(self, W: np.ndarray, dW: np.ndarray, key: str) -> np.ndarray:
        """Momentum step; the velocity is stored under ``"velocity <key>"``."""
        state_key = "velocity %s" % key
        v = self.params.get(state_key, np.zeros_like(W))
        v = self.params["momentum"] * v + self.params["lr"] * dW
        W -= v
        self.params[state_key] = v
        return W

    def adam(self, W: np.ndarray, dW: np.ndarray, key: str) -> np.ndarray:
        """Adam step with bias correction.

        State is stored under ``"m <key>"``, ``"v <key>"`` and ``"t <key>"``.
        """
        m_key, v_key, t_key = "m %s" % key, "v %s" % key, "t %s" % key
        beta1, beta2 = self.params["beta1"], self.params["beta2"]

        m = self.params.get(m_key, np.zeros_like(W))
        v = self.params.get(v_key, np.zeros_like(W))
        t = self.params.get(t_key, 0) + 1

        m = beta1 * m + (1 - beta1) * dW
        v = beta2 * v + (1 - beta2) * dW ** 2
        self.params[t_key] = t
        self.params[m_key] = m
        self.params[v_key] = v

        # Bias-corrected first and second moment estimates.
        mt = m / (1 - np.power(beta1, t))
        vt = v / (1 - np.power(beta2, t))
        W -= self.params["lr"] * mt / (np.sqrt(vt) + 1e-8)
        return W


# Model

In [None]:
class NNet:
    """Fully connected classifier skeleton (exercise template).

    The first layer, the data loaders, accuracy evaluation and the test loop
    are provided. The remaining layers, ``forward``, ``backward``,
    ``_training_step`` and the bodies of the training/validation loops are
    left as ``...`` placeholders / TODOs to be implemented.
    """

    def __init__(self, hidden_dims: list = None, num_cls: int = 10,
                 input_dim: int = 32):
        """Create the parameters and the train/test/validation loaders.

        Args:
            hidden_dims: Sizes of the hidden layers; defaults to ``[500]``.
                ``None`` is used as the default instead of a list literal so
                that no list object is shared between instances.
            num_cls: Number of output classes.
            input_dim: Length of one input feature vector.
        """
        if hidden_dims is None:
            hidden_dims = [500]

        ## architecture
        # We recommend hidden layers of roughly 300-500 units for this task
        # (presumably what the original hint "300-500 layers" meant).
        # To declare several layers, just loop over hidden_dims.
        self.params = {}
        std = 1e-4
        self.params['W1'] = std * np.random.randn(input_dim, hidden_dims[0])
        self.params['b1'] = np.zeros(hidden_dims[0])

        """Your layers are here """
        ...
        # Don't forget that the last layer should have an output dimension
        # equal to the number of classes. In this case - 10
        self.num_layers = len(hidden_dims) + 1
        ## params
        # Don't forget to declare loss functions, optimizer, etc
        self.output_activation = ...
        self.criterion = ...
        self.optimizer = ...
        self.loss_history = []

        ## datasets
        # We will still use some help from pytorch. Of course,
        # it would be possible to divide the data into batches by hand, but now it makes no sense
        # NOTE(review): all three datasets below are built from the default
        # image directory, i.e. train, test and validation read the same images.
        traindataset = ImageDataset(dataset_len=30000)
        self.labels2names = traindataset.labels2names_
        self.trainloader = torch.utils.data.DataLoader(
            traindataset, batch_size=50, shuffle=True, num_workers=2
        )

        self.testloader = torch.utils.data.DataLoader(
            ImageDataset(dataset_len=30000, labels2names=self.labels2names),
            batch_size=50, shuffle=True, num_workers=2
        )
        self.valloader = torch.utils.data.DataLoader(
            ImageDataset(dataset_len=30000, labels2names=self.labels2names),
            batch_size=50, shuffle=True, num_workers=2
        )

    def forward(self, X: np.ndarray) -> Tuple[list, dict]:
        """Run ``X`` through all layers (exercise stub).

        ``_predict`` unpacks the result as ``(Z, _)`` and uses ``Z[-1]`` as
        the input of the output activation.
        """
        # Okay, now we need to make a function
        # that will run the data X through all those layers that we created

        return ...

    def backward(self, X: np.ndarray, y: np.ndarray, cache: dict) -> Tuple[float, dict]:
        """Compute the loss and the parameter gradients (exercise stub)."""
        # Now we need to implement the backward pass.
        # Don't forget that you have already written functions for each layer type
        grads = {}
        loss, dOut = self.criterion(X, y)

        return loss, grads

    def _training_step(self, X_batch: np.ndarray, y_batch: np.ndarray, optimizer: Optimizer):
        """Perform one optimisation step on a batch (exercise stub)."""
        # Now the training step.
        # Everything is simple here, do forward,
        # then backward and finally apply the optimizer
        # The method previously had no body at all, which made the whole
        # class a syntax error; fail loudly until it is implemented.
        raise NotImplementedError("_training_step is not implemented yet")

    def check_accuracy(self, X: np.ndarray, y: np.ndarray) -> float:
        """Return the fraction of rows of ``X`` whose arg-max prediction equals ``y``."""
        preds = self._predict(X)
        preds = np.argmax(preds, axis=1)
        return np.mean(preds == y)

    def train(self, num_epochs: int = 10):
        """Training loop skeleton: train, validate, and keep the best parameters."""
        # Okay, now it's time to make a training loop
        best_params = {}
        best_val_acc = -1
        for num_epoch in range(num_epochs):
            running_accuracy = 0
            i = 0
            for data in tqdm(self.trainloader):
                # You need to go through the data for several epochs
                # and perform a training step for each batch
                # Also, do not forget to accumulate accuracy for the subsequent evaluation of the model
                ...
                i += 1
            training_loss = np.mean(self.loss_history)
            train_acc = running_accuracy / i

            ## validation
            val_accuracy = 0
            val_i = 0
            for data in tqdm(self.valloader):
                # During validation, you only need to look at the accuracy of the model
                ...
                val_i += 1
            val_acc = val_accuracy / val_i
            self.loss_history = []

            print("%d epoch:\n training loss: %.4f\n " \
                  "training accuracy: %.4f\n validation accuracy: %.4f" % (num_epoch + 1,
                                                                           training_loss,train_acc, val_acc))
            # If the result achieved in this epoch is the best so far, it would be nice
            # to keep the network parameters that provided it
            if ...:
                ...

        # Only restore a snapshot if one was actually stored; otherwise the
        # trained parameters would be replaced by an empty dict.
        if best_params:
            self.params = best_params

    def _predict(self, X: np.ndarray):
        """Return the output activation applied to the last forward output."""
        Z, _ = self.forward(X)
        return self.output_activation(Z[-1])

    def predict(self, X: np.ndarray) -> str:
        """Return a text report with the predicted class name and all class scores."""
        prediction = self._predict(X)
        label = np.argmax(prediction)
        predictions = []
        for i, p in enumerate(prediction):
            predictions.append("%s : %.5f" % (self.labels2names[i], p))

        return "Prediction: %s \nProbabilities:\n%s" % (self.labels2names[label],
                                                        "\n".join(predictions))

    def test(self):
        """Print and return the accuracy averaged over the batches of the test loader."""
        running_accuracy = 0
        i = 0
        for data in tqdm(self.testloader):
            x, y = data
            y = y.numpy()
            # x arrives as a list of per-feature tensors; stack and transpose
            # it so that each row is one sample.
            x = torch.stack(x)
            x = torch.t(x).numpy()
            i += 1
            running_accuracy += self.check_accuracy(x, y)
        print( "Test accuracy: %.4f" % (running_accuracy / i))
        return running_accuracy / i


# Test

Finally, we create a model, train it and test it. Of course, this is a very simple model and you are unlikely to get an accuracy of more than 20-25%, but that is still about twice as good as random guessing among the 10 classes (10%)!

In [None]:
# Build the network; NNet.__init__ also creates the train/test/validation data loaders.
model = NNet()

In [None]:
# Train with the default number of epochs (num_epochs=10).
model.train()

In [None]:
# Acceptance check: the averaged test accuracy must exceed 0.21.
assert model.test() > 0.21