<a href="https://colab.research.google.com/github/PigStep/CIFAR-10-based-Content-categorizator/blob/main/notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# What is this notebook about
This notebook tracks experiments, code and data manipulation for model

# Data loading

In [2]:
import torchvision
import torch
from torchvision.transforms import transforms

In [13]:
batch_size = 256

transform = transforms.ToTensor()

# Getting CIFAR-10 dataset with batches
trainset = torchvision.datasets.CIFAR10(root="/content",train=True,transform=transform,download=True)
trainloader = torch.utils.data.DataLoader(dataset = trainset,batch_size=batch_size,shuffle=True)

testset = torchvision.datasets.CIFAR10(root="/content",train=False,transform=transform,download=True)
testloader = torch.utils.data.DataLoader(dataset = testset,batch_size=batch_size,shuffle=True)

In [14]:
image_tensor, label = trainset[0]

print(f"Shape of tensor: {image_tensor.shape}")
print(f"Label: {label}")

Shape of tensor: torch.Size([3, 32, 32])
Label: 6


## Data normalization

This notebook uses torch's `ToTensor() + Normalize()` functions. Firstly we need to get some statistics before Normalization

In [30]:
channels_sum, channels_squared_sum, num_pixels = 0,0,0

for images, _ in trainloader:
  # images: [batch_size, channels, height, width]

  channels_sum += torch.sum(images, dim=[0,2,3])
  channels_squared_sum += torch.sum(images**2, dim = [0,2,3])
  num_pixels += images.numel()

mean = channels_sum / num_pixels
# std = sqrt(E[x^2] - (E[x])^2)
std = torch.sqrt((channels_squared_sum / num_pixels) - mean**2)

print(f"Channels means: {mean}")
print(f"Channels std: {std}")

Channels means: tensor([0.1638, 0.1607, 0.1488])
Channels std: tensor([0.2720, 0.2673, 0.2591])


In [35]:
transform = transforms.Compose(
    transforms.ToTensor(),
    transforms.Normalize(mean=mean,std=std)
)

SyntaxError: positional argument follows keyword argument (ipython-input-2800599942.py, line 3)

# Baseline model: self-coded CNN

In [12]:
import numpy as np

class baseCNN:
  def __init__(self):
    self.X = None

    self.filters_1 = None # For different convolutional layers
    self.filters_2 = None
    self.bias_1 = None
    self.bias_2 = None

    self.bias_dense = None
    self.weights_dense = None

  def convolutinal_layer(image,filters,stride=1):
    """
    Convolves image with filters.
    params:
      `image` - image as numpy array
      `filter` - collection of filters as numpy array
      `stride` - stride for filter window
    """
    (image_H,image_W,image_channels) = image.shape
    (num_filters, filter_H, filter_W, filter_channels) = filters.shape

    out_H = (image_H - filter_H) // stride +1
    out_W = (image_W - filter_W) // stride +1
    feature_map = np.zeros((out_H,out_W,num_filters))

    for y in range(0,image_H - filter_H + 1, stride):
      for x in range(0,image_W - filter_W + 1, stride):
        # Get filter slice of image
        filter_slice = image[y:y+filter_H, x:x+filter_W, :]

        for i, filter in enumerate(filters):
          # Get matrix features for every channels
          filter_matrix_channels = np.sum(filter * filter_slice)
          # Save filter matrix features at coordinates position
          feature_map[y // stride, x // stride, i] = filter_matrix_channels

    return feature_map

  def pooling_layer(features_maps,pooling_size=2,stride=2):
      """
      Max pooling images

      params:
        `feature_maps` - maps of the features
        `pooling_size` - size of the window
        `stride` - stride of the window
      """
      (feature_map_H,feature_map_W,num_filters) = features_maps.shape

      out_H = (feature_map_H - pooling_size) // stride +1
      out_W = (feature_map_W - pooling_size) // stride +1

      pooling_map = np.zeros((out_H,out_W,num_filters))

      for y in range(0,(feature_map_H - pooling_size)//stride +1, stride):
        for x in range(0,(feature_map_W - pooling_size)//stride +1, stride):
          for i in range(num_filters):
            feature_window = features_maps[i][y:y+pooling_size,x:x+pooling_size]
            pooling_map[y // stride,x // stride,i] = np.max(feature_window)

      return pooling_map

  def fully_connected_layer(pooling_map,weights,bias):
      """
      Fully connected layer for weighted sum

      params:
        `pooling_map` - map of the features
        `weights` - weights of the layer
        `bias` - bias of the layer
      """
      vector = pooling_map.flatten()
      return np.dot(vector,weights) + bias

  def softmax(x):
    return np.exp(x) / np.sum(np.exp(x))

  def cross_entropy(y_pred,y_true):
    return - np.sum(y_true * np.log(y_pred))

  def dense_backprop(dL_dz, x, weights, bias, learning_rate = 0.1):
    """
    Backpropagation for dense layer
    params:
    `dL_dz` - derivative of the loss with respect to the output of the layer
    `x` - input
    `weights` - weights of the layer
    `bias` - bias of the layer
    `learning_rate` - learning rate for gradient descent
    """

    dL_dw = dL_dz * x
    dL_db = dL_dz
    dL_dx = dL_dz * weights

    weights = weights - learning_rate * dL_dw
    bias = bias - learning_rate * dL_db

    return weights, bias, dL_dx

  def pooling_backprob(dl_dp,x,pooling_size=2,stride=2):
        """
        Backpropagation for pooling layer
        params:
          `dl_dp` - derivative of the loss with respect to the output of the layer
          `x` - input to pooling layer
          `pooling_size` - size of the window
          `stride` - stride of the window
        """
        (in_H, in_W, in_C) = x.shape
        dL_dx = np.zeros(x.shape)

        for y in range(0, (in_H - pooling_size) // stride +1, stride):
          for x in range(0, (in_W - pooling_size) // stride +1, stride):
            for c in range(in_C):
              feature_window = x[y:y+pooling_size, x:x+pooling_size, c]

              max_index = np.argmax(feature_window)
              (max_y,max_x) = max_index // pooling_size, max_index % pooling_size

              # Fill previous masked points by derrivative
              dL_dx[y + max_y, x + max_x, c] = dl_dp[y // stride, x // stride, c]

        return dL_dx

  def conv_backprob(dL_dconv,X,weights,bias,stride=1,learning_rate=0.1):
    """
    Backpropagation for convolutional layer
    params:
      `dL_dconv` - derivative of the loss with respect to the output of the layer
      `X` - input to convolutional layer
      `weights` - weights of the layer (filters)
      `bias` - bias of the layer
      `stride` - stride of the window
      `learning_rate` - learning rate for gradient descent
    """

    (in_H, in_W, in_C) = X.shape
    (num_filters, filt_H, filt_W, in_C) = weights.shape
    dL_dx = np.zeros(X.shape)
    dL_dW = np.zeros(weights.shape)
    dL_db = np.zeros(num_filters)

    for y in range(dL_dconv.shape[0]):
      for x in range(dL_dconv.shape[1]):
        for f in range(num_filters):
            window = X[y:y+filt_H, x:x+filt_W, :]

            # Get gradient from window and updating weights
            dL_dW[f] += dL_dconv[y,x,f] * window
            dL_db[f] += dL_dconv[y,x,f]

            dL_dx[y:y+filt_H, x:x+filt_W, :] += dL_dconv[y,x,f] * np.flip(weights[f], axis=(0,1))

    weights -= learning_rate * dL_dW
    bias -= learning_rate * dL_db

    return weights, bias, dL_dx

  def forward_pass(self,X):
    X = self.convolutinal_layer(X,self.filters_1)
    self.X_1 = X # Save for backprob

    X = self.pooling_layer(X)
    self.X_2 = X

    X = self.convolutinal_layer(X,self.filters_2)
    self.X_3 = X

    X = self.pooling_layer(X)
    self.X_4 = X

    X = self.fully_connected_layer(X,self.weights_dense,self.bias_dense)
    self.X_5 = X

    X = self.softmax(X)
    self.X_6 = X

    return X

  def backprop(self, y_true):
        dL_dz = self.X_6 - y_true
        self.weights_dense, self.bias_dense, dL_dx = self.dense_backprop(dL_dz, self.X_5,
                                                        self.weights_dense, self.bias_dense, learning_rate = 0.1)

        dL_dx = self.pooling_backprob(dL_dx,self.X_4)
        self.filters_2, self.bias_2, dL_dx = self.conv_backprob(dL_dx,self.X_3,
                                              self.filters_2,self.bias_2,stride=1,learning_rate=0.1)

        dL_dx = self.pooling_backprob(dL_dx,self.X_2)
        self.filters_1, self.bias_1, dL_dx = self.conv_backprob(dL_dx,self.X_1,
                                              self.filters_1,self.bias_1,stride=1,learning_rate=0.1)

  def train(self,X,y, batch_size, epochs=1000):
    epoch = 0
    losses = []

    while epoch <= epochs:
      for batch in range(0,X.shape[0],batch_size):
          X_batch = X[batch:batch+batch_size]
          y_batch = y[batch:batch+batch_size]

          y_pred = self.forward_pass(self,X_batch)

          loss = self.cross_entropy(y_pred, y_batch)
          losses.append(loss)

          if(loss < 0.1):
              return losses

          self.backprop(self, y_batch)
          epoch+=1

    print("Train was comlete")
    return losses