Skip to content

Deep Convolutional Neural Network from scratch. Just NumPy.

Notifications You must be signed in to change notification settings

ciCciC/dcnn-from-scratch

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

12 Commits
 
 
 
 
 
 
 
 

Repository files navigation

Deep Convolutional Neural Network

Neuro & Computer Science

from scratch

(c) Koray Poyraz

The hierarchical structure of layers, including the transformations and operations, mimics the biological computation of neurons in image processing.

TODO:

  • Derivative function of Cross Entropy
  • Backpropagation for the reversed L-1 layer
from tensorflow import keras #only for loading data
import numpy as np
import matplotlib.pyplot as plt

Convolution

  • filter - threshold - max pooling - normalisation
# black and white images
# (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
# input_shape = (28, 28, 1)

# colored images
# Load the CIFAR-10 train/test split through the Keras dataset helper.
(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()

# Convert the pixels to float32 and divide by 255
# (presumably uint8 values in 0..255, giving a 0..1 range - TODO confirm).
x_train = x_train.astype("float32") / 255
x_test = x_test.astype("float32") / 255
# One test sample that the demonstration cells below operate on.
test_image = x_test[20]

# Shape of that sample and the kernel size handed to the convolution helpers.
input_shape = test_image.shape
filter_size = (3, 3)

print('I think its a horse')
# The trailing semicolon suppresses the notebook's echo of the return value.
plt.imshow(test_image);
# Hand-written 3x3 kernel used as the "vertical edge" filter in the demo cells.
# NOTE(review): this differs from the usual Sobel kernel
# [[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]] and its weights do not sum to zero -
# confirm whether that is intentional.
filter_vertical_edge = np.array([
    [-1, 1, 0],
    [-2, 1, 0],
    [-1, 1, 0]])

# Hand-written 3x3 kernel used as the "horizontal edge" filter
# (negative top row, zero middle row, positive bottom row).
filter_horizontal_edge = np.array([
    [-1, -2, -1],
    [0, 0, 0],
    [1, 2, 1]])


def conv_multi(source_pixels, filter_pixels):
    """Return the sum of the element-wise product of a patch and a filter."""
    weighted_patch = np.multiply(source_pixels, filter_pixels)
    return weighted_patch.sum()


def convolution_operation(image, input_shape, filters, kernel_size=(3, 3)):
    """Apply every filter to one image with a "valid" sliding window.

    Each 2-D filter is applied to every channel of the image and the
    per-channel responses are summed, so one feature map is produced per
    filter.

    Args:
        image: array of shape (height, width) or (height, width, channels).
        input_shape: shape of ``image``; its first two entries give the
            spatial size used to compute the output size.
        filters: sequence of 2-D filter arrays.
        kernel_size: (kernel_height, kernel_width) of the sliding window.

    Returns:
        A list with one float array of shape
        (height - kernel_height + 1, width - kernel_width + 1) per filter.

    Raises:
        ValueError: if ``filters`` is None.
    """
    if filters is None:
        raise ValueError('Please apply number of filters')

    image = np.asarray(image)
    if image.ndim == 2:
        # Give grayscale input an explicit channel axis so a single code path
        # handles every case, including (height, width, 1) images.
        image = image[:, :, np.newaxis]

    kernel_height, kernel_width = kernel_size[0], kernel_size[1]
    # Height and width are handled separately so non-square images and
    # kernels produce correctly shaped feature maps.
    out_height = input_shape[0] - kernel_height + 1
    out_width = input_shape[1] - kernel_width + 1

    feature_maps = []
    for filter_pixels in filters:
        # Trailing axis lets the 2-D filter broadcast over the channel axis.
        weights = np.asarray(filter_pixels)[:, :, np.newaxis]
        feature_map = np.zeros((out_height, out_width))

        for zy in range(out_height):
            for zx in range(out_width):
                patch = image[zy:zy + kernel_height, zx:zx + kernel_width, :]
                # Sum over the window and over all channels at once.
                feature_map[zy, zx] = np.sum(patch * weights)

        feature_maps.append(feature_map)

    return feature_maps


def convolution_layer(images, input_shape, filters, kernel_size=(3, 3)):
    """Apply the convolution to every image of a batch.

    Two input layouts are distinguished with the file's own heuristic (a last
    axis longer than 4 cannot be a colour-channel axis):

    * (batch, height, width) or (batch, height, width, channels<=4): each
      entry is one image and is convolved directly.
    * (batch, maps, height, width): each entry is a stack of 2-D feature maps
      from a previous layer; every map is convolved with every filter and the
      results are concatenated (maps * filters output maps per entry).

    Args:
        images: batch array in one of the layouts above.
        input_shape: shape of a single image / feature map, forwarded to
            ``convolution_operation``.
        filters: sequence of 2-D filter arrays.
        kernel_size: (kernel_height, kernel_width) of the sliding window.

    Returns:
        Array with one stack of feature maps per batch entry.
    """
    images = np.asarray(images)
    # Only a 4-D batch can hold stacks of feature maps; a 3-D batch is a set
    # of plain 2-D images even though its last axis is longer than 4.
    stacked_maps = images.ndim == 4 and images.shape[-1] > 4

    feature_maps_all_images = []
    for image in images:
        if stacked_maps:
            feature_maps = []
            # Iterate the maps of the *current* entry (the original always
            # re-used images[0] here).
            for feature_map in image:
                feature_maps.extend(
                    convolution_operation(feature_map, input_shape, filters, kernel_size))
        else:
            feature_maps = convolution_operation(image, input_shape, filters, kernel_size)

        feature_maps_all_images.append(np.array(feature_maps))

    return np.array(feature_maps_all_images)
# for now we just use 1 image for testing and demonstration purposes
# Wrap the single test image in a batch of one and convolve it with the two
# hand-written edge filters.
feature_maps = convolution_layer(np.array([test_image]), input_shape, [filter_vertical_edge, filter_horizontal_edge], filter_size)
# Bare expression: shown as the cell output when run in a notebook.
feature_maps.shape
def plot_process_step_image(images, title, figsize, cmap='viridis'):
    """Draw three images side by side: two filter outputs and the original.

    ``images[0]`` and ``images[1]`` are drawn with ``cmap``; ``images[2]`` is
    drawn with matplotlib's default colour handling. ``title`` becomes the
    figure-level title.
    """
    figure, axes = plt.subplots(1, 3, figsize=figsize)
    # (panel title, extra imshow keyword arguments) for each of the three axes.
    panels = (
        ('filter_vertical_edge', {'cmap': cmap}),
        ('filter_horizontal_edge', {'cmap': cmap}),
        ('original image', {}),
    )
    for position, (panel_title, imshow_options) in enumerate(panels):
        axes[position].set_title(panel_title)
        axes[position].imshow(images[position], **imshow_options)
    figure.suptitle(title)
    figure.tight_layout()
# Plot the two feature maps of the first (only) image next to the original.
images = [feature_maps[0][0], feature_maps[0][1], test_image]
plot_process_step_image(images, 'Convolution operation', (12, 5))
# Feed the feature maps back into the convolution as a second layer, using the
# shape of a single feature map as the input shape.
test = convolution_layer(feature_maps, feature_maps[0][0].shape, [filter_vertical_edge, filter_horizontal_edge], filter_size)
# Bare expression: shown as the cell output when run in a notebook.
test.shape
# Plot the first two second-layer maps ...
images = [test[0][0], test[0][1], test_image]
plot_process_step_image(images, 'Convolution operation', (12, 5), cmap='Greys')
# ... and the next two.
images = [test[0][2], test[0][3], test_image]
plot_process_step_image(images, 'Convolution operation', (12, 5), cmap='Greys')

Rectified linear (relu) activation over a whole feature map, with a maximum activation mimicking neuron firing rate

def activation_relu(x, max_activation=None):
    """Rectified linear unit for one scalar, optionally capped.

    Args:
        x: scalar input.
        max_activation: upper bound of the output (the "firing limit" of the
            neuron); ``None`` means no cap.

    Returns:
        ``max(0.0, x)``, limited to ``max_activation`` when one is given.
    """
    rectified = max(0.0, x)
    if max_activation is None:
        return rectified
    return min(rectified, max_activation)


# otypes is pinned to float because np.vectorize otherwise infers the output
# dtype from the FIRST element only: if that element hits an integer cap
# (e.g. max_activation=3) the whole result becomes an int array and every
# other activation is silently truncated. It also allows empty inputs.
activation_relu_fn = np.vectorize(activation_relu, otypes=[float])

def activation(feature_maps, max_activation=None):
    """Apply the (optionally capped) ReLU to every feature map of one image.

    Returns a list holding one array per input map, each with the shape of
    the map it was computed from.
    """
    thresholded_maps = []
    for feature_map in feature_maps:
        flat_values = feature_map.flatten()
        thresholded = activation_relu_fn(flat_values, max_activation)
        thresholded_maps.append(thresholded.reshape(feature_map.shape))
    return thresholded_maps

def activation_layer(feature_maps_all_images, max_activation=None):
    """Apply the (optionally capped) ReLU to the feature maps of every image.

    Returns an array with one stack of activated maps per image.
    """
    activated_images = []
    for image_maps in feature_maps_all_images:
        activated_images.append(activation(image_maps, max_activation))
    return np.array(activated_images)
# Threshold the convolution output with ReLU, capping activations at 3.
activation_maps = activation_layer(feature_maps, 3)

# Bare expression: shown as the cell output when run in a notebook.
activation_maps.shape
# Plot both activated maps of the first image next to the original.
images = [activation_maps[0][0], activation_maps[0][1], test_image]
plot_process_step_image(images, 'ReLu Threshold operation', (12, 5), cmap='Greys')

Max pooling, the spatial extent of the pooling

def max_pooling(activation_maps, pooling_filter_size=2):
    """Max-pool every feature map of one image with non-overlapping windows.

    Args:
        activation_maps: array of shape (maps, height, width).
        pooling_filter_size: side length of the square pooling window, which
            is also the stride.

    Returns:
        Float array of shape (maps, height // size, width // size). Rows and
        columns that do not fill a complete window are dropped.
    """
    activation_maps = np.asarray(activation_maps)

    # Height and width get their own output size so that non-square maps work
    # (the original derived both from the last axis only).
    out_height = activation_maps.shape[-2] // pooling_filter_size
    out_width = activation_maps.shape[-1] // pooling_filter_size

    # Drop the incomplete border, then expose each window on its own pair of
    # axes so one max() call reduces every window of every map.
    trimmed = activation_maps[
        :,
        :out_height * pooling_filter_size,
        :out_width * pooling_filter_size]
    windows = trimmed.reshape(
        len(activation_maps),
        out_height, pooling_filter_size,
        out_width, pooling_filter_size)

    return windows.max(axis=(2, 4)).astype(float)


def max_pooling2d(activation_maps_all_images, pooling_size=2):
    """Max-pool the feature maps of every image in the batch.

    Returns an array with one stack of pooled maps per image.
    """
    pooled_images = []
    for image_maps in activation_maps_all_images:
        pooled_images.append(max_pooling(image_maps, pooling_size))
    return np.array(pooled_images)
# Pool the activated maps with a 2x2 window.
pooling_maps = max_pooling2d(activation_maps, 2)
# Bare expression: shown as the cell output when run in a notebook.
pooling_maps.shape
# Plot both pooled maps of the first image next to the original.
images = [pooling_maps[0][0], pooling_maps[0][1], test_image]
plot_process_step_image(images, 'Max Pooling operation', figsize=(10, 5), cmap='Greys')

Normalisation

def normalisation(pooling_maps):
    """Standardise every feature map of one image to mean 0 and std 1.

    Args:
        pooling_maps: iterable of 2-D arrays (one per feature map).

    Returns:
        Array with one standardised map per input map. A constant map (std 0,
        e.g. a map that ReLU zeroed completely) is returned as all zeros
        instead of NaN.
    """
    normalisation_maps = []

    for pooling_map in pooling_maps:
        centred = pooling_map - pooling_map.mean()
        spread = pooling_map.std()
        if spread == 0:
            # centred is all zeros here; dividing by 1 keeps it that way and
            # avoids the 0/0 = NaN of the unguarded division.
            spread = 1.0
        normalisation_maps.append(centred / spread)

    return np.array(normalisation_maps)


def normalisation2d(pooling_maps_all_images):
    """Standardise the feature maps of every image in the batch.

    Returns an array with one stack of normalised maps per image.
    """
    normalised_images = []
    for image_maps in pooling_maps_all_images:
        normalised_images.append(normalisation(image_maps))
    return np.array(normalised_images)
# Standardise each pooled map.
normalisation_maps = normalisation2d(pooling_maps)
# Bare expression: shown as the cell output when run in a notebook.
normalisation_maps.shape
# Plot both normalised maps of the first image next to the original.
images = [normalisation_maps[0][0], normalisation_maps[0][1], test_image]
plot_process_step_image(images, 'Normalisation mean:0 and std:1 operation', figsize=(10, 5), cmap='Greys')
# Histogram of the normalised values of the first map.
print('Vertical edge')
plt.hist(normalisation_maps[0][0].flatten())
plt.show()
# Histogram of the normalised values of the second map.
print('Horizontal edge')
plt.hist(normalisation_maps[0][1].flatten())
plt.show()

Flattening + Fully-connected layer

def flatten_layer(feature_maps_all_images):
    """Flatten the stack of feature maps of every image into one vector.

    Returns an array with one 1-D vector per image.
    """
    flattened_images = []
    for image_maps in feature_maps_all_images:
        flattened_images.append(image_maps.flatten())
    return np.array(flattened_images)


def activation_fully_connect_layer(flatten_input_neurons_all_images, max_activation=3):
    """Apply the capped ReLU to the fully connected output of every image.

    Returns an array with one activated neuron vector per image.
    """
    activated_images = []
    for input_neurons in flatten_input_neurons_all_images:
        activated_images.append(activation_relu_fn(input_neurons, max_activation))
    return np.array(activated_images)


def fully_connect(input_flat_feature_map, output_neurons):
    """Push one flattened feature vector through a freshly initialised dense layer.

    Weights and bias are drawn anew on every call, uniformly from
    [-0.5, 0.5). Returns an array of shape (1, output_neurons).
    """
    n_inputs = len(input_flat_feature_map)
    # Weights are drawn before the bias, so a seeded run consumes the random
    # stream in a fixed order.
    random_weights = np.random.rand(n_inputs, output_neurons) - 0.5
    random_bias = np.random.rand(1, output_neurons) - 0.5
    return np.dot(input_flat_feature_map, random_weights) + random_bias


def fully_connect_layer(input_flatten_feature_map_all_images, output_neurons):
    """Run every flattened image through its own randomly initialised dense layer.

    Returns an array with one row of ``output_neurons`` values per image.
    """
    dense_outputs = []
    for flat_feature_map in input_flatten_feature_map_all_images:
        dense_row = fully_connect(flat_feature_map, output_neurons)
        # fully_connect returns shape (1, output_neurons); keep only the row.
        dense_outputs.append(dense_row[0])
    return np.array(dense_outputs)
# here we first flat the feature maps
flattened_layer = flatten_layer(normalisation_maps)

# then apply the flat as input for the full connection with eg 128 output neurons
fully_connected = fully_connect_layer(flattened_layer, output_neurons=128)

# then apply ReLu activation on the fully connected layer
activation_fully_connected = activation_fully_connect_layer(fully_connected, max_activation=3)

# dimensions
# Print the shape after each of the three steps above.
print(f'Flatten shape: {flattened_layer.shape}')
print(f'Fully connected 1st layer shape: {fully_connected.shape}')
print(f'ReLu activation 1st layer shape: {activation_fully_connected.shape}')

Algorithmic expression of a softmax (normalised exponential) function

def softmax(input_activations):
    """Softmax (normalised exponential) over all elements of the input.

    Args:
        input_activations: array-like of raw output-layer activations.

    Returns:
        Float array of the same shape whose elements are positive and sum
        to 1. An empty input is returned as an empty float array.
    """
    activations = np.asarray(input_activations, dtype=float)
    if activations.size == 0:
        return activations

    # Subtracting the maximum leaves the result mathematically unchanged but
    # keeps exp() from overflowing to inf (and the ratio from becoming NaN).
    shifted = activations - np.max(activations)
    exponentials = np.exp(shifted)
    return exponentials / np.sum(exponentials)

def softmax_layer(input_activations_all_images):
    """Apply softmax to the output activations of every image.

    Returns an array of shape (images, classes).
    """
    # image X classes
    class_probabilities = []
    for input_activation in input_activations_all_images:
        class_probabilities.append(softmax(input_activation))
    return np.array(class_probabilities)
# Feed the activated 128-neuron layer into an output layer with one neuron
# per class, then turn the result into probabilities with softmax.
classes = 10
output_layer_fully_connected = fully_connect_layer(activation_fully_connected, output_neurons=classes)
softmax_results = softmax_layer(output_layer_fully_connected)

# get 1 image
one_image = softmax_results[0]
# Index of the largest probability is taken as the predicted class.
chosen_class = np.argmax(one_image)
print(f'Predicted class: {chosen_class}')
print('All probability:')
# Bare expression: shown as the cell output when run in a notebook.
softmax_results

Set of functions for achieving backpropagation of error to affect the convolutional filter (kernel) structure

def relu_fn(x):
    """Rectified linear unit for one scalar: ``max(0, x)``."""
    return max(0, x)


# otypes is pinned to float because np.vectorize otherwise infers the output
# dtype from the FIRST element only: a non-positive first value makes relu_fn
# return the int 0, the result becomes an int array and every other
# activation is silently truncated. It also allows empty inputs.
relu = np.vectorize(relu_fn, otypes=[float])


def relu_layer(feature_maps):
    """Apply ReLU to every feature map of one image.

    Returns a list holding one array per input map, each with the shape of
    the map it was computed from.
    """
    rectified_maps = []
    for feature_map in feature_maps:
        rectified = relu(feature_map.flatten())
        rectified_maps.append(rectified.reshape(feature_map.shape))
    return rectified_maps


def relu_derivative_fn(x):
    """Derivative of ReLU for one scalar: 1 for positive input, otherwise 0."""
    if x > 0:
        return 1
    return 0


# Element-wise version usable on whole arrays.
relu_derivative = np.vectorize(relu_derivative_fn)


def relu_derivative_layer(feature_maps):
    """Apply the ReLU derivative to every feature map of one image.

    Returns a list holding one array per input map, each with the shape of
    the map it was computed from.
    """
    derivative_maps = []
    for feature_map in feature_maps:
        slopes = relu_derivative(feature_map.flatten())
        derivative_maps.append(slopes.reshape(feature_map.shape))
    return derivative_maps


def softmax(input_activations):
    """Softmax (normalised exponential) over all elements of the input.

    Args:
        input_activations: array-like of raw output-layer activations.

    Returns:
        Float array of the same shape whose elements are positive and sum
        to 1. An empty input is returned as an empty float array.
    """
    activations = np.asarray(input_activations, dtype=float)
    if activations.size == 0:
        return activations

    # Subtracting the maximum leaves the result mathematically unchanged but
    # keeps exp() from overflowing to inf (and the ratio from becoming NaN).
    shifted = activations - np.max(activations)
    exponentials = np.exp(shifted)
    return exponentials / np.sum(exponentials)


def softmax_derivative(softmax_output):
    """Element-wise derivative of each softmax output w.r.t. its own input.

    For every element ``s_i`` of a vector with sum ``S`` this returns
    ``s_i * (S - s_i) / S**2`` (the diagonal of the softmax Jacobian), which
    reduces to ``s_i * (1 - s_i)`` for normalised probabilities. The original
    implementation evaluated this expression for the first element only, and
    returned zeros for the (1, classes) row produced by the dense layer.

    Args:
        softmax_output: array-like of shape (classes,) or (rows, classes).

    Returns:
        Float array with the same shape as the input; the sum is taken along
        the last axis.
    """
    values = np.asarray(softmax_output, dtype=float)
    # keepdims lets the per-row total broadcast against the row it came from.
    total = np.sum(values, axis=-1, keepdims=True)
    return values * (total - values) / total ** 2


def loss_cross_entropy(y, yhat):
    """Element-wise cross-entropy terms ``-(y * log(yhat))``.

    Example:
        someSoftmax = np.array([0.7, 0.1, 0.2])
        y = np.array([1, 0, 0])
        loss_cross_entropy(y, someSoftmax)

    Args:
        y: one-hot target vector.
        yhat: predicted probabilities, broadcastable against ``y``.

    Returns:
        Array with one loss term per class (not summed).
    """
    # log(0) is -inf and 0 * -inf is NaN; bounding the probabilities from
    # below keeps every term finite.
    safe_yhat = np.clip(yhat, 1e-12, None)
    return -(y * np.log(safe_yhat))

def loss_cross_entropy_derivative(y, yhat):
    """Derivative of the element-wise cross-entropy terms w.r.t. ``yhat``.

    The loss term is ``-(y * log(yhat))``, so its derivative with respect to
    the prediction is ``-(y / yhat)``. (The original stub returned the loss
    itself.)

    Args:
        y: one-hot target vector.
        yhat: predicted probabilities, broadcastable against ``y``.

    Returns:
        Array with one gradient entry per class.
    """
    # Bound the probabilities from below so a predicted 0 cannot divide by zero.
    safe_yhat = np.clip(yhat, 1e-12, None)
    return -(np.asarray(y) / safe_yhat)
class Layer:
    """Base class of all network layers.

    Subclasses override ``forward_propagation``, ``backward_propagation`` and
    ``print_dimension``; the versions here only raise ``NotImplementedError``.
    """

    def __init__(self):
        self.input = None
        self.output = None

    # computes feed forward
    def forward_propagation(self, input):
        """Return the layer output for ``input``."""
        raise NotImplementedError

    # computes dE/dX for a given dE/dY
    def backward_propagation(self, output_error, learning_rate):
        """Update the layer's parameters and return the error for layer L-1."""
        raise NotImplementedError

    def back_propagation(self, output_error, learning_rate):
        """Alias of ``backward_propagation``.

        The training loop and the dense layer in this file spell the method
        ``back_propagation``; delegating here lets every subclass that
        implements ``backward_propagation`` be driven through either name.
        """
        return self.backward_propagation(output_error, learning_rate)

    def print_dimension(self):
        """Return a short human-readable description of the layer."""
        raise NotImplementedError


class ConvolutionLayer(Layer):
    """Convolution -> activation -> per-map normalisation layer.

    The layer accepts two input layouts, told apart with the file's own
    heuristic (a last axis longer than 4 cannot be a colour-channel axis):

    * an image of shape (height, width) or (height, width, channels<=4):
      every filter is applied to every channel and the channel responses are
      summed, giving ``n_filters`` feature maps;
    * a stack of 2-D feature maps of shape (maps, height, width): every map
      is convolved with every filter, giving ``maps * n_filters`` feature
      maps ordered as ``map_index * n_filters + filter_index``.

    Args:
        n_filters: number of filters to create.
        kernel_size: (kernel_height, kernel_width) of each filter.
        activation: callable applied to the stack of feature maps.
        activation_prime: element-wise derivative of ``activation``.
    """

    def __init__(self, n_filters, kernel_size, activation, activation_prime):
        super().__init__()
        self.n_filters = n_filters
        self.kernel_size = kernel_size
        self.activation = activation
        self.activation_prime_func = activation_prime
        # Filters start as uniform random values in [0, 1).
        self.filters = [np.random.rand(kernel_size[0], kernel_size[1]) for _ in range(n_filters)]

    def conv_multi(self, source_pixels, filter_pixels):
        """Return the sum of the element-wise product of a patch and a filter."""
        return np.sum(np.multiply(source_pixels, filter_pixels))

    @staticmethod
    def _is_stack_of_maps(image):
        """Tell a (maps, height, width) stack apart from a single image."""
        return image.ndim == 3 and image.shape[-1] > 4

    def _correlate(self, plane, filter_pixels):
        """Slide one filter over one 2-D plane ("valid" positions only)."""
        kernel_height, kernel_width = filter_pixels.shape
        out_height = plane.shape[0] - kernel_height + 1
        out_width = plane.shape[1] - kernel_width + 1
        feature_map = np.zeros((out_height, out_width))

        for zy in range(out_height):
            for zx in range(out_width):
                patch = plane[zy:zy + kernel_height, zx:zx + kernel_width]
                feature_map[zy, zx] = self.conv_multi(patch, filter_pixels)

        return feature_map

    def _filter_gradient(self, plane, error_map, kernel_shape):
        """Gradient of the loss w.r.t. one filter, given dE/dz of its map."""
        out_height, out_width = error_map.shape
        gradient = np.zeros(kernel_shape)
        for dy in range(kernel_shape[0]):
            for dx in range(kernel_shape[1]):
                # Weight (dy, dx) touched exactly this shifted window of the input.
                window = plane[dy:dy + out_height, dx:dx + out_width]
                gradient[dy, dx] = np.sum(window * error_map)
        return gradient

    def _spread_error(self, error_map, filter_pixels):
        """Gradient of the loss w.r.t. the input plane, given dE/dz of one map."""
        kernel_height, kernel_width = filter_pixels.shape
        out_height, out_width = error_map.shape
        spread = np.zeros((out_height + kernel_height - 1, out_width + kernel_width - 1))
        for dy in range(kernel_height):
            for dx in range(kernel_width):
                # Every output pixel pushes its error back onto the input
                # pixel that weight (dy, dx) read from.
                spread[dy:dy + out_height, dx:dx + out_width] += filter_pixels[dy, dx] * error_map
        return spread

    def convolution_operation(self, image):
        """Apply every filter to one image and return the list of feature maps.

        Raises:
            ValueError: if the layer has no filters.
        """
        if self.filters is None:
            raise ValueError('Please apply number of filters')

        image = np.asarray(image)
        if image.ndim == 2:
            plane = image
        else:
            # The same 2-D filter is applied to every channel and the results
            # are summed; by linearity that equals filtering the channel sum.
            # This also handles (height, width, 1) input, which previously
            # broadcast against the filter and inflated the result.
            plane = image.sum(axis=2)

        return [self._correlate(plane, filter_pixels) for filter_pixels in self.filters]

    def convolution_layer(self, image):
        """Convolve an image, or every map of a stack, with all filters."""
        image = np.asarray(image)
        if self._is_stack_of_maps(image):
            feature_maps = []
            for feature_map in image:
                feature_maps.extend(self.convolution_operation(feature_map))
        else:
            feature_maps = self.convolution_operation(image)

        return np.array(feature_maps)

    def normalisation(self, feature_maps):
        """Standardise every feature map to mean 0 and std 1.

        A constant map (std 0, e.g. fully zeroed by ReLU) is returned as all
        zeros instead of NaN.
        """
        normalisation_maps = []

        for feature_map in feature_maps:
            centred = feature_map - feature_map.mean()
            spread = feature_map.std()
            if spread == 0:
                spread = 1.0
            normalisation_maps.append(centred / spread)

        return np.array(normalisation_maps)

    def _normalisation_gradient(self, grad_normalised):
        """Back-propagate an error through the per-map standardisation."""
        gradients = []
        for activation_map, normalised_map, grad in zip(self.a, self.normalised, grad_normalised):
            spread = activation_map.std()
            if spread == 0:
                spread = 1.0
            # Derivative of (a - mean) / std with mean and std both depending on a.
            gradients.append(
                (grad - grad.mean() - normalised_map * (grad * normalised_map).mean()) / spread)
        return np.array(gradients)

    # computes feed forward
    def forward_propagation(self, input):
        """Return the normalised, activated feature maps for ``input``."""
        self.input = np.asarray(input)
        self.feature_maps = self.convolution_layer(self.input) #z
        self.a = np.array(self.activation(self.feature_maps)) #a
        self.normalised = self.normalisation(self.a) #n
        return self.normalised

    # computes dE/dX for a given dE/dY
    def backward_propagation(self, output_error, learning_rate):
        """Update the filters and return the error w.r.t. the layer input.

        Args:
            output_error: dE/d(output); any array with as many elements as
                the forward output (it is reshaped to the output's shape).
            learning_rate: step size of the gradient-descent update.

        Returns:
            Array with the shape of the forward input holding dE/d(input).
        """
        grad_normalised = np.reshape(output_error, self.normalised.shape)
        grad_activation = self._normalisation_gradient(grad_normalised)
        # error w.r.t. the raw feature maps: activation' * error
        delta = np.asarray(self.activation_prime_func(self.a)) * grad_activation

        image = self.input
        n_filters = len(self.filters)
        filter_gradients = [np.zeros(filter_pixels.shape) for filter_pixels in self.filters]

        if self._is_stack_of_maps(image):
            input_error = np.zeros(image.shape)
            for map_idx, plane in enumerate(image):
                for filter_idx, filter_pixels in enumerate(self.filters):
                    error_map = delta[map_idx * n_filters + filter_idx]
                    filter_gradients[filter_idx] += self._filter_gradient(
                        plane, error_map, filter_pixels.shape)
                    input_error[map_idx] += self._spread_error(error_map, filter_pixels)
        else:
            plane = image if image.ndim == 2 else image.sum(axis=2)
            plane_error = np.zeros(plane.shape)
            for filter_idx, filter_pixels in enumerate(self.filters):
                error_map = delta[filter_idx]
                filter_gradients[filter_idx] += self._filter_gradient(
                    plane, error_map, filter_pixels.shape)
                plane_error += self._spread_error(error_map, filter_pixels)
            if image.ndim == 2:
                input_error = plane_error
            else:
                # Every channel went through the same filters, so every
                # channel receives the same error.
                input_error = np.repeat(plane_error[:, :, np.newaxis], image.shape[2], axis=2)

        # tune the current weights (gradient descent step)
        self.filters = [
            filter_pixels - learning_rate * gradient
            for filter_pixels, gradient in zip(self.filters, filter_gradients)]

        return input_error # error for L-1

    def print_dimension(self):
        """Return a short description of the layer."""
        return f'convolution layer: {self.kernel_size}'


class Flatten(Layer):
    """Layer that flattens a stack of feature maps into one vector."""

    def __init__(self):
        super().__init__()

    def forward_propagation(self, feature_maps):
        """Store the input and return it flattened to 1-D."""
        self.input = feature_maps
        self.output = self.input.flatten()
        return self.output

    def backward_propagation(self, output_error, learning_rate):
        """Return the error reshaped to the shape of the forward input.

        Flattening has no parameters, so the error is only "un-flattened"
        before it is handed to layer L-1. ``np.reshape`` raises ValueError
        when the error does not have one entry per input element.
        """
        return np.reshape(output_error, np.shape(self.input))

    def print_dimension(self):
        """Return a short description of the layer."""
        return 'flatten: unknown'


class FullyConnect(Layer):
    """Dense layer whose weights are created lazily on the first forward pass.

    Args:
        output_size: number of output neurons.
        activation_func: callable applied to the weighted sums.
        activation_prime_func: derivative of ``activation_func``; it is
            evaluated on the activated output.
    """

    # True until the weights have been initialised by the first forward pass.
    state = True

    def __init__(self, output_size, activation_func, activation_prime_func):
        super().__init__()
        self.output_size = output_size
        self.activation_func = activation_func
        self.activation_prime_func = activation_prime_func

    def init_weights_bias(self, flattened_input):
        """Draw weights and bias uniformly from [-0.5, 0.5)."""
        # np.size instead of len(): the output of a previous dense layer has
        # shape (1, n), for which len() would report a single input feature.
        n_inputs = np.size(flattened_input)
        self.weights = np.random.rand(n_inputs, self.output_size) - 0.5
        self.bias = np.random.rand(1, self.output_size) - 0.5

    def forward_propagation(self, flattened_input):
        """Return ``activation(input . weights + bias)`` as a (1, output_size) row."""
        if self.state:
            self.init_weights_bias(flattened_input)
            self.state = False

        # Keep the input as a (1, n) row so the matrix products below work for
        # both a flat vector and the row output of a previous dense layer.
        self.input = np.reshape(flattened_input, (1, -1))
        self.z = np.dot(self.input, self.weights) + self.bias
        self.a = self.activation_func(self.z)
        return self.a

    def backward_propagation(self, output_error, learning_rate):
        """Update weights and bias; return the error for layer L-1 as a (1, n) row."""
        delta = self.activation_prime_func(self.a) * output_error #back-propagation error
        input_error = np.dot(delta, self.weights.T) #back-propagation error for the previous input, L-1
        # (n, 1) . (1, m) -> one gradient entry per weight
        weights_error = np.dot(self.input.T, delta) #dJdW

        self.weights -= weights_error * learning_rate
        self.bias -= delta * learning_rate

        return input_error

    # Original spelling, kept because the training loop calls it.
    back_propagation = backward_propagation

    def print_dimension(self):
        """Return a short description of the layer."""
        return f'output dense: {self.output_size}'


class DCNNetwork:
    """Sequential container that trains its layers one sample at a time."""

    def __init__(self):
        self.layers = []

    def add_layer(self, layer):
        """Append ``layer`` to the end of the network."""
        self.layers.append(layer)

    def compile(self, loss_function, loss_function_prime, learning_rate):
        """Store the loss, its derivative and the learning rate; print the layers."""
        self.loss_function = loss_function
        self.loss_function_prime = loss_function_prime
        self.learning_rate = learning_rate

        for layer in self.layers:
            print(layer.print_dimension())

    @staticmethod
    def _backward_step(layer):
        """Return the layer's backward method under either spelling.

        Layers in this file use both ``back_propagation`` and
        ``backward_propagation``; the former is preferred when present.
        """
        step = getattr(layer, 'back_propagation', None)
        if step is None:
            step = layer.backward_propagation
        return step

    def fit(self, x_train, y_train, epochs=1):
        """Train on every sample for ``epochs`` passes.

        For each sample the input is pushed through all layers, the loss
        derivative is computed and the error is propagated back through the
        layers in reverse order. After each epoch the loss of the last sample
        is printed.

        Args:
            x_train: sequence of input samples.
            y_train: sequence of targets, indexed like ``x_train``.
            epochs: number of passes over the data (defaults to 1 so that
                ``fit(x, y)`` works).
        """
        for e in range(epochs):
            err = 0

            for j, sample in enumerate(x_train):
                target = y_train[j]

                output = sample
                for layer in self.layers:
                    output = layer.forward_propagation(output)

                err = self.loss_function(target, output)
                error = self.loss_function_prime(target, output)

                for back_layer in reversed(self.layers):
                    error = self._backward_step(back_layer)(error, self.learning_rate)

            print(f'epoch: {e} \t error={err}')
# Reload CIFAR-10 and rescale the pixels exactly as in the first cell.
(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()

x_train = x_train.astype("float32") / 255
x_test = x_test.astype("float32") / 255

# Run a single test image through one class-based convolution layer with 32
# randomly initialised 3x3 filters.
test_image = x_test[0]
colly1 = ConvolutionLayer(n_filters=32, kernel_size=(3, 3), activation=relu_layer, activation_prime=relu_derivative)
propagated1 = colly1.forward_propagation(test_image)
# Plot the outputs with index 0 and 2 next to the original. The panel titles
# inside plot_process_step_image are hard-coded to the edge-filter names.
images = [propagated1[0], propagated1[2], test_image]
plot_process_step_image(images, 'Convolution operation', (12, 5))

Implementation of Convolutional neural network with two convolutional layers, a fully connected layer, and an output layer (pooling, thresholding and normalisation)

# Two convolution layers, a flatten step, one hidden dense layer and a
# 10-class softmax output layer.
model = DCNNetwork()

model.add_layer(ConvolutionLayer(n_filters=32, kernel_size=(3, 3), activation=relu_layer, activation_prime=relu_derivative))
model.add_layer(ConvolutionLayer(n_filters=32, kernel_size=(3, 3), activation=relu_layer, activation_prime=relu_derivative))

model.add_layer(Flatten())

model.add_layer(FullyConnect(128, relu, relu_derivative))
model.add_layer(FullyConnect(10, softmax, softmax_derivative))

model.compile(loss_cross_entropy, loss_cross_entropy_derivative, learning_rate=0.1)

# The cross-entropy loss expects one-hot targets (see the example in
# loss_cross_entropy), while the dataset provides integer class labels, so
# each label is expanded to a 10-element one-hot row first.
y_train_one_hot = np.eye(10)[y_train.reshape(-1)]
# epochs is passed explicitly; fit() was previously called without it.
model.fit(x_train, y_train_one_hot, epochs=1)

Releases

No releases published

Packages

No packages published