In [1]:
import cv2
from matplotlib import pyplot as plt
import numpy as np
import random
import math


In [2]:
from preprocess import dataset

In [3]:
def kaiming_kernel(dim, depth=1):
    arr = np.zeros((depth,dim,dim))
    for i in range(dim):
        for j in range(dim):
            for k in range(depth):
                arr[k][i][j]= random.gauss(0,math.sqrt(2/(dim*dim*depth)))
    return arr


def convolve(image, kernel, padding=0, strides=1):
    
    dim = kernel.shape[1]

    if(len(image.shape)==2):
        image = image.reshape((1,image.shape[0],image.shape[1]))
    if(len(kernel.shape)==2):
        kernel = kernel.reshape((1,dim,dim))

    depth = kernel.shape[0]
    xOutput = int(((image.shape[1] - dim + 2 * padding) / strides) + 1)
    yOutput = int(((image.shape[2] - dim + 2 * padding) / strides) + 1)

    output = np.zeros((xOutput, yOutput))
    imagePadded = np.pad(image, ((0,0),(padding,padding),(padding,padding)), 'constant')

    # Iterate through image
    for y in range(image.shape[2] - dim +1):
        if y % strides == 0:
            for x in range(image.shape[1] - dim + 1):
                if x % strides == 0:
                    for z in range(depth):
                        output[x, y] += (kernel[z,:,:] * imagePadded[z, x : x + dim, y : y + dim]).sum()

    return output


In [4]:
def ReLU(x):
    return 0 if x < 0 else x


def softmax(x):
    return np.exp(x) / np.sum(np.exp(x), axis=0)


def cross_entropy(x):
    return -np.log(x)


def regularized_cross_entropy(layers, lam, x):
    loss = cross_entropy(x)
    for layer in layers:
        loss += lam * (np.linalg.norm(layer.get_weights()) ** 2)
    return loss


def ReLU_derivative(x):
    return 0 if x < 0 else 1


In [5]:
class Convolutional:
    
    def __init__(self,name,depth,num_filters=1,size=3,padding=0,stride=1):
        self.depth = depth
        self.name = name
        self.size = size
        self.padding = padding
        self.stride = stride
        self.out = None
        self.num_filters =num_filters
        self.ReLU = np.vectorize(ReLU)
        self.last_input = None
        self.filters = np.zeros((self.num_filters,self.depth,self.size,self.size))
        for i in range(num_filters):
            self.filters[i]=kaiming_kernel(self.size,self.depth)

        
        
    def forward(self,image):
        print(self.name,"forward done\n")
        self.depth = image.shape[0]
        self.last_input = image
        
        self.out = np.zeros((self.num_filters, math.floor((image.shape[1] - self.size + 2*self.padding)/self.stride) + 1, math.floor((image.shape[2] - self.size + 2*self.padding)/self.stride) + 1))
        
        for i in range(self.num_filters):
            self.out[i] = convolve(image,self.filters[i],self.padding,self.stride)
            
        self.out = self.ReLU(self.out)
    
        return self.out
    
    def plot_filters(self, n_filters=10):
        x = self.filters.shape[1]
        fig = plt.figure(figsize=(20,18))
        for i in range(n_filters):
            for j in range(x):
                fig.add_subplot(10,x,i*x + j +1)
                plt.xticks([])
                plt.yticks([])
                plt.imshow(self.filters[i][j])
        plt.show()
    
    def backward(self, din, learn_rate=0.005):
        print(self.name,"backward done\n")
        input_dimension = self.last_input.shape[1]          # input dimension
                      # back propagate through ReLU
        self.ReLU_derivative(din)

        dout = np.zeros(self.last_input.shape)              # loss gradient of the input to the convolution operation
        dfilt = np.zeros(self.filters.shape)                # loss gradient of filter

        for f in range(self.filters.shape[0]):              # loop through all filters
            tmp_y = out_y = 0
            while tmp_y + self.size <= input_dimension:
                tmp_x = out_x = 0
                while tmp_x + self.size <= input_dimension:
                    patch = self.last_input[:, tmp_y:tmp_y + self.size, tmp_x:tmp_x + self.size]
                    dfilt[f] += np.sum(din[f, out_y, out_x] * patch, axis=0)
                    dout[:, tmp_y:tmp_y + self.size, tmp_x:tmp_x + self.size] += din[f, out_y, out_x] * self.filters[f]
                    tmp_x += self.stride
                    out_x += 1
                tmp_y += self.stride
                out_y += 1
        self.filters -= learn_rate * dfilt                  # update filters using SGD
        return dout                                         # return the loss gradient for this layer's inputs

    def get_weights(self):
        return np.reshape(self.filters, -1)

        
        

In [6]:
img = cv2.imread("Group_28/train/ketch/image_0007.jpg")
img = cv2.resize(img,(224,224))
img = np.transpose(img, (2,0,1))


In [7]:
class Pooling:                                              # max pooling layer using pool size equal to 2
    def __init__(self, name, stride=2, size=2):
        self.name = name
        self.last_input = None
        self.stride = stride
        self.size = size

    def forward(self, image):
        print(self.name,"forward done\n")
        self.last_input = image                             # keep track of last input for later backward propagation

        num_channels, h_prev, w_prev = image.shape
        h = int((h_prev - self.size) / self.stride) + 1     # compute output dimensions after the max pooling
        w = int((w_prev - self.size) / self.stride) + 1

        downsampled = np.zeros((num_channels, h, w))        # hold the values of the max pooling

        for i in range(num_channels):                       # slide the window over every part of the image and
            curr_y = out_y = 0                              # take the maximum value at each step
            while curr_y + self.size <= h_prev:             # slide the max pooling window vertically across the image
                curr_x = out_x = 0
                while curr_x + self.size <= w_prev:         # slide the max pooling window horizontally across the image
                    patch = image[i, curr_y:curr_y + self.size, curr_x:curr_x + self.size]
                    downsampled[i, out_y, out_x] = np.max(patch)       # choose the maximum value within the window
                    curr_x += self.stride                              # at each step and store it to the output matrix
                    out_x += 1
                curr_y += self.stride
                out_y += 1

        return downsampled

    def backward(self, din, learning_rate):
        print(self.name,"backward done\n")
        num_channels, orig_dim, *_ = self.last_input.shape      # gradients are passed through the indices of greatest
                                                                # value in the original pooling during the forward step

        dout = np.zeros(self.last_input.shape)                  # initialize derivative

        for c in range(num_channels):
            tmp_y = out_y = 0
            while tmp_y + self.size <= orig_dim:
                tmp_x = out_x = 0
                while tmp_x + self.size <= orig_dim:
                    patch = self.last_input[c, tmp_y:tmp_y + self.size, tmp_x:tmp_x + self.size]    # obtain index of largest
                    (x, y) = np.unravel_index(np.nanargmax(patch), patch.shape)                     # value in patch
                    dout[c, tmp_y + x, tmp_x + y] += din[c, out_y, out_x]
                    tmp_x += self.stride
                    out_x += 1
                tmp_y += self.stride
                out_y += 1

        return dout

    def get_weights(self):                          # pooling layers have no weights
        return 0


class FullyConnected:                               # fully-connected layer
    def __init__(self, name, nodes1, nodes2, activation):
        self.name = name
        self.weights = np.random.normal(0, 1/math.sqrt(nodes1), size=(nodes1, nodes2))
        self.biases = np.zeros(nodes2)
        self.activation = activation
        self.last_input_shape = None
        self.last_input = None
        self.last_output = None
        self.ReLU = np.vectorize(ReLU)
        self.ReLU_derivative = np.vectorize(ReLU_derivative)

    def forward(self, input):
        print(self.name,"forward done\n")
        self.last_input_shape = input.shape         # keep track of last input shape before flattening
                                                    # for later backward propagation

        input = input.flatten()                                 # flatten input

        output = np.dot(input, self.weights) + self.biases      # forward propagate
                        # apply ReLU activation function
        self.ReLU(output)

        self.last_input = input                     # keep track of last input and output for later backward propagation
        self.last_output = output

        return output

    def backward(self, din, learning_rate=0.005):                           # back propagate through ReLU
        print(self.name,"backward done\n")
        self.ReLU_derivative(din)

        self.last_input = np.expand_dims(self.last_input, axis=1)
        din = np.expand_dims(din, axis=1)

        dw = np.dot(self.last_input, np.transpose(din))           # loss gradient of final dense layer weights
        db = np.sum(din, axis=1).reshape(self.biases.shape)       # loss gradient of final dense layer biases

        self.weights -= learning_rate * dw                        # update weights and biases
        self.biases -= learning_rate * db

        dout = np.dot(self.weights, din)
        return dout.reshape(self.last_input_shape)

    def get_weights(self):
        return np.reshape(self.weights, -1)


class Dense:                                        # dense layer with softmax activation
    def __init__(self, name, nodes, num_classes):
        self.name = name
        self.weights = np.random.normal(0, 1/math.sqrt(nodes), size=(nodes, num_classes))
        self.biases = np.zeros(num_classes)
        self.last_input_shape = None
        self.last_input = None
        self.last_output = None

    def forward(self, input):
        print(self.name,"forward done\n")
        self.last_input_shape = input.shape         # keep track of last input shape before flattening
                                                    # for later backward propagation

        input = input.flatten()                                 # flatten input

        output = np.dot(input, self.weights) + self.biases      # forward propagate

        self.last_input = input                     # keep track of last input and output for later backward propagation
        self.last_output = output

        return softmax(output)

    def backward(self, din, learning_rate=0.005):
        print(self.name,"backward done\n")
        for i, gradient in enumerate(din):
            if gradient == 0:                   # the derivative of the loss with respect to the output is nonzero
                continue                        # only for the correct class, so skip if the gradient is zero

            t_exp = np.exp(self.last_output)                      # gradient of dout[i] with respect to output
            dout_dt = -t_exp[i] * t_exp / (np.sum(t_exp) ** 2)
            dout_dt[i] = t_exp[i] * (np.sum(t_exp) - t_exp[i]) / (np.sum(t_exp) ** 2)

            dt = gradient * dout_dt                               # gradient of loss with respect to output

            dout = self.weights @ dt                              # gradient of loss with respect to input

            # update weights and biases
            self.weights -= learning_rate * (np.transpose(self.last_input[np.newaxis]) @ dt[np.newaxis])
            self.biases -= learning_rate * dt

            return dout.reshape(self.last_input_shape)            # return the loss gradient for this layer's inputs

    def get_weights(self):
        return np.reshape(self.weights, -1)

In [8]:
def plot_accuracy_curve(accuracy_history, val_accuracy_history):
    plt.plot(accuracy_history, 'b', linewidth=3.0, label='Training accuracy')
    plt.plot(val_accuracy_history, 'r', linewidth=3.0, label='Validation accuracy')
    plt.xlabel('Iteration', fontsize=16)
    plt.ylabel('Accuracy rate', fontsize=16)
    plt.legend()
    plt.title('Training Accuracy', fontsize=16)
    plt.savefig('training_accuracy.png')
    plt.show()


def plot_learning_curve(loss_history):
    plt.plot(loss_history, 'b', linewidth=3.0, label='Cross entropy')
    plt.xlabel('Iteration', fontsize=16)
    plt.ylabel('Loss', fontsize=16)
    plt.legend()
    plt.title('Learning Curve', fontsize=16)
    plt.savefig('learning_curve.png')
    plt.show()


def plot_sample(image, true_label, predicted_label):
    plt.imshow(image)
    if true_label and predicted_label is not None:
        if type(true_label) == 'int':
            plt.title('True label: %d, Predicted Label: %d' % (true_label, predicted_label))
        else:
            plt.title('True label: %s, Predicted Label: %s' % (true_label, predicted_label))
    plt.show()


def plot_histogram(layer_name, layer_weights):
    plt.hist(layer_weights)
    plt.title('Histogram of ' + str(layer_name))
    plt.xlabel('Value')
    plt.ylabel('Number')
    plt.show()


In [11]:


class Network:
    def __init__(self):
        self.layers = []

    def add_layer(self, layer):
        self.layers.append(layer)

    def build_model(self):
        self.add_layer(Convolutional(name='conv1', num_filters=32,depth = 3))
        self.add_layer(Convolutional(name='conv2', num_filters=64, depth = 32))
        self.add_layer(Pooling(name='pool1', stride=1, size=2))
        self.add_layer(FullyConnected(name='fullyconnected', nodes1=64*219*219, nodes2=128, activation='relu'))
        self.add_layer(Dense(name='dense', nodes=218, num_classes=3))

    def forward(self, image, plot_feature_maps):                # forward propagate
        for layer in self.layers:
            if plot_feature_maps:
                image = (image * 255)[0, :, :]
                plot_sample(image, None, None)
            image = layer.forward(image)
        return image

    def backward(self, gradient, learning_rate):                # backward propagate
        for layer in reversed(self.layers):
            gradient = layer.backward(gradient, learning_rate)

    def train(self, dataset, num_epochs, learning_rate, validate, regularization, plot_weights, verbose):
        history = {'loss': [], 'accuracy': [], 'val_loss': [], 'val_accuracy': []}
        for epoch in range(1, num_epochs + 1):
            print('\n--- Epoch {0} ---'.format(epoch))
            loss, tmp_loss, num_corr = 0, 0, 0
            for i in range(len(dataset['train_images'])):
#                 if i % 100 == 99:
                accuracy = (num_corr / (i + 1)) * 100       # compute training accuracy and loss up to iteration i
                loss = tmp_loss / (i + 1)

                history['loss'].append(loss)                # update history
                history['accuracy'].append(accuracy)

                
                if verbose:
                    print('[Step %05d/%03d]: Loss %02.3f | Accuracy: %02.3f ' % (i + 1, len(dataset['train_images']), loss, accuracy))
                
                
                
                image = dataset['train_images'][i]
                label = dataset['train_labels'][i]

                tmp_output = self.forward(image, plot_feature_maps=0)       # forward propagation

                # compute (regularized) cross-entropy and update loss
                tmp_loss += regularized_cross_entropy(self.layers, regularization, tmp_output[label])

                if np.argmax(tmp_output) == label:                          # update accuracy
                    num_corr += 1

                gradient = np.zeros(10)                                     # compute initial gradient
                gradient[label] = -1 / tmp_output[label] + np.sum([2 * regularization * np.sum(np.absolute(layer.get_weights())) for layer in self.layers])


                self.backward(gradient, learning_rate)                      # backward propagation
                
                
        if validate:
            print('Validation\n')
            indices = np.random.permutation(dataset['validation_images'].shape[0])
            val_loss, val_accuracy = self.evaluate(
                dataset['validation_images'][indices, :],
                dataset['validation_labels'][indices],
                regularization=0,
                plot_correct=0,
                plot_missclassified=0,
                plot_feature_maps=0,
                verbose=0
            )
            history['val_loss'].append(val_loss)
            history['val_accuracy'].append(val_accuracy)

            if verbose:
                print('[Step %05d]: Loss %02.3f | Accuracy: %02.3f | '
                      'Validation Loss %02.3f | Validation Accuracy: %02.3f' %
                      (i + 1, loss, accuracy, val_loss, val_accuracy))
                        
                        
        if verbose:
            
            print('Train Loss: %02.3f' % (history['loss'][-1]))
            print('Train Accuracy: %02.3f' % (history['accuracy'][-1]))
            plot_learning_curve(history['loss'])
            plot_accuracy_curve(history['accuracy'], history['val_accuracy'])

        if plot_weights:
            for layer in self.layers:
                if 'pool' not in layer.name:
                    plot_histogram(layer.name, layer.get_weights())

    def evaluate(self, X, y, regularization, plot_correct, plot_missclassified, plot_feature_maps, verbose):
        loss, num_correct = 0, 0
        for i in range(len(X)):
            tmp_output = self.forward(X[i], plot_feature_maps)              # forward propagation

            # compute cross-entropy update loss
            loss += regularized_cross_entropy(self.layers, regularization, tmp_output[y[i]])

            prediction = np.argmax(tmp_output)                              # update accuracy
            if prediction == y[i]:
                num_correct += 1
                if plot_correct:                                            # plot correctly classified digit
                    image = (X[i] * 255)[0, :, :]
                    plot_sample(image, y[i], prediction)
                    plot_correct = 1
            else:
                if plot_missclassified:                                     # plot missclassified digit
                    image = (X[i] * 255)[0, :, :]
                    plot_sample(image, y[i], prediction)
                    plot_missclassified = 1

        test_size = len(X)
        accuracy = (num_correct / test_size) * 100
        loss = loss / test_size
        if verbose:
            print('Test Loss: %02.3f' % loss)
            print('Test Accuracy: %02.3f' % accuracy)
        return loss, accuracy

In [None]:
model = Network()
model.build_model()

num_epochs = 10
learning_rate = 0.05
validate = 1
regularization = 0
verbose = 1
plot_weights = 0
plot_correct = 0
plot_missclassified = 0
plot_feature_maps = 0
    
print('\n--- Training the model ---')                                   # train model
model.train(
    dataset,
    num_epochs,
    learning_rate,
    validate,
    regularization,
    plot_weights,
    verbose
)

print('\n--- Testing the model ---')                                    # test model
model.evaluate(
    dataset['test_images'],
    dataset['test_labels'],
    regularization,
    plot_correct,
    plot_missclassified,
    plot_feature_maps,
    verbose
)



--- Training the model ---

--- Epoch 1 ---
[Step 00001]: Loss 0.000 | Accuracy: 0.000 
conv1 for

conv2 for

