In [None]:
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split
import numpy as np
import matplotlib.pyplot as plt

# Getting the normalized dataset:

In [None]:
# Get the mean, std of the pixel intensity values
def get_mean_and_std():
  # Get unnormalized data with pixel values [0,1]
  dataset = datasets.FashionMNIST(
    root='./data',
    train=True,
    download=True,
    transform=transforms.ToTensor()
  )

  # Split into 60_000/1_000 = 60 batches for faster processing
  loader = DataLoader(dataset, batch_size=1000, shuffle=False)
  mean, std, num_batches = 0.0, 0.0, 0

  for images, _ in loader:
    batch_samples = images.size(0)

    # Convert 28x28 arrray to 784x1
    images = images.view(batch_samples, -1)

    # Get mean and std for pixel values of the batch
    mean += images.mean(dim=1).sum()
    std += images.std(dim=1).sum()
    num_batches += batch_samples

  mean /= num_batches
  std /= num_batches

  return mean.item(), std.item()

In [None]:
# Get the train and test datasets loaders
def get_datasets(is_cnn = False):
  # Get the normalization mean and std
  mean, std = get_mean_and_std()

  # Convert and Normalize the data to a tensor (pixel values between 0 and 1) and flatten it to a 784 element list
  ds_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((mean,), (std,)),
    transforms.Lambda(lambda x: x.view(-1))
  ])

  # Same as before but keep the 28x28 structure if its a CNN
  if is_cnn:
    ds_transform = transforms.Compose([
      transforms.ToTensor(),
      transforms.Normalize((mean,), (std,)),
    ])

  # Get the transformed train and test datset
  complete_train_dataset = datasets.FashionMNIST(root='./data', train=True, download=True, transform=ds_transform)
  complete_test_dataset = datasets.FashionMNIST(root='./data', train=False, download=True, transform=ds_transform)

  train_loader = DataLoader(complete_train_dataset, batch_size=100, shuffle=True)
  test_loader = DataLoader(complete_test_dataset, batch_size=100, shuffle=False)

  return train_loader, test_loader

# Model implementation

In [None]:
# activation functions and their derivatives
class ReLU:
  def activation(self,x):
    return np.maximum(0,x)

  def derivative(self,x):
    return np.where(x > 0, 1, 0)


class Tanh:
  def activation(self,x):
    return (np.exp(x) - np.exp(-x)) / (np.exp(x) + np.exp(-x))

  def derivative(self, x):
    return 1 - np.square(self.activation(x))


class LeakyReLU:
  def __init__(self, y):
    self.gamma = y

  def activation(self, x):
    return np.maximum(0,x) + self.gamma*np.minimum(0,x)

  def derivative(self, x):
    return np.where(x > 0, 1, self.gamma)


class Softmax:
  def activation(self,x):
    sum = np.sum(np.exp(x), axis=0, keepdims=True) # should work for a matrix now
    return np.exp(x) / sum

  def derivative(self,x): #assumes that x is already a softmax vector
    n = x.shape[-1]
    jacobian = np.zeros((n,n))
    for i in range(0,n):
      for j in range(0,n):
        if i == j:
          jacobian[i,j] = x[i] * (1-x[i])
        else:
          jacobian[i,j] = -x[i] * x[j]
    return jacobian

class MultiClassLoss:
  def loss(self, pred, true): # assumes pred and true are vectors of the same size
    return -np.sum(true * np.log(pred))

  def derivative(self, pred, true): #returns the partial derivatives w.r.t. all the class probabilies
    return -true / pred

In [None]:
class MLPBackpropagation:

    """
    Parameters:
    hidden_sizes = an int list that describes the number of units in each hidden layer
    inner_activation = an object that will be the activation function for all hidden layers
    final_activation = an object that will be the activation function for the output
    loss = an object that represents the loss function
    """
    def __init__(self, hidden_sizes, inner_activation, final_activation, loss, bias = True):
        self.hidden_sizes = hidden_sizes
        self.w_matrices = [None]
        self.biases = [None]

        for k in range(len(hidden_sizes)-1): #initialize V later when we recieved x
          w = np.random.randn(hidden_sizes[k+1], hidden_sizes[k]) * .01   #intialize with random gaussian noise
          self.w_matrices.append(w)

          if (bias):
            b = np.ones((1,hidden_sizes[k+1])) # keep separate for easier gradient calulation
            self.biases.append(b)

        # intialize the inner activation and the last activation (both are objects)
        self.inner_fn = inner_activation
        self.outer_fn = final_activation

        #need to intialize what is the loss function
        self.loss_fn = loss


    """
    given the features of 1 instance, compute the prediction
    """
    def forward(self, x):
      self.hidden_units = []
      self.activated_units = []

      units = x.reshape(-1, 1)

      #go through the hidden layers
      for i in range(len(self.w_matrices) - 1):
        # linear transformation on input
        units = np.dot(self.w_matrices[i], units).T + self.biases[i]
        units = units.reshape(-1, 1)
        self.hidden_units.append(units.copy())

        # activate
        units = self.inner_fn.activation(units)
        self.activated_units.append(units.copy())

      #produce the prediction
      y = np.dot(self.w_matrices[-1], units) + self.biases[-1].reshape(-1,1)
      self.hidden_units.append(y.copy())

      y = self.outer_fn.activation(y)
      self.activated_units.append(y.copy())

      return y


    """
    calculate the gradient of the weights using the given instance
    """
    def backward(self, x, pred_y, true_y): # assumes we are given just 1 instance
      # calculate the loss w.r.t. the output
      dy_hat = self.loss_fn.derivative(pred_y,true_y)

      # calculate the loss w.r.t. the outer activation
      if (isinstance(self.outer_fn, Softmax)):
        # simplify the formula so we dont need to calculate the jacobian
        da_list = [(pred_y.reshape(-1, 1) - true_y.reshape(-1, 1)).T]
      else:
        da_list = [np.dot(dy_hat,self.outer_fn.derivative(self.hidden_units[-1])).T]

      # check if the model has no hidden layers
      if (len(self.w_matrices) == 1):
        dw_list = [np.dot(x.reshape(-1,1), da_list[0]).T]
        db_list = [da_list[0]]
        return dw_list, db_list

      # calculate for the outer layer -> y = fn(a) = f(Wc + b) = f(W * g(d) + b) = ....
      dw_list = [np.dot(self.activated_units[-2], da_list[0]).T]
      db_list = [da_list[0]]

      # follow a similar formula for the hidden layers
      tot = len(self.activated_units)
      for i in range(1, tot - 1):
        da = np.dot(da_list[-1], self.w_matrices[tot-i])
        da = self.inner_fn.derivative(self.hidden_units[tot-i-1]).T * da
        da_list.append(da)
        dw_list.append(np.dot(self.activated_units[tot-i-2], da).T)
        db_list.append(da)

      #repeat one more time for the input layer
      da = np.dot(da_list[-1], self.w_matrices[1])
      da = self.inner_fn.derivative(self.hidden_units[0]).T * da
      dw_list.append(np.dot(x.reshape(-1,1),da).T)
      db_list.append(da)

      return dw_list, db_list


    """
    fits the model using X and Y and evaluate the perfomance per epoch if given a test set
    """
    def fit(self, X, Y, epoch, learning_rate = 0.05, testX = None, testY = None):
      features = X.shape[-1]
      classes = Y.shape[-1]

      # initalize the input weight matrix and bias
      if (self.hidden_sizes != []):
        self.w_matrices[0] = np.random.randn(self.hidden_sizes[0], features) * .01
        self.biases[0] = np.ones((1,self.hidden_sizes[0]))

        # intialize the output weight matrix
        self.w_matrices.append(np.random.randn(classes, self.hidden_sizes[-1]) * .01)
        self.biases.append(np.ones((1,classes)))

      else: # no hidden layers
        self.w_matrices[0] = np.random.randn(classes, features) * .01
        self.biases[0] = np.ones((1,classes))

      # training setup

      # determine how many weights to update
      matrices = len(self.w_matrices)

      # if given a test set, evaluate the performance of the model
      evaluate = isinstance(testX, np.ndarray) and isinstance(testY, np.ndarray)

      # initialize train_acc and test_acc (perfomance with no training)
      if (evaluate):
        train_result = self.predict(X)
        test_result = self.predict(testX)
        self.train_acc = [self.evaluate_acc(train_result, Y)]
        self.test_acc = [self.evaluate_acc(test_result, testY)]

      # determine how many instances we have
      if (X.ndim == 1):
        instances = 1
      else:
        instances = X.shape[0]

      # SDG
      for i in range(epoch):
        for j in range(instances):

          # get gradient for each instance
          if (X.ndim == 1):
            pred = self.forward(X).flatten()
            grad_w, grad_b = self.backward(X, pred.reshape(-1,1), Y)
          else:
            pred = self.forward(X[j,:]).flatten()
            grad_w, grad_b = self.backward(X[j,:], pred.reshape(-1,1), Y[j,:])

          #update the weight backwards due to how grad_w and grad_b are stored
          for k in range(matrices):
            self.w_matrices[k] -= learning_rate * grad_w[matrices-1-k]
            self.biases[k] -= learning_rate * grad_b[matrices-1-k]

        # calculate performance per epoch
        if (evaluate):
          train_result = self.predict(X)
          test_result = self.predict(testX)
          self.train_acc.append(self.evaluate_acc(train_result, Y))
          self.test_acc.append(self.evaluate_acc(test_result, testY))



    """
    given a set of predictions and true labels, determine the accuracy of the model
    """
    def evaluate_acc(self, pred, true):
      # assumes model is doing mulit-classification where classes = 1,2,...,k
      # and that the true labels are one hot encoded
      correct = 0
      total = len(pred)
      for i in range(total):
        # determine which class is chosen
        c = np.argmax(pred[i])
        t = np.argmax(true[i])

        # check true label
        if c == t:
          correct += 1
      return correct / total


    """
    given a set of instances, compute the prediction
    """
    def predict(self, X):
      if (X.ndim == 1):
        return self.forward(X)
      else:
        # need to do more than 1 forward pass
        predictions = np.zeros((X.shape[0],10))

        for i in range(X.shape[0]):
          p = self.forward(X[i,:]).flatten()
          predictions[i,:] = self.forward(X[i,:]).flatten()

        return predictions


In [None]:
# example usage:
a = ReLU()
b = Softmax()
loss = MultiClassLoss()
model = MLPBackpropagation([3,4], a, b, loss)

# dataset
c = np.array([[1,2], [3,2]])
d = np.array([[0,1,0,0,0,0,0,0,0,0], [0,0,0,0,1,0,0,0,0,0]])
u = np.array([[1,2], [3,2]])
v = np.array([[0,1,0,0,0,0,0,0,0,0], [0,0,0,0,1,0,0,0,0,0]])

model.fit(c,d,2,0.05,u,v)

print(model.w_matrices)
print(model.biases)

# gives the performance of the model after each epoch (starts at epoch = 0)
print(model.train_acc)
print(model.test_acc)





[array([[-0.00887012, -0.0006326 ],
       [-0.00266154, -0.02137475],
       [-0.00374831, -0.01258845]]), array([[-0.00148664, -0.00201177,  0.01248697],
       [ 0.00076109, -0.00546214,  0.00258449],
       [ 0.01771792,  0.00789625,  0.00988787],
       [-0.02205463,  0.00132387, -0.00295447]]), array([[-0.01793693, -0.04234122, -0.01373082, -0.01655797],
       [ 0.0565294 ,  0.08362823,  0.09009713,  0.06711527],
       [-0.0240732 , -0.01543212, -0.0223693 , -0.01481796],
       [-0.02161826, -0.01977443, -0.00334565,  0.00182939],
       [ 0.08474325,  0.07628665,  0.08728935,  0.07434883],
       [-0.02897312, -0.02374642, -0.0130974 , -0.00786239],
       [-0.00971053, -0.01511436, -0.01293287, -0.02424816],
       [-0.00645742, -0.03341212, -0.01950461, -0.0350789 ],
       [-0.0144764 , -0.03204569,  0.00543394, -0.01485557],
       [-0.02896831, -0.02589097, -0.01973847, -0.02130086]])]
[array([[0.99998553, 0.9999694 , 1.00003303]]), array([[1.00199548, 1.0045735 , 1.0033

# Additional preprocessing to the dataset: Encoding the categorical labels

In [None]:
def oneHotEncoding(Y):
  # given an array of labels, one hot encode them
  n = Y.shape[0]
  encoded = np.zeros((n,10))

  # one hot encoding (if class = k, set the k-th value = 1)
  for i in range(n):
    c = Y[i]
    encoded[i,c] = 1

  return encoded


# get our normalized dataset
training_data, testing_data = get_datasets()

# split data to inputs and labels
trainX, trainY = next(iter(training_data))
testX, testY = next(iter(testing_data))

# convert to numpy array
trainX = trainX.numpy()
trainY = trainY.numpy()
testX = testX.numpy()
testY = testY.numpy()

# one hot encode the labels
trainY = oneHotEncoding(trainY)
testY = oneHotEncoding(testY)