<a href="https://colab.research.google.com/github/VygintasMar/Neural-networks-from-scratch/blob/main/CNNClassifierWithMaxPool.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
from scipy import signal

In [None]:
class Layer:
    """Common interface for every stage of the network.

    The base implementation is a placeholder: both ``forward`` and
    ``backward`` do nothing and return ``None``. Concrete layers override
    them.
    """

    def __init__(self):
        # Slots that concrete layers may populate while running forward.
        self.input, self.output = None, None

    def forward(self, input):
        """Compute the layer output for ``input`` (placeholder, returns None)."""
        # TODO: return output
        return None

    def backward(self, output_gradient, learning_rate):
        """Propagate ``output_gradient`` backwards (placeholder, returns None)."""
        # TODO: update parameters and return input gradient
        return None

layer architecture below

In [None]:
class Convolutional(Layer):
    """Trainable 2-D cross-correlation layer ("valid" mode, stride 1).

    Args:
        input_shape: ``(input_depth, input_height, input_width)``.
        kernel_size: side length of the square kernels.
        depth: number of output feature maps.
    """

    def __init__(self, input_shape, kernel_size, depth):
        in_channels, in_rows, in_cols = input_shape
        out_rows = in_rows - kernel_size + 1
        out_cols = in_cols - kernel_size + 1

        self.depth = depth
        self.input_shape = input_shape
        self.input_depth = in_channels
        self.output_shape = (depth, out_rows, out_cols)
        self.kernels_shape = (depth, in_channels, kernel_size, kernel_size)
        # Kernels are drawn before biases; keep this order so seeded runs
        # reproduce the same parameters.
        self.kernels = np.random.randn(*self.kernels_shape)
        self.biases = np.random.randn(*self.output_shape)

    def forward(self, input):
        """Return biases plus the sum over input channels of valid correlations."""
        self.input = input

        # Start from a copy of the biases so they are not modified in place.
        self.output = np.copy(self.biases)
        for out_ch, kernel_bank in enumerate(self.kernels):
            for in_ch, kernel in enumerate(kernel_bank):
                self.output[out_ch] += signal.correlate2d(self.input[in_ch], kernel, "valid")

        return self.output

    def backward(self, output_gradient, learning_rate):
        """Apply one gradient-descent step and return the gradient w.r.t. the input."""
        grad_kernels = np.zeros(self.kernels_shape)
        grad_input = np.zeros(self.input_shape)

        for out_ch in range(self.depth):
            upstream = output_gradient[out_ch]
            for in_ch in range(self.input_depth):
                grad_kernels[out_ch, in_ch] = signal.correlate2d(self.input[in_ch], upstream, "valid")
                # Computed with the kernels as they were during the forward
                # pass; the parameter update happens only after the loops.
                grad_input[in_ch] += signal.convolve2d(upstream, self.kernels[out_ch, in_ch], "full")

        self.kernels -= learning_rate * grad_kernels
        self.biases -= learning_rate * output_gradient
        return grad_input


In [None]:
class Dense(Layer):
    """Fully connected layer computing ``weights . input + bias``.

    Args:
        input_size: number of input features.
        output_size: number of output units.
    """

    def __init__(self, input_size, output_size):
        # Weights are drawn before the bias; keep this order so seeded runs
        # reproduce the same parameters.
        self.weights = np.random.randn(output_size, input_size)
        self.bias = np.random.randn(output_size, 1)

    def forward(self, input):
        """Store ``input`` and return the affine transform of it."""
        self.input = input
        projected = self.weights.dot(self.input)
        return projected + self.bias

    def backward(self, output_gradient, learning_rate):
        """Apply one gradient-descent step and return the gradient w.r.t. the input."""
        grad_weights = np.dot(output_gradient, self.input.T)
        # Must use the pre-update weights, so compute this before the step.
        grad_input = self.weights.T.dot(output_gradient)

        self.weights -= learning_rate * grad_weights
        self.bias -= learning_rate * output_gradient
        return grad_input

In [None]:
class Reshape(Layer):
    """Parameter-free layer that only changes the array shape.

    Args:
        input_shape: shape restored on the backward pass.
        output_shape: shape produced on the forward pass.
    """

    def __init__(self, input_shape, output_shape):
        self.input_shape, self.output_shape = input_shape, output_shape

    def forward(self, input):
        """Return ``input`` viewed with ``output_shape``."""
        reshaped = np.reshape(input, self.output_shape)
        return reshaped

    def backward(self, output_gradient, learning_rate):
        """Return ``output_gradient`` viewed with ``input_shape``; nothing to learn."""
        restored = np.reshape(output_gradient, self.input_shape)
        return restored

In [None]:
import numpy as np

class MaxPool:
    """Max-pooling layer over the two middle axes of its input.

    ``forward`` accepts a 4-D array laid out as ``(n, h, w, c)`` or a 3-D
    array ``(n, h, w)``. A 3-D array is pooled as if it had one trailing
    channel, so every slice along the first axis is pooled independently
    (for the ``(depth, height, width)`` maps produced by ``Convolutional``
    that means one pooling per feature map). The output, and the gradient
    returned by ``backward``, have the same number of dimensions as the
    array passed to ``forward``.

    Args:
        pool_size: side length of the square pooling window.
        stride: step between consecutive windows.
    """

    def __init__(self, pool_size, stride):
        self.pool_size = pool_size  # Size of the pooling window (e.g., 2 for 2x2)
        self.stride = stride  # Stride with which the window moves across the input
        self.cache = None  # 4-D copy of the last forward input, used by backward
        self._input_was_3d = False  # Whether the last forward input was 3-D

    def _pooled_size(self, length):
        # Number of complete windows that fit along an axis of this length.
        return 1 + (length - self.pool_size) // self.stride

    def forward(self, X):
        """Return the maximum of every pooling window.

        Only complete windows are used; trailing rows/columns that do not
        fill a whole window are ignored.

        Raises:
            ValueError: if ``X`` is neither 3-D nor 4-D.
        """
        X = np.asarray(X)
        if X.ndim not in (3, 4):
            raise ValueError(f"MaxPool expects a 3-D or 4-D input, got {X.ndim}-D")

        # Remember the original rank so that a genuine 4-D input with a single
        # channel is not squeezed to 3-D on the way out.
        self._input_was_3d = X.ndim == 3
        if self._input_was_3d:
            X = X[..., np.newaxis]

        self.cache = X  # Store the input value for use in the backward pass

        n, h, w, c = X.shape
        h_out = self._pooled_size(h)
        w_out = self._pooled_size(w)

        output = np.zeros((n, h_out, w_out, c))

        for i in range(h_out):
            h_start = i * self.stride
            for j in range(w_out):
                w_start = j * self.stride
                window = X[:, h_start:h_start + self.pool_size,
                           w_start:w_start + self.pool_size, :]
                output[:, i, j, :] = np.max(window, axis=(1, 2))

        if self._input_was_3d:
            output = output[..., 0]

        return output

    def backward(self, d_out, learning_rate):
        """Route each upstream gradient to the element that won its window.

        Args:
            d_out: gradient w.r.t. the forward output, same shape as that output.
            learning_rate: unused; the layer has no trainable parameters.

        Returns:
            Gradient w.r.t. the forward input, with the input's shape. When a
            window contains ties, the first maximum receives the gradient.

        Raises:
            RuntimeError: if called before ``forward``.
            ValueError: if ``d_out`` does not match the forward output size.
        """
        X = self.cache
        if X is None:
            raise RuntimeError("MaxPool.backward called before forward")

        n, h, w, c = X.shape
        h_out = self._pooled_size(h)
        w_out = self._pooled_size(w)
        size = self.pool_size

        # Bring the gradient into the same 4-D layout as the cached input.
        d_out = np.asarray(d_out, dtype=float).reshape(n, h_out, w_out, c)
        d_X = np.zeros(X.shape, dtype=float)

        # Index grids selecting every (sample, channel) pair at once.
        n_idx, c_idx = np.indices((n, c))

        for i in range(h_out):
            h_start = i * self.stride
            for j in range(w_out):
                w_start = j * self.stride
                window = X[:, h_start:h_start + size, w_start:w_start + size, :]
                # Flatten the window so argmax yields one position per (n, c).
                flat_pos = window.reshape(n, size * size, c).argmax(axis=1)
                rows, cols = np.unravel_index(flat_pos, (size, size))
                # Accumulate, because overlapping windows (stride < pool_size)
                # may select the same input element more than once.
                d_X[n_idx, h_start + rows, w_start + cols, c_idx] += d_out[:, i, j, :]

        if self._input_was_3d:
            d_X = d_X[..., 0]

        return d_X


activation below

In [None]:
class Activation(Layer):
    """Element-wise non-linearity defined by a function and its derivative.

    Args:
        activation: callable applied to the input on the forward pass.
        activation_prime: callable giving the derivative of ``activation``,
            evaluated at the stored input on the backward pass.
    """

    def __init__(self, activation, activation_prime):
        self.activation = activation
        self.activation_prime = activation_prime

    def forward(self, input):
        """Remember ``input`` and return ``activation(input)``."""
        self.input = input
        return self.activation(input)

    def backward(self, output_gradient, learning_rate):
        """Return ``output_gradient`` scaled element-wise by the local derivative."""
        local_slope = self.activation_prime(self.input)
        return np.multiply(output_gradient, local_slope)

In [None]:
class Tanh(Activation):
    """Hyperbolic-tangent activation layer."""

    def __init__(self):
        def squash(x):
            return np.tanh(x)

        def squash_slope(x):
            # d/dx tanh(x) = 1 - tanh(x)^2
            t = np.tanh(x)
            return 1 - t ** 2

        super().__init__(squash, squash_slope)

class Sigmoid(Activation):
    """Logistic-sigmoid activation layer."""

    def __init__(self):
        def logistic(x):
            return 1 / (1 + np.exp(-x))

        def logistic_slope(x):
            # d/dx sigma(x) = sigma(x) * (1 - sigma(x))
            value = logistic(x)
            return value * (1 - value)

        super().__init__(logistic, logistic_slope)

class Softmax(Layer):
    """Softmax output layer normalising its whole input to sum to one."""

    def forward(self, input):
        """Return ``exp(input) / sum(exp(input))`` and cache it for backward.

        The maximum is subtracted before exponentiating. That leaves the
        result mathematically unchanged but keeps ``np.exp`` from overflowing
        to ``inf`` (and the output from becoming ``nan``) for large inputs.
        """
        shifted = input - np.max(input)
        exps = np.exp(shifted)
        self.output = exps / np.sum(exps)
        return self.output

    def backward(self, output_gradient, learning_rate):
        """Return the softmax Jacobian applied to ``output_gradient``.

        With ``s`` the cached output, the Jacobian entry is
        ``J[i, j] = s_i * (delta_ij - s_j)``. The broadcasting below relies on
        ``s`` being an ``(n, 1)`` column vector, as produced by ``Dense`` in
        this file. ``learning_rate`` is unused (no parameters).
        """
        # This version is faster than the one presented in the video
        n = np.size(self.output)
        jacobian = (np.identity(n) - self.output.T) * self.output
        return np.dot(jacobian, output_gradient)
        # Original formula:
        # tmp = np.tile(self.output, n)
        # return np.dot(tmp * (np.identity(n) - np.transpose(tmp)), output_gradient)

Loss (error) functions below

In [None]:
def mse(y_true, y_pred):
    """Return the mean of the squared differences between target and prediction."""
    residual = y_true - y_pred
    squared = np.power(residual, 2)
    return np.mean(squared)

def mse_prime(y_true, y_pred):
    """Return the gradient of ``mse`` with respect to ``y_pred``."""
    doubled_residual = 2 * (y_pred - y_true)
    return doubled_residual / np.size(y_true)

def binary_cross_entropy(y_true, y_pred):
    """Return the mean binary cross-entropy between targets and predictions.

    Predictions are clipped to ``[eps, 1 - eps]`` before taking logarithms.
    Without this, a prediction that saturates at exactly 0 or 1 produces
    ``log(0)`` and the loss becomes ``inf`` or ``nan``.
    """
    eps = 1e-12
    clipped = np.clip(y_pred, eps, 1 - eps)
    return np.mean(-y_true * np.log(clipped) - (1 - y_true) * np.log(1 - clipped))

def binary_cross_entropy_prime(y_true, y_pred):
    """Return the gradient of ``binary_cross_entropy`` with respect to ``y_pred``.

    Predictions are clipped to ``[eps, 1 - eps]`` so that a saturated
    prediction (exactly 0 or 1) does not cause a division by zero and
    poison the backward pass with ``inf`` or ``nan``.
    """
    eps = 1e-12
    clipped = np.clip(y_pred, eps, 1 - eps)
    return ((1 - y_true) / (1 - clipped) - y_true / clipped) / np.size(y_true)

training loop below

In [None]:
def predict(network, input):
    """Feed ``input`` through every layer of ``network`` in order.

    Each layer's ``forward`` result becomes the next layer's input; the last
    result is returned. An empty network returns ``input`` unchanged.
    """
    signal_so_far = input
    for stage in network:
        signal_so_far = stage.forward(signal_so_far)
    return signal_so_far

def train(network, loss, loss_prime, x_train, y_train, epochs=1000, learning_rate=0.1, verbose=True):
    """Train ``network`` with per-sample gradient descent.

    For every epoch and every ``(x, y)`` pair, the sample is passed forward
    through the network, the loss is accumulated, and the loss gradient is
    propagated through the layers in reverse order. Each layer's ``backward``
    is responsible for updating its own parameters.

    Args:
        network: sequence of layers exposing ``forward`` and ``backward``.
        loss: callable ``loss(y_true, y_pred)`` returning a scalar.
        loss_prime: callable ``loss_prime(y_true, y_pred)`` returning the
            gradient of the loss with respect to the prediction.
        x_train: training inputs; must support ``len``.
        y_train: training targets, paired with ``x_train`` by position.
        epochs: number of passes over the training data.
        learning_rate: step size handed to every layer's ``backward``.
        verbose: when true, print progress at the start and end of each epoch.
    """
    for e in range(epochs):
        # Progress output is gated on ``verbose`` so verbose=False is silent.
        if verbose:
            print(e, "  epoch    ")

        error = 0
        for x, y in zip(x_train, y_train):
            # forward
            output = predict(network, x)

            # error
            error += loss(y, output)

            # backward
            grad = loss_prime(y, output)
            for layer in reversed(network):
                grad = layer.backward(grad, learning_rate)

        # Report the mean loss per training sample for this epoch.
        error /= len(x_train)
        if verbose:
            print(f"{e + 1}/{epochs}, error={error}")

Process data

In [None]:
#!unzip '/content/sample_data/train.zip'

In [None]:
# Root directory of the training images; the loading cell below walks its
# sub-directories and uses each sub-directory name as the class label.
dataDir = '/content/sample_data/tr'

In [None]:
# Raw training images (X) and their labels (y), filled by the loop below.
X = []
y = []

import os
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from keras.preprocessing.image import img_to_array

# Every sub-directory of dataDir is treated as one class; its name is the label.
for folder in os.listdir(dataDir):
    folder_path = os.path.join(dataDir, folder)
    if not os.path.isdir(folder_path):
        continue
    for image_filename in os.listdir(folder_path):
        image_path = os.path.join(folder_path, image_filename)
        # The context manager closes the underlying file promptly instead of
        # leaving one handle open per image until garbage collection.
        with Image.open(image_path) as raw_image:
            # Force three channels: grayscale or RGBA files would otherwise
            # give arrays of a different depth and break the later
            # (N, 3, 32, 32) layout.
            image = raw_image.convert("RGB").resize((32, 32))
        image = img_to_array(image)  # Convert image to numpy array
        X.append(image)
        y.append(folder)  # Using folder name as label



In [None]:
# Scale pixel values from [0, 255] to [0, 1].
X = np.array(X)/255
# The images are stacked as (N, height, width, channels) -- assuming Keras'
# default channels-last format for img_to_array; confirm if that setting was
# changed. The network expects (N, channels, height, width), which requires
# moving the channel axis. A plain reshape would keep the memory order and
# scramble the pixels across channels.
X_train = np.transpose(X, (0, 3, 1, 2))


y = np.array(y).reshape(-1, 1)
# Densify the encoder output explicitly instead of passing ``sparse=False``:
# that keyword was renamed to ``sparse_output`` and later removed, so this
# form works across scikit-learn releases.
encoder = OneHotEncoder()
y_encoded = encoder.fit_transform(y).toarray()
print(np.shape(X_train))

# One column vector per sample: (N, n_classes, 1).
y_train = y_encoded.reshape(len(y_encoded), -1, 1)

permutation = np.random.permutation(len(X_train))

# Apply the permutation to X_train and y_train
X_train = X_train[permutation]
y_train = y_train[permutation]

print(y_train)

#for testing

testDir='/content/sample_data/test'

a=[]
b=[]

for folder in os.listdir(testDir):
    if os.path.isdir(os.path.join(testDir, folder)):
        for image_filename in os.listdir(os.path.join(testDir, folder)):
            image_path = os.path.join(testDir, folder, image_filename)
            # Close the file promptly and force three channels so every
            # image has the same (32, 32, 3) layout.
            with Image.open(image_path) as raw_image:
                image = raw_image.convert("RGB").resize((32, 32))
            image = img_to_array(image)  # Convert image to numpy array
            a.append(image)
            b.append(folder)  # Using folder name as label

a = np.array(a)/255
b = np.array(b).reshape(-1, 1)
# Reuse the encoder fitted on the training labels so that class indices mean
# the same thing for training and test targets.
b_encoded = encoder.transform(b).toarray()


y_test = b_encoded.reshape(len(b_encoded), -1, 1)
# Same channels-last -> channels-first conversion as for the training set.
X_test = np.transpose(a, (0, 3, 1, 2))
print(y_test)


(240, 3, 32, 32)
[[[1.]
  [0.]]

 [[1.]
  [0.]]

 [[1.]
  [0.]]

 [[0.]
  [1.]]

 [[1.]
  [0.]]

 [[0.]
  [1.]]

 [[0.]
  [1.]]

 [[1.]
  [0.]]

 [[1.]
  [0.]]

 [[0.]
  [1.]]

 [[1.]
  [0.]]

 [[1.]
  [0.]]

 [[1.]
  [0.]]

 [[0.]
  [1.]]

 [[0.]
  [1.]]

 [[1.]
  [0.]]

 [[1.]
  [0.]]

 [[1.]
  [0.]]

 [[0.]
  [1.]]

 [[1.]
  [0.]]

 [[1.]
  [0.]]

 [[0.]
  [1.]]

 [[1.]
  [0.]]

 [[1.]
  [0.]]

 [[1.]
  [0.]]

 [[0.]
  [1.]]

 [[1.]
  [0.]]

 [[0.]
  [1.]]

 [[0.]
  [1.]]

 [[0.]
  [1.]]

 [[1.]
  [0.]]

 [[0.]
  [1.]]

 [[0.]
  [1.]]

 [[0.]
  [1.]]

 [[0.]
  [1.]]

 [[1.]
  [0.]]

 [[0.]
  [1.]]

 [[1.]
  [0.]]

 [[1.]
  [0.]]

 [[0.]
  [1.]]

 [[0.]
  [1.]]

 [[0.]
  [1.]]

 [[1.]
  [0.]]

 [[0.]
  [1.]]

 [[0.]
  [1.]]

 [[0.]
  [1.]]

 [[1.]
  [0.]]

 [[1.]
  [0.]]

 [[1.]
  [0.]]

 [[1.]
  [0.]]

 [[1.]
  [0.]]

 [[0.]
  [1.]]

 [[0.]
  [1.]]

 [[1.]
  [0.]]

 [[1.]
  [0.]]

 [[0.]
  [1.]]

 [[1.]
  [0.]]

 [[1.]
  [0.]]

 [[1.]
  [0.]]

 [[1.]
  [0.]]

 [[0.]
  [1.]]

 [[1.]




Network architecture

In [None]:
# Shape bookkeeping, as (channels, height, width):
#   input                           (3, 32, 32)
#   Convolutional 3x3, depth 5  ->  (5, 30, 30)
#   MaxPool 3, stride 3         ->  (5, 10, 10)
#   Convolutional 3x3, depth 5  ->  (5, 8, 8)
#   MaxPool 3, stride 3         ->  (5, 2, 2)   (only complete windows are pooled)
# Each layer's declared input shape must match what the previous layer
# produces, including the reduction done by the pooling layers.
network = [
    Convolutional((3, 32, 32), 3, 5),
    MaxPool(3, 3),
    Sigmoid(),
    Convolutional((5, 10, 10), 3, 5),
    Sigmoid(),
    MaxPool(3, 3),
    Reshape((5, 2, 2), (5 * 2 * 2, 1)),
    Dense(5 * 2 * 2, 100),
    Tanh(),
    Dense(100, 2),
    Softmax()
]

# train
train(
    network,
    binary_cross_entropy,
    binary_cross_entropy_prime,
    X_train,
    y_train,
    epochs=100,
    learning_rate=0.1
)

# test
for x, y in zip(X_test, y_test):
    output = predict(network, x)
    print(f"pred: {np.argmax(output)}, true: {np.argmax(y)}")