<a href="https://colab.research.google.com/github/ducbao811/diveintocode-ml/blob/master/1D_Convolutional_Neural_Network.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# [Problem 1]

In [1]:
import numpy as np

In [2]:
class ConvolutionalLayer:
  def __init__(self, W, b):
    self.W = W
    self.b = b
  
  def forward(self, X):
    self.X = X
    filterSize = len(self.W)
    A = np.empty(filterSize - 1)
    for i in range(filterSize - 1):
      A[i] = X[ i : i + filterSize] @ self.W + self.b
    return A
  
  def backward(self, dA):
    
    """
        Backward
        Parameters
        ----------
        dA : The error receives from the later layer 
        Returns
        ----------
        dX : Gradient to flow forward
    """

    self.dB = np.sum(dA, axis = 0)
    self.dW = np.empty(len(self.W))

    for i in range(len(self.dW)):
      self.dW[i] = dA @ self.X[ i : i + len(dA) ]
    
    dX = np.empty(len(self.X))

    for j in range(len(dX)):

      for s in range(len(self.W)):
        if s < j + 1 and s > j - len(dA):
          dX[j] += dA[j-s] * self.W[s]    

    return self.dW, self.dB, dX

# [Problem 2]

In [3]:
def output_size_calculation(n_in, P, F, S):
    return int((n_in + 2*P - F) / S + 1)

# [Problem 3]

In [4]:
x = np.array([1,2,3,4])
w = np.array([3, 5, 7])
b = np.array([1])
delta_a = np.array([10, 20])

In [5]:
SCD1 = ConvolutionalLayer(w,b)
a = SCD1.forward(x)
print(a)
db, dw, dx = SCD1.backward(delta_a)
print(db)
print(dw)
print(dx)

[35. 50.]
[ 50.  80. 110.]
30
[ 31. 111. 171. 140.]


# [Problem 4]

In [6]:
class GetMiniBatch:

    def __init__(self, X, y, batch_size = 20, seed=0):
        self.batch_size = batch_size
        np.random.seed(seed)
        shuffle_index = np.random.permutation(np.arange(X.shape[0]))
        self._X = X[shuffle_index]
        self._y = y[shuffle_index]
        self._stop = np.ceil(X.shape[0]/self.batch_size).astype(np.int)
        
    def __len__(self):
        return self._stop
    
    def __getitem__(self,item):
        p0 = item*self.batch_size
        p1 = item*self.batch_size + self.batch_size
        return self._X[p0:p1], self._y[p0:p1] 
    
    def __iter__(self):
        self._counter = 0
        return self
    
    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        p0 = self._counter*self.batch_size
        p1 = self._counter*self.batch_size + self.batch_size
        self._counter += 1
        return self._X[p0:p1], self._y[p0:p1]

In [7]:
class SimpleInitializer:

    def __init__(self, sigma):
        self.sigma = sigma
        
    def W(self, *shape):
        W = self.sigma * np.random.randn(*shape)
        return W
    
    def B(self, *shape):
        B = self.sigma * np.random.randn(*shape)
        return B

In [8]:
class SGD:
    """
    Stochastic gradient descent
    Parameters
    ----------
    lr : Learning rate
    """
    def __init__(self, lr):
        self.lr = lr
    def update(self, layer):
        """
        Update weights and biases for a layer
        Parameters
        ----------
        layer : Instance of the layer before update
        """
        layer.W -= self.lr * layer.dW
        layer.b -= self.lr * layer.dB

        return layer

In [9]:
class AdaGrad:
    def __init__(self, lr):
        self.lr = lr
        self.delta = 1e-7
    def update(self, layer):
        """
        Update weights and biases for a layer
        Parameters
        ----------
        layer : Instance of the layer before update
        """
        # Update sum of squares of the gradient including the current iteration 
        layer.HW += layer.dW * layer.dW
        layer.HB += layer.dB * layer.dB

        # Update parameters
        layer.W -= self.lr * layer.dW / (np.sqrt(layer.HW) + self.delta)
        layer.b -= self.lr * layer.dB / (np.sqrt(layer.HB) + self.delta)

        return layer

In [38]:
def output_size_calculation(n_in, F, P=0, S=1):
    n_out = int((n_in + 2*P - F) / S + 1)
    return n_out

a one-dimensional convolutional layer class that does not limit the number of channels

In [105]:
class ConvolutionalLayer1D:
    
    def __init__(self, batch_size, optimizer=AdaGrad, initializer=SimpleInitializer, 
                 n_in_channels=1, n_out_channels=1, padding=0):
        self.batch_size = batch_size
        self.optimizer = optimizer
        self.W = initializer.W(n_out_channels, n_in_channels, batch_size)
        self.b = initializer.B(n_out_channels)
        self.padding = padding
        self.n_out_channels = n_out_channels
        self.n_in_channels = n_in_channels
        self.n_out = None
    
    def forward(self, X):
        self.n_in = X.shape[-1] # number of features in input 
        # Calcualte number of features in output
        self.n_out = output_size_calculation(n_in=self.n_in, F = self.batch_size, 
                                             P=self.padding)
        X = X.reshape(self.n_in_channels, self.n_in)
        self.X = np.pad(X, ((0,0), ((self.batch_size-1), 0))) # Add padding to input
        self.X1 = np.zeros((self.n_in_channels, self.batch_size, self.n_in+(self.batch_size-1)))
        for i in range(self.batch_size):
            self.X1[:, i] = np.roll(self.X, -i, axis=-1)
        A = np.sum(self.X1[:, :, self.batch_size-1-self.padding : self.n_in + self.padding] * 
                   self.W[:, :, :, np.newaxis], axis=(1, 2)) + self.b.reshape(-1,1)
        return A
    
    def backward(self, dA):
        self.dW = np.sum(np.dot(dA, self.X1[:, :, self.batch_size - 1 - self.padding : self.n_in+self.padding, np.newaxis]),
                         axis=-1)
        self.dB = np.sum(dA, axis=1)
        self.dA = np.pad(dA, ((0,0), (0, (self.b_size-1))))
        self.dA1 = np.zeros((self.n_out_channels, self.batch_size, self.dA.shape[-1]))
        for i in range(self.b_size):
            self.dA1[:, i] = np.roll(self.dA, i, axis=-1)
        dX = np.sum(self.W@self.dA1, axis=0)
        self.optimizer.update(self)
        return dX

In [102]:
test = ConvolutionalLayer1D(batch_size=3, initializer=SimpleInitializer(0.01), 
                            optimizer=SGD(0.01), n_in_channels=2, n_out_channels=3, padding=0)

In [103]:
x = np.array([[1, 2, 3, 4], [2, 3, 4, 5]]) 
test.W = np.ones((3, 2, 3), dtype=float)
test.B = np.array([1, 2, 3], dtype=float)

In [104]:
testing = test.forward(x)
testing

array([[15.00914143, 21.00914143],
       [14.98904867, 20.98904867],
       [14.99111435, 20.99111435]])

#[Problem 7]

In [122]:
class ConvolutionalLayer1D_Stride:
    
    def __init__(self, b_size, initializer, optimizer, n_in_channels=1, n_out_channels=1, pa=0, stride=1):
        self.b_size = b_size
        self.optimizer = optimizer
        self.pa = pa
        self.stride = stride
        self.W = initializer.W(n_out_channels, n_in_channels, b_size)
        self.B = initializer.B(n_out_channels)
        self.n_in_channels = n_in_channels
        self.n_out_channels = n_out_channels
        self.n_out = None
        
    def forward(self, X):
        self.n_samples = X.shape[0]
        self.n_in = X.shape[-1]
        self.n_out = output_size_calculation(self.n_in, self.b_size, self.pa, self.stride)
        X = X.reshape(self.n_samples, self.n_in_channels, self.n_in)
        self.X = np.pad(X, ((0,0), (0,0), ((self.b_size-1), 0)))
        self.X1 = np.zeros((self.n_samples, self.n_in_channels, self.b_size, self.n_in+(self.b_size-1)))
        for i in range(self.b_size):
            self.X1[:, :, i] = np.roll(self.X, -i, axis=-1)
        A = np.sum(self.X1[:, np.newaxis, :, :, self.b_size-1-self.pa:self.n_in+self.pa:self.stride]*self.W[:, :, :, np.newaxis], axis=(2, 3)) + self.B.reshape(-1,1)
        return A
    
    def backward(self, dA):
        self.dW = np.sum(dA[:, :, np.newaxis, np.newaxis]*self.X1[:, np.newaxis, :, :, self.b_size-1-self.pa:self.n_in+self.pa:self.stride], axis=(0, -1))
        self.dB = np.sum(dA, axis=(0, -1))
        self.dA = np.pad(dA, ((0,0), (0,0), (0, (self.b_size-1))))
        self.dA1 = np.zeros((self.n_samples, self.n_out_channels, self.b_size, self.dA.shape[-1]))
        for i in range(self.b_size):
            self.dA1[:, :, i] = np.roll(self.dA, i, axis=-1)
        dX = np.sum(self.W[:, :, :, np.newaxis]*self.dA1[:, :, np.newaxis], axis=(1,3))
        self.optimizer.update(self)
        return dX

# [Problem 8]

Preparing the dataset

In [123]:
# Importing dataset

from keras.datasets import mnist
(X_train, y_train), (X_test, y_test) = mnist.load_data()

# Reshaping for fitting
X_train = X_train.reshape(-1, 784)
X_test = X_test.reshape(-1, 784)

# Coverting value to 0 and 1
X_train = X_train.astype(np.float)
X_test = X_test.astype(np.float)
X_train /= 255
X_test /= 255

from sklearn.preprocessing import OneHotEncoder
enc = OneHotEncoder(handle_unknown='ignore', sparse=False)
y_train_one_hot = enc.fit_transform(y_train[:, np.newaxis])
y_test_one_hot = enc.transform(y_test[:, np.newaxis])

# Splitting data for training and validation
from sklearn.model_selection import train_test_split
X_training, X_val, y_training, y_val = train_test_split(X_train, y_train_one_hot, test_size=0.2)

class Sigmoid:
    
    def forward(self, A):
        self.A = A
        return self.sigmoid(A)
    
    def backward(self, dZ):
        _sig = self.sigmoid(self.A)
        return dZ * (1 - _sig)*_sig
    
    def sigmoid(self, X):
        return 1 / (1 + np.exp(-X))

class Tanh:
    
    def forward(self, A):
        self.A = A
        return np.tanh(A)
    
    def backward(self, dZ):
        return dZ * (1 - (np.tanh(self.A))**2)

class Softmax:
    
    def forward(self, X):
        self.Z = np.exp(X) / np.sum(np.exp(X), axis=1).reshape(-1,1)
        return self.Z
    
    def backward(self, Y):
        self.loss = self.loss_func(Y)
        return self.Z - Y
    
    def loss_func(self, Y, Z=None):
        if Z is None:
            Z = self.Z
        return (-1)*np.average(np.sum(Y*np.log(Z), axis=1))

class ReLU:
    
    def forward(self, A):
        self.A = A
        return np.clip(A, 0, None)
    
    def backward(self, dZ):
        return dZ * np.clip(np.sign(self.A), 0, None)

class FC:

    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
        self.optimizer = optimizer
        self.W = initializer.W(n_nodes1, n_nodes2)
        self.B = initializer.B(n_nodes2)
        
    def forward(self, X):
        self.X = X
        A = X@self.W + self.B
        return A
    
    def backward(self, dA):
        dZ = dA@self.W.T
        self.dB = np.sum(dA, axis=0)
        self.dW = self.X.T@dA
        self.optimizer.update(self)
        return dZ

class XavierInitializer:
    
    def W(self, n_nodes1, n_nodes2):
        self.sigma = math.sqrt(1 / n_nodes1)
        W = self.sigma * np.random.randn(n_nodes1, n_nodes2)
        return W
    
    def B(self, n_nodes2):
        B = self.sigma * np.random.randn(n_nodes2)
        return B
    
class HeInitializer():
    
    def W(self, n_nodes1, n_nodes2):
        self.sigma = math.sqrt(2 / n_nodes1)
        W = self.sigma * np.random.randn(n_nodes1, n_nodes2)
        return W
    
    def B(self, n_nodes2):
        B = self.sigma * np.random.randn(n_nodes2)
        return B

class SGD:

    def __init__(self, lr):
        self.lr = lr
    
    def update(self, layer):
        layer.W -= self.lr * layer.dW
        layer.B -= self.lr * layer.dB
        return

class AdaGrad:
    
    def __init__(self, lr):
        self.lr = lr
        self.HW = 1
        self.HB = 1
    
    def update(self, layer):
        self.HW += layer.dW**2
        self.HB += layer.dB**2
        layer.W -= self.lr * np.sqrt(1/self.HW) * layer.dW
        layer.B -= self.lr * np.sqrt(1/self.HB) * layer.dB

In [128]:
import math
class ScratchCNNClassifier():
    """
    Simple three-layer Convolutional network classifier
    Parameters
    ----------
    Attributes
    ----------
    """
    def __init__(self, verbose = False, bias = True, lr =0.01, n_features = 784,
                 n_nodes1 = 400, n_nodes2 = 200, n_output = 10, sigma = 0.02, 
                 activate_func = Tanh, initializer = SimpleInitializer, optimizer=SGD):
        self.verbose = verbose
        self.has_bias = bias
        self.lr = lr
        self.n_features = n_features
        self.n_nodes1 = n_nodes1
        self.n_nodes2 = n_nodes2
        self.n_output = n_output
        self.activation_function = activate_func
        self.batch_size = 20
        self.epoch = 10
        self.sigma = sigma
        self.initializer = initializer
        self.optimizer = optimizer

    def fit(self, X, y, X_val=None, y_val=None):
        """
        Learn a neural network classifier.
        Parameters
        ----------
        X : The following forms of ndarray, shape (n_samples, n_features)
            Features of training data
        y : The following form of ndarray, shape (n_samples,)
            Correct answer value of training data
        X_val : The following forms of ndarray, shape (n_samples, n_features)
            Features of verification data
        y_val : The following form of ndarray, shape (n_samples,)
            Correct value of verification data
        """
        self.training_loss = []
        self.testing_loss = []

        # Initilize optimizer and layers
        optimizer = self.optimizer(self.lr)
        self.Con1D = ConvolutionalLayer1D_Stride(b_size=7, initializer=SimpleInitializer(0.01), 
                                              optimizer=self.optimizer(self.lr), 
                                              n_in_channels=1, n_out_channels=1, pa=3, stride=2)
        self.Con1D.n_out = output_size_calculation(X.shape[-1], self.Con1D.b_size, self.Con1D.pa,
                                                   self.Con1D.stride)
        self.activation1 = self.activation_function()
        self.FC2 = FC(self.Con1D.n_out, self.n_nodes2, self.initializer(self.sigma), optimizer)
        self.activation2 = self.activation_function()
        self.FC3 = FC(self.n_nodes2, self.n_output, self.initializer(self.sigma), optimizer)
        self.activation3 = Softmax()

        for i in range (self.epoch):
          
          get_mini_batch = GetMiniBatch(X,y,batch_size=20)

          for mini_X_train, mini_y_train in get_mini_batch:
              self.forward_propagation(mini_X_train)
              self.backward_propagation(mini_X_train, mini_y_train)

          self.forward_propagation(X)
          self.training_loss.append(self._cross_entropy(y,self.Z3))

          if X_val is not None:
            self.forward_propagation(X_val)
            self.testing_loss.append(self._cross_entropy(y_val,self.Z3))
          
          if self.verbose:
            print("Epoch {}:\nTraining_loss: {}".format(i,self.training_loss[-1]))
            if X_val is not None:
              print("Validation loss: {}".format(self.testing_loss[-1]))

    def forward_propagation(self, X):
        """
        Implement forward propagation when training neural network
        """
        A1 = self.Con1D.forward(X)
        self.A1 = A1.reshape(A1.shape[0], A1.shape[-1])
        self.Z1 = self.activation1.forward(self.A1)
        self.A2 = self.FC2.forward(self.Z1)
        self.Z2 = self.activation2.forward(self.A2)
        self.A3 = self.FC3.forward(self.Z2)
        self.Z3 = self.activation3.forward(self.A3)

    def backward_propagation(self, X, y):
        """
        Implement backward propagation 
        """
        dA3 = self.activation3.backward(y) # The cross entropy error and softmax are matched.
        dZ2 = self.FC3.backward(dA3)
        dA2 = self.activation2.backward(dZ2)
        dZ1 = self.FC2.backward(dA2)
        dA1 = self.activation1.backward(dZ1)
        dA1 = dA1[:, np.newaxis]
        dZ0 = self.Con1D.backward(dA1)
 


    def _cross_entropy(self, y, Z):
        return -np.sum(y*np.log(Z)) / len(y)


    def predict(self, X):
        """
        Estimate using a neural network classifier.
        Parameters
        ----------
        X : The following forms of ndarray, shape (n_samples, n_features)
            sample
        Returns
        -------
            The following form of ndarray, shape (n_samples, 1)
            Estimated result
        """
        self.forward_propagation(X)
        return np.argmax(self.Z3,axis=1)

In [129]:
model = ScratchCNNClassifier(verbose=True)
model.fit(X_training,y_training, X_val, y_val)

Epoch 0:
Training_loss: 0.14756797059721863
Validation loss: 0.15660655347662342
Epoch 1:
Training_loss: 0.10030125771699551
Validation loss: 0.12255594994263476
Epoch 2:
Training_loss: 0.08477536582851722
Validation loss: 0.11885066012967202
Epoch 3:
Training_loss: 0.07374055782868626
Validation loss: 0.11750401038569529
Epoch 4:
Training_loss: 0.059041328104462075
Validation loss: 0.1109224404467596
Epoch 5:
Training_loss: 0.04705336350288988
Validation loss: 0.10526581648741923
Epoch 6:
Training_loss: 0.03917914268363305
Validation loss: 0.1036922321907295
Epoch 7:
Training_loss: 0.03543416039515492
Validation loss: 0.10416897943203943
Epoch 8:
Training_loss: 0.025351647144638997
Validation loss: 0.09584269553024603
Epoch 9:
Training_loss: 0.018465606511038908
Validation loss: 0.0900642028036753


In [130]:
from sklearn.metrics import accuracy_score

training_pred_default = model.predict(X=X_train)
test_pred_default = model.predict(X=X_test)

print("Accuracy score for training set with default model: {}".format(accuracy_score(y_true=y_train, y_pred=training_pred_default)))
print("Accuracy score for testing set with default model: {}".format(accuracy_score(y_true=y_test,y_pred=test_pred_default)))


Accuracy score for training set with default model: 0.9905833333333334
Accuracy score for testing set with default model: 0.9726
