In [None]:
import numpy
import numpy as np

def convolve(image, kernel):
  # print(image[:10], image.shape)
  image = image.reshape(28, 28)
  # print(image)
  img_h = img_w = len(image)
  ker_h, ker_w = kernel.shape
  out_img_h = img_h - ker_h + 1
  out_img_w = img_w - ker_w + 1
  output = np.zeros((out_img_h, out_img_w))

  for y in range(out_img_h):
    for x in range(out_img_w):
      window = image[y:y + ker_h, x:x + ker_w]
      output[y, x] = np.sum(window * kernel)
  return output


def max_pooling(image, kernel_size=2, stride=2):
  """Downsample a 2-D array by taking the maximum of each square window.

  Windows are `kernel_size` x `kernel_size`, start at the top-left corner and
  advance by `stride`; windows that would run past the edge are skipped.

  Args:
    image: 2-D array to pool.
    kernel_size: side length of the pooling window.
    stride: step between consecutive windows.

  Returns:
    A float array holding the maximum of every window.
  """
  rows, cols = image.shape
  out_shape = (
      (rows - kernel_size) // stride + 1,
      (cols - kernel_size) // stride + 1,
  )
  pooled = np.zeros(out_shape)

  # Top-left corners of every window that fits entirely inside the image.
  row_starts = range(0, rows - kernel_size + 1, stride)
  col_starts = range(0, cols - kernel_size + 1, stride)

  for out_row, top in enumerate(row_starts):
    for out_col, left in enumerate(col_starts):
      patch = image[top:top + kernel_size, left:left + kernel_size]
      pooled[out_row, out_col] = np.max(patch)

  return pooled



def apply_convolution_and_pooling(df):
  """Run every image through a fixed 3x3 filter followed by 2x2 max pooling.

  Args:
    df: indexable collection of images; `df[i]` is handed to `convolve`.

  Returns:
    A list with one pooled feature map per image, in input order.
  """
  # Each row of the filter is [1, 0, -1]: left column minus right column.
  edge_kernel = np.array([[1, 0, -1]] * 3)
  pool_size = 2
  pool_stride = 2

  feature_maps = []
  for idx in range(len(df)):
    response = convolve(df[idx], edge_kernel)
    feature_maps.append(max_pooling(response, pool_size, pool_stride))
    # Progress report every 1000 images.
    if idx % 1000 == 0:
      print(f'Image: {idx} done')

  return feature_maps

In [None]:
import pandas as pd
import numpy as np


# Load the first 5000 records.
# The label column used to be addressed as '6', which indicates the CSV has no
# header row and pandas was consuming the first record as column names (and
# dropping it from the data). header=None keeps that record as a sample.
# NOTE(review): confirm the file really is header-less.
data = pd.read_csv('/content/sample_data/mnist_train_small.csv', header=None)[:5000]

# Column 0 holds the digit label; recode it as 1 for "5" and 0 otherwise.
data[0] = np.where(data[0] == 5, 1, 0) # binary classification 5 or not 5

data = np.array(data)

# Remaining columns are the flattened pixels, one row per image.
pixels = data[:, 1:]
y = data[:, 0]

processed = apply_convolution_and_pooling(pixels)

# Flatten each pooled feature map into one feature vector per image.
X = np.array([image.flatten() for image in processed])


X.shape

Image: 0 done
Image: 1000 done
Image: 2000 done
Image: 3000 done
Image: 4000 done


(5000, 169)

In [None]:
class Layer_Dense:
  """Fully connected layer computing `inputs . weights + biases`."""

  def __init__(self, n_inputs, n_neurons):
    """Create small random weights of shape (n_inputs, n_neurons) and zero biases."""
    weight_shape = (n_inputs, n_neurons)
    self.weights = np.random.randn(*weight_shape) * 0.01
    self.biases = np.zeros((1, n_neurons))

  def forward(self, inputs):
    """Store `inputs` for the backward pass and write the result to `self.output`."""
    self.inputs = inputs
    weighted_sum = np.dot(inputs, self.weights)
    self.output = weighted_sum + self.biases

  def backward(self, dvalues):
    """Compute gradients from `dvalues`, the gradient w.r.t. this layer's output.

    Sets `self.dweights`, `self.dbiases` and `self.dinputs`.
    """
    stored_inputs_t = self.inputs.T
    self.dweights = np.dot(stored_inputs_t, dvalues)
    # Bias gradient: column-wise sum, kept as a row vector like `self.biases`.
    self.dbiases = np.sum(dvalues, axis=0, keepdims=True)
    weights_t = self.weights.T
    self.dinputs = np.dot(dvalues, weights_t)


In [None]:

class Activation_ReLU:
  """Rectified linear activation: negative inputs become zero."""

  def forward(self, inputs):
    """Store `inputs` and write `max(0, inputs)` element-wise to `self.output`."""
    self.inputs = inputs
    self.output = np.maximum(0, inputs)

  def backward(self, dvalues):
    """Set `self.dinputs` to `dvalues` with entries zeroed where the input was <= 0."""
    blocked = self.inputs <= 0
    gradient = dvalues.copy()  # copy so the caller's array is left untouched
    gradient[blocked] = 0
    self.dinputs = gradient


class Activation_Softmax:
  """Row-wise softmax activation."""

  def forward(self, inputs):
    """Store `inputs` and write the per-row probabilities to `self.output`."""
    self.inputs = inputs
    # Subtract each row's maximum before exponentiating so exp() cannot overflow.
    shifted = inputs - np.max(inputs, axis=1, keepdims=True)
    exp_values = np.exp(shifted)
    row_totals = np.sum(exp_values, axis=1, keepdims=True)
    self.output = exp_values / row_totals

  def backward(self, dvalues):
    """Set `self.dinputs` by applying each sample's softmax Jacobian to its gradient."""
    self.dinputs = np.empty_like(dvalues)

    for row, (probs, grad) in enumerate(zip(self.output, dvalues)):
      column = probs.reshape(-1, 1)
      # Jacobian of softmax for one sample: diag(p) - p p^T.
      jacobian = np.diagflat(column) - np.dot(column, column.T)
      self.dinputs[row] = np.dot(jacobian, grad)

In [None]:

class Optimizer_SGD:
  """Stochastic gradient descent with optional learning-rate decay and momentum."""

  def __init__(self, learning_rate=1., decay=0., momentum=0.):
    """Record the hyperparameters and start the step counter at zero."""
    self.learning_rate = learning_rate
    self.current_learning_rate = learning_rate
    self.decay = decay
    self.iterations = 0
    self.momentum = momentum

  def pre_update_params(self):
    """Refresh `current_learning_rate` from the step count when decay is enabled."""
    if not self.decay:
      return
    shrink = 1. / (1. + self.decay * self.iterations)
    self.current_learning_rate = self.learning_rate * shrink

  def update_params(self, layer):
    """Apply one update to `layer.weights` / `layer.biases` from its gradients.

    With momentum enabled, the previous step is kept on the layer as
    `weight_momentums` / `bias_momentums` (created as zeros on first use).
    """
    rate = self.current_learning_rate

    if not self.momentum:
      weight_step = -rate * layer.dweights
      bias_step = -rate * layer.dbiases
    else:
      if not hasattr(layer, 'weight_momentums'):
        layer.weight_momentums = np.zeros_like(layer.weights)
        layer.bias_momentums = np.zeros_like(layer.biases)

      # New step = retained fraction of the previous step minus the gradient step.
      weight_step = self.momentum * layer.weight_momentums - rate * layer.dweights
      layer.weight_momentums = weight_step

      bias_step = self.momentum * layer.bias_momentums - rate * layer.dbiases
      layer.bias_momentums = bias_step

    layer.weights += weight_step
    layer.biases += bias_step

  def post_update_params(self):
    """Advance the step counter used by the decay schedule."""
    self.iterations += 1


In [None]:

class Loss:
  """Base class for losses; subclasses supply `forward(output, y)`."""

  def calculate(self, output, y):
    """Return the mean of the per-sample losses produced by `self.forward`."""
    per_sample = self.forward(output, y)
    return np.mean(per_sample)


class Loss_CategoricalCrossentropy(Loss):
  """Categorical cross-entropy for class-index (1-D) or one-hot (2-D) targets."""

  def forward(self, y_pred, y_true):
    """Return the negative log of the confidence assigned to each true class."""
    n_samples = len(y_pred)
    # Clip away exact 0 and 1 so the logarithm below stays finite.
    clipped = np.clip(y_pred, 1e-7, 1 - 1e-7)

    target_rank = len(y_true.shape)
    if target_rank == 1:
      # Class indices: pick one confidence per row.
      picked = clipped[range(n_samples), y_true]
    elif target_rank == 2:
      # One-hot rows: the product masks out every other class.
      picked = np.sum(clipped * y_true, axis=1)

    return -np.log(picked)

  def backward(self, dvalues, y_true):
    """Set `self.dinputs` to the loss gradient w.r.t. the predictions `dvalues`."""
    n_samples = len(dvalues)
    n_labels = len(dvalues[0])
    if len(y_true.shape) == 1:
      # Expand class indices into one-hot rows.
      y_true = np.eye(n_labels)[y_true]
    gradient = -y_true / dvalues
    # Divide by the sample count so the gradient matches the mean loss.
    self.dinputs = gradient / n_samples



class Activation_Softmax_Loss_CategoricalCrossentropy():
  """Softmax activation and categorical cross-entropy loss used as one unit."""

  def __init__(self):
    """Create the wrapped activation and loss objects."""
    self.activation = Activation_Softmax()
    self.loss = Loss_CategoricalCrossentropy()

  def forward(self, inputs, y_true):
    """Run the activation, expose its result as `self.output`, return the loss."""
    self.activation.forward(inputs)
    probabilities = self.activation.output
    self.output = probabilities
    return self.loss.calculate(probabilities, y_true)

  def backward(self, dvalues, y_true):
    """Set `self.dinputs` to (dvalues - one_hot(y_true)) / number of samples."""
    n_samples = len(dvalues)
    if len(y_true.shape) == 2:
      # One-hot targets are reduced to class indices.
      y_true = np.argmax(y_true, axis=1)

    gradient = dvalues.copy()  # leave the caller's array untouched
    gradient[range(n_samples), y_true] -= 1
    self.dinputs = gradient / n_samples



In [None]:

# Network: 169 input features -> 64 hidden units (ReLU) -> 2 class scores.
dense1 = Layer_Dense(169, 64)
activation1 = Activation_ReLU()

dense2 = Layer_Dense(64, 2)
# Softmax and cross-entropy are used through the combined object.
loss_activation = Activation_Softmax_Loss_CategoricalCrossentropy()

optimizer = Optimizer_SGD(learning_rate=0.05, decay=5e-7)

# Full-batch training: every epoch uses all of X.
for epoch in range(31):
  # Forward pass.
  dense1.forward(X)
  activation1.forward(dense1.output)
  dense2.forward(activation1.output)

  loss = loss_activation.forward(dense2.output, y)

  # Accuracy: share of samples whose most probable class matches the label.
  predictions = np.argmax(loss_activation.output, axis=1)
  if len(y.shape) == 2:
    y = np.argmax(y, axis=1)
  accuracy = np.mean(predictions == y)

  if not epoch % 10:
    print(f'epoch: {epoch}, acc: {accuracy:.3f}, loss: {loss:.3f}, lr: {optimizer.current_learning_rate}')

  # Backward pass, from the loss back to the first layer.
  loss_activation.backward(loss_activation.output, y)
  dense2.backward(loss_activation.dinputs)
  activation1.backward(dense2.dinputs)
  dense1.backward(activation1.dinputs)

  # Parameter update: refresh the learning rate, step both layers, count the step.
  optimizer.pre_update_params()
  for layer in (dense1, dense2):
    optimizer.update_params(layer)
  optimizer.post_update_params()

epoch: 0, acc: 0.567, loss: 0.881, lr: 0.05
epoch: 10, acc: 0.913, loss: 0.551, lr: 0.0499997750010125
epoch: 20, acc: 0.913, loss: 0.496, lr: 0.04999952500451246
epoch: 30, acc: 0.913, loss: 0.445, lr: 0.04999927501051235
