In [1]:
import torch
from torchvision.models.feature_extraction import get_graph_node_names

import numpy as np

In [2]:
train = np.load('fashion_train.npy')
test = np.load('fashion_test.npy')

X_train, y_train = train[:, :784], train[:, 784]
X_test, y_test = test[:, :784], test[:, 784]

In [3]:
#reshape the X_train and X_test to 28x28
X_train = X_train.reshape(-1, 28, 28)
X_test = X_test.reshape(-1, 28, 28)

In [4]:
import torch.nn as nn
import torch.nn.functional as F

In [5]:
class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1) # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


net = Net()


In [6]:
#apply X_train to the model
X_train = torch.from_numpy(X_train).float()
X_train = X_train.unsqueeze(1)
#flatten the X_train to 1D
X_train = torch.flatten(X_train, 1)

X_test = torch.from_numpy(X_test).float()
X_test = X_test.unsqueeze(1)
X_test = torch.flatten(X_test, 1)

In [7]:
class MiniBatchGD:
    def __init__(self, X, y, batch_size=32):
        self.X = X
        self.y = y
        self.batch_size = batch_size

    def sample(self):
        ''' Sample a batch of data '''
        idx = np.random.choice(self.X.shape[0], self.batch_size, replace=False)
        return self.X[idx], self.y[idx]

In [8]:
class ActivationFunction:
    def __init__(self, name, lr=0.01):
        self.name = name
        self.lr = lr

    def calculate(self, x, derivative=False):
        if self.name == 'sigmoid':
            return self.sigmoid(x, derivative=derivative)
        elif self.name == 'relu':
            return self.relu(x, derivative=derivative)
        elif self.name == 'softmax':
            return self.softmax(x, derivative=derivative)

    def sigmoid(self, x, derivative=False):
        ''' 
            sigmoid activation function and its derivative 
        '''
        if not derivative:
            return 1 / (1 + np.exp (-x))
        else:
            out = self.sigmoid(x)
            return out * (1 - out)

    def relu(self, x, derivative=False):
        ''' 
            relu activation function and its derivative 
        '''
        if not derivative:
            return np.where(x > 0, x, 0)
        else:
            return np.where(x > 0, 1, 0)
    
    def softmax(self, x, derivative=False):
        ''' 
            softmax activation function and its derivative 
        '''
        if not derivative:
            exps = np.exp(x - np.max(x))
            return exps / np.sum(exps)
        else:
            out = self.softmax(x)
            return out * (1 - out)

In [9]:

class NeuralNetwork:
    def __init__(self, input_size=784, hidden_size=300, output_size=5, layers_num=3, learning_rate=0.01, test=False, activation_name='sigmoid'):
        self.input_size = input_size
        self.output_size = output_size
        self.hidden_size = hidden_size 
        self.layers_num = layers_num
        self.lr = learning_rate
        self.activation = ActivationFunction(activation_name, lr=learning_rate)

        self.weights = []
        self.bias = []

        self.weights.append(np.random.randn(self.input_size, self.hidden_size))
        self.bias.append(np.random.randn(1, self.hidden_size))

        self.weights.append(np.random.randn(self.hidden_size, self.output_size))
        self.bias.append(np.random.randn(1, self.output_size))
        
    def forward_pass(self, X):
        ''' 
            conduct the forward pass on the network 
        '''
        #X = X / 255
        self.z1 = np.dot(X, self.weights[0]) + self.bias[0]
        self.a1 = self.activation.calculate(self.z1)

        self.z2 = np.dot(self.a1, self.weights[1]) + self.bias[1]
        self.a2 = self.activation.calculate(self.z2)

        self.outputs = np.zeros((len(self.a2), self.output_size))
        for i in range(len(self.a2)):
            self.outputs[i][np.argmax(self.a2[i])] = 1

        self.outputs = np.array(self.outputs)


    def backward_pass(self, X, y):
        '''
            conduct the backward pass on the network
        '''
        #X = X / 255
        y_mtrix = np.zeros((len(y), int(self.output_size))) 
        #change y into 1-hot encoding by assigning 1 to the index of the label
        for i in range(len(y)):
            y_mtrix[i][y[i]] = 1

        #loss, used to check the accuracy of the network
        self.loss = np.sum((self.outputs - y_mtrix)**2) / (2*y_mtrix.size)

        #accuracy, used to check the accuracy of the network
        self.accuracy = np.sum(np.argmax(self.outputs, axis=1) == y) / len(y)

        #calculate the error of the hidden layer
        self.e1 = self.a2 - y_mtrix
        dw1 = self.e1 * self.activation.calculate(self.a2, True)
        
        #calculate the error of the input layer
        self.e2 = np.dot(dw1, self.weights[1].T)
        dw2 = self.e2 * self.activation.calculate(self.a1, True)

        #update the weights
        w2_update = np.dot(self.a1.T, dw1) / len(X)
        w1_update = np.dot(X.T, dw2) / len(X)

        #update the biases
        b2_update = self.lr * np.sum(dw1, axis=0, keepdims=True) / len(X)
        b1_update = self.lr * np.sum(dw2, axis=0, keepdims=True) / len(X) 

        self.weights[1] -= self.lr * w2_update
        self.weights[0] -= self.lr * w1_update

        self.bias[1] -= self.lr * b2_update
        self.bias[0] -= self.lr * b1_update

        
    def TRAIN(self, X, y, epochs=5, testing=False):
        '''
            train the network for a given number of epochs
        '''
        for epoch in range(epochs):
            X_sample, y_sample = MiniBatchGD(X, y, batch_size=64).sample()
            self.forward_pass(X_sample)
            self.backward_pass(X_sample, y_sample)
            if testing: print(f'Epoch {epoch}, loss: {self.loss}, accuracy: {self.accuracy}')

    def TEST(self, X, y):
        '''
            test the network
        '''
        self.forward_pass(X)
        self.backward_pass(X, y)
        print(f'loss: {self.loss}, accuracy: {self.accuracy}')
    

In [10]:
nn = NeuralNetwork(activation_name='sigmoid')
nn.TRAIN(X_train, y_train, epochs=100, testing=True)
nn.TEST(X_test, y_test)

  return 1 / (1 + np.exp (-x))


Epoch 0, loss: 0.178125, accuracy: 0.109375
Epoch 1, loss: 0.15625, accuracy: 0.21875
Epoch 2, loss: 0.140625, accuracy: 0.296875
Epoch 3, loss: 0.134375, accuracy: 0.328125
Epoch 4, loss: 0.11875, accuracy: 0.40625
Epoch 5, loss: 0.15, accuracy: 0.25
Epoch 6, loss: 0.125, accuracy: 0.375
Epoch 7, loss: 0.125, accuracy: 0.375
Epoch 8, loss: 0.109375, accuracy: 0.453125
Epoch 9, loss: 0.10625, accuracy: 0.46875
Epoch 10, loss: 0.13125, accuracy: 0.34375
Epoch 11, loss: 0.0875, accuracy: 0.5625
Epoch 12, loss: 0.090625, accuracy: 0.546875
Epoch 13, loss: 0.05625, accuracy: 0.71875
Epoch 14, loss: 0.128125, accuracy: 0.359375
Epoch 15, loss: 0.053125, accuracy: 0.734375
Epoch 16, loss: 0.065625, accuracy: 0.671875
Epoch 17, loss: 0.071875, accuracy: 0.640625
Epoch 18, loss: 0.05625, accuracy: 0.71875
Epoch 19, loss: 0.05625, accuracy: 0.71875
Epoch 20, loss: 0.05, accuracy: 0.75
Epoch 21, loss: 0.08125, accuracy: 0.59375
Epoch 22, loss: 0.06875, accuracy: 0.65625
Epoch 23, loss: 0.084375,