In [167]:
# import the dependencies
import numpy as np

#### [Problem 1] Creating a one-dimensional convolutional layer class that limits the number of channels to one

In [168]:
class SimpleConv1d():
  """Single-channel 1-D convolution (cross-correlation), stride 1, no padding.

  All arguments are 1-D arrays: ``X`` is the input signal, ``W`` the filter,
  ``b`` a one-element bias and ``dA`` the gradient of the loss with respect
  to the forward output.
  """

  def forward(self, X, W, b):
    """Return ``a[i] = X[i:i+len(W)] . W + b[0]`` for every valid window.

    Raises
    ------
    ValueError
        If the filter is longer than the input.
    """
    n_out = len(X) - len(W) + 1
    if n_out < 1:
      raise ValueError("filter W must not be longer than input X")
    # One output per window position (the original looped len(W)-1 times,
    # which is only correct when len(X) == 2*len(W) - 2).
    return np.array([np.matmul(X[i:i+len(W)], W) + b[0] for i in range(n_out)])

  def backward(self, X, W, dA):
    """Return ``(db, dW, dX)`` for the upstream gradient ``dA``.

    ``dW[k] = sum_i dA[i] * X[i+k]`` and ``dX[j] = sum_i dA[i] * W[j-i]``
    (terms with an out-of-range filter index are dropped).
    """
    db = np.sum(dA)
    dW = np.array([np.matmul(dA, X[k:k+len(dA)]) for k in range(len(W))])
    # Slide dA over the reversed filter padded with len(dA)-1 zeros per side;
    # the original padded with exactly one zero, which only fits len(dA) == 2.
    padded_W = np.pad(W[::-1], len(dA) - 1)
    n_in = len(padded_W) - len(dA) + 1
    dX = [np.matmul(padded_W[i:i+len(dA)], dA) for i in range(n_in)]
    dX = np.array(dX[::-1])
    return db, dW, dX

#### [Problem 2] Output size calculation after one-dimensional convolution

In [169]:
def output_size_calc(input_size, f, padding=0, stride=1):
  """Number of output positions of a 1-D convolution.

  Evaluates ``(input_size + 2*padding - f) / stride + 1`` and truncates the
  result to an int.
  """
  covered_span = input_size + 2*padding - f
  steps = covered_span / stride
  return int(steps + 1)

#### [Problem 3] Experiment of one-dimensional convolutional layer with small array

In [170]:
# Toy problem: 4-sample input, 3-tap filter, scalar bias, and the upstream
# gradient for the two forward outputs.
X, W = np.array([1, 2, 3, 4]), np.array([3, 5, 7])
b = np.array([1])
delta_a = np.array([10, 20])

In [171]:
# Run the toy problem through the layer and show every result in turn:
# forward output, bias gradient, filter gradient, input gradient.
s1dconv = SimpleConv1d()
forward_prop = s1dconv.forward(X, W, b)
db, dW, dX = s1dconv.backward(X, W, delta_a)
for result in (forward_prop, db, dW, dX):
  print(result)

[35 50]
30
[ 50  80 110]
[ 30 110 170 140]


##### [Problem 4], [Problem 5], [Problem 6]


In [172]:
class SimpleInitializer:
    """Gaussian initializer with a fixed standard deviation ``sigma``.

    Both ``W`` and ``b`` accept an arbitrary shape as positional arguments
    and return ``sigma * np.random.randn(*shape)``.
    """

    def __init__(self, sigma):
        self.sigma = sigma

    def _sample(self, shape):
        # Single place where the global NumPy RNG is consumed.
        return self.sigma * np.random.randn(*shape)

    def W(self, *shape):
        """Draw a weight array of the given shape."""
        return self._sample(shape)

    def b(self, *shape):
        """Draw a bias array of the given shape."""
        return self._sample(shape)

In [173]:
class SGD:
    """Stochastic gradient descent that updates a layer's ``W`` and ``b`` in place.

    Gradients are read from ``layer.dW`` and ``layer.dB``. When the layer
    exposes a ``Z`` attribute the step is divided by ``len(layer.Z)``;
    layers without ``Z`` get the raw ``lr * gradient`` step instead of
    raising ``AttributeError``.
    """
    def __init__(self, lr):
        self.lr = lr
    def update(self, layer):
        """Apply one update step to ``layer`` and return it."""
        # None of the layers defined alongside this optimizer store ``Z``,
        # so the scaling has to be optional.
        scale = len(layer.Z) if hasattr(layer, "Z") else 1
        layer.W -= self.lr * layer.dW / scale
        layer.b -= self.lr * layer.dB / scale
        return layer

In [174]:
class Adagrad:
  """AdaGrad optimizer: per-parameter step scaled by accumulated squared gradients.

  Gradients are read from ``layer.dW`` and ``layer.dB`` (the attribute name
  the layers in this notebook set); ``layer.db`` is accepted as a fallback.
  """

  EPS = 1e-7  # keeps the division finite while the accumulator is zero

  def __init__(self,lr):
    self.lr = lr
    self.hW = 0
    self.hb = 0

  def update(self, layer):
    """Accumulate squared gradients, update ``layer.W``/``layer.b`` in place, return the layer."""
    dW = layer.dW
    dB = layer.dB if hasattr(layer, "dB") else layer.db
    self.hW = self.hW + dW*dW
    self.hb = self.hb + dB*dB
    # The accumulated history must scale the step; without it this is plain SGD.
    layer.W -= self.lr * dW / (np.sqrt(self.hW) + self.EPS)
    layer.b -= self.lr * dB / (np.sqrt(self.hb) + self.EPS)
    return layer

In [175]:
class ReLU:
    """Rectified linear unit activation."""

    def forward(self, A):
        """Remember the pre-activation ``A`` and return it with negatives set to 0."""
        self.A = A
        return np.maximum(A, 0)

    def backward(self, dZ):
        """Pass ``dZ`` through where the stored input was positive, zero elsewhere."""
        # sign() gives -1/0/+1; flooring at 0 turns it into a 0/1 gate.
        gate = np.maximum(np.sign(self.A), 0)
        return dZ * gate

In [176]:
class Sigmoid():
  """Logistic sigmoid activation.

  ``calc`` is the stateless function; ``forward``/``backward`` provide the
  same layer interface as ``Tanh`` and ``ReLU`` so the class can be used
  wherever an activation layer is expected.
  """

  def __init__(self):
    pass
  
  def calc(self,X):
    """Return ``1 / (1 + exp(-X))`` element-wise."""
    return 1/(1+np.exp(-X))

  def forward(self, A):
    """Apply the sigmoid and cache the output for ``backward``."""
    self.Z = self.calc(A)
    return self.Z

  def backward(self, dZ):
    """Multiply ``dZ`` by the sigmoid derivative ``Z * (1 - Z)`` of the cached output."""
    return dZ * self.Z * (1 - self.Z)

class Tanh:
  """Hyperbolic tangent activation."""

  def forward(self, A):
    """Cache the pre-activation ``A`` and return ``tanh(A)``."""
    self.A = A
    return np.tanh(A)

  def backward(self, dZ):
    """Scale ``dZ`` by the derivative ``1 - tanh(A)^2`` at the cached input."""
    activated = np.tanh(self.A)
    local_grad = 1 - activated * activated
    return dZ * local_grad

class Softmax:
  """Row-wise softmax output layer combined with cross-entropy loss.

  Inputs are 2-D arrays with one row per sample (the normalisation runs
  over ``axis=1``).
  """

  LOG_FLOOR = 1e-12  # lower bound for log() arguments so a zero probability cannot yield nan/inf

  def forward(self, X):
      """Compute class probabilities for ``X`` and cache them in ``self.Z``."""
      self.Z = self.calc(X)
      return self.Z

  def backward(self, Y):
      """Store the loss against one-hot targets ``Y`` and return ``Z - Y``."""
      self.loss = self.loss_func(Y)
      return self.Z - Y

  def loss_func(self, Y, Z=None):
      """Mean cross-entropy between targets ``Y`` and probabilities ``Z`` (cached ``self.Z`` by default)."""
      if Z is None:
          Z = self.Z
      return (-1)*np.average(np.sum(Y*np.log(np.maximum(Z, self.LOG_FLOOR)), axis=1))
  
  def calc(self,X):
    """Stateless softmax over ``axis=1``."""
    # Subtracting the row maximum leaves the result unchanged mathematically
    # but keeps exp() from overflowing on large logits.
    shifted = X - np.max(X, axis=1).reshape(-1, 1)
    exps = np.exp(shifted)
    return exps / np.sum(exps, axis=1).reshape(-1, 1)

In [177]:
import math

class XavierInitializer:
    """Xavier-style Gaussian initializer with ``sigma = sqrt(1 / n_nodes1)``.

    ``W`` must be called before ``b``: the bias draw reuses the ``sigma``
    stored by the most recent ``W`` call.
    """

    def W(self, n_nodes1, n_nodes2):
        """Return an ``(n_nodes1, n_nodes2)`` weight matrix and remember ``sigma``."""
        self.sigma = math.sqrt(1 / n_nodes1)
        return np.random.randn(n_nodes1, n_nodes2) * self.sigma

    def b(self, n_nodes2):
        """Return a length-``n_nodes2`` bias vector scaled by the stored ``sigma``."""
        return np.random.randn(n_nodes2) * self.sigma

In [178]:
# mini batch
class GetMiniBatch:
    """
    Iterator that yields shuffled mini-batches.

    Parameters
    ----------
    X : ndarray, shape (n_samples, n_features)
      Training data
    y : ndarray, shape (n_samples, 1)
      Correct answer values
    batch_size : int
      Batch size
    seed : int
      NumPy random number seed (applied to the global ``np.random`` state)
    """
    def __init__(self, X, y, batch_size = 20, seed=0):
        self.batch_size = batch_size
        np.random.seed(seed)
        shuffle_index = np.random.permutation(np.arange(X.shape[0]))
        self._X = X[shuffle_index]
        self._y = y[shuffle_index]
        # np.int was removed in NumPy 1.24; the builtin int works on every version.
        self._stop = int(np.ceil(X.shape[0]/self.batch_size))
    def _batch(self, index):
        """Return the ``index``-th (X, y) slice of the shuffled data."""
        p0 = index*self.batch_size
        p1 = p0 + self.batch_size
        return self._X[p0:p1], self._y[p0:p1]
    def __len__(self):
        return self._stop
    def __getitem__(self,item):
        return self._batch(item)
    def __iter__(self):
        self._counter = 0
        return self
    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        batch = self._batch(self._counter)
        self._counter += 1
        return batch

In [179]:
class FC:
    """Fully connected layer computing ``X @ W + b``.

    ``initializer`` supplies ``W(n_nodes1, n_nodes2)`` and ``b(n_nodes2)``;
    ``optimizer.update(layer)`` is invoked at the end of every backward pass.
    """

    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
        self.optimizer = optimizer
        # W is drawn before b on purpose: some initializers derive the bias
        # scale from the preceding W call.
        self.W = initializer.W(n_nodes1, n_nodes2)
        self.b = initializer.b(n_nodes2)

    def forward(self, X):
        """Cache the input and return the affine output."""
        self.X = X
        return np.matmul(X, self.W) + self.b

    def backward(self, dA):
        """Store ``dW``/``dB``, let the optimizer update the layer, return the input gradient."""
        self.dW = np.matmul(self.X.T, dA)
        self.dB = np.sum(dA, axis=0)
        # The input gradient must use the weights from before the update.
        grad_input = np.matmul(dA, self.W.T)
        self.optimizer.update(self)
        return grad_input

In [180]:
class Conv1d:
  """Multi-channel 1-D convolution layer for a single sample, stride 1.

  Parameters
  ----------
  batch_size : int
      Despite the name, this is used as the filter length: it is the last
      dimension of ``W`` and determines the amount of zero padding.
  initializer : object
      Provides ``W(*shape)`` and ``b(*shape)``.
  optimizer : object
      Provides ``update(layer)``; called at the end of ``backward``.
  input_size_channel, output_size_channel : int
      Number of input and output channels.
  padding : int
      Number of virtual zero samples on each side of the input.

  ``W`` has shape (output channels, input channels, filter length) and ``b``
  has shape (output channels,).
  """
  
  def __init__(self, batch_size, initializer, optimizer, input_size_channel=1, output_size_channel=1, padding=0):
    self.batch_size = batch_size
    self.input_size_channel = input_size_channel
    self.output_size_channel = output_size_channel
    self.padding = padding
    self.initializer =  initializer
    self.optimizer = optimizer
    # forward/backward index W as (out, in, k); the previous (in, out, k)
    # order only worked when both channel counts were equal.
    self.W = initializer.W(output_size_channel, input_size_channel, batch_size)
    self.b = initializer.b(output_size_channel)

  def _window(self):
    """Slice of the shifted-input buffer that holds the valid output positions."""
    return slice(self.batch_size-1-self.padding, self.n_in+self.padding)

  def forward(self, X):
    """Convolve ``X`` (input channels x features) and return (output channels x n_out)."""
    self.n_in = X.shape[-1]
    self.n_out = output_size_calc(self.n_in, self.batch_size, self.padding)
    X = X.reshape(self.input_size_channel, self.n_in)
    # Left-pad with k-1 zeros, then store k rolled copies so that
    # X1[c, i, t] is the input sample that filter tap i sees at position t.
    self.X = np.pad(X, ((0,0), ((self.batch_size-1), 0)))
    self.X1 = np.zeros((self.input_size_channel, self.batch_size, self.n_in+(self.batch_size-1)))
    for i in range(self.batch_size):
        self.X1[:, i] = np.roll(self.X, -i, axis=-1)
    A = np.sum(self.X1[:, :, self._window()]*self.W[:, :, :, np.newaxis], axis=(1, 2)) + self.b.reshape(-1,1)
    return A

  def backward(self, dA):
    """Compute ``dW``/``dB``, run the optimizer, and return the input gradient.

    ``dA`` has shape (output channels, n_out). Must be called after
    ``forward``, which caches the shifted input.
    """
    # Uses n_in cached by forward (the attribute ``input_size`` never existed).
    self.dW = np.sum(np.dot(dA, self.X1[:, :, self._window(), np.newaxis]), axis=-1)
    self.dB = np.sum(dA, axis=1)
    self.dA = np.pad(dA, ((0,0), (0, (self.batch_size-1))))
    self.dA1 = np.zeros((self.output_size_channel, self.batch_size, self.dA.shape[-1]))
    for i in range(self.batch_size):
        self.dA1[:, i] = np.roll(self.dA, i, axis=-1)
    dX = np.sum(self.W@self.dA1, axis=0)
    self.optimizer.update(self)
    return dX

In [181]:
# 3-tap filter, 2 input channels -> 3 output channels, no padding.
model_1 = Conv1d(
    batch_size=3,
    initializer=SimpleInitializer(0.01),
    optimizer=SGD(0.01),
    input_size_channel=2,
    output_size_channel=3,
    padding=0,
)

In [182]:
# Two input channels with four features each.
x = np.array([
    [1, 2, 3, 4],
    [2, 3, 4, 5],
])
# Replace the random parameters with fixed values so the result can be checked by hand.
model_1.W = np.ones((3, 2, 3), dtype=float)
model_1.b = np.array([1, 2, 3], dtype=float)
model_1_forward = model_1.forward(x)
model_1_forward

array([[16., 22.],
       [17., 23.],
       [18., 24.]])

#### [Problem 7] (Advanced assignment) Arbitrary number of strides

In [183]:
class AdvConv1d:
  """Batched multi-channel 1-D convolution with zero padding and arbitrary stride.

  Parameters
  ----------
  b_size : int
      Filter length (last dimension of ``W``).
  initializer : object
      Provides ``W(*shape)`` and ``b(*shape)``.
  optimizer : object
      Provides ``update(layer)``; called at the end of ``backward``.
  n_in_channels, n_out_channels : int
      Number of input and output channels.
  pa : int
      Virtual zero samples on each side of the input. The rolled-buffer
      scheme used in ``forward`` relies on ``pa <= b_size - 1``.
  stride : int
      Step between consecutive filter positions.

  ``W`` has shape (n_out_channels, n_in_channels, b_size); ``b`` has shape
  (n_out_channels,).
  """

  def __init__(self, b_size, initializer, optimizer, n_in_channels=1, n_out_channels=1, pa=0, stride=1):
    self.b_size = b_size
    self.optimizer = optimizer
    self.pa = pa
    self.stride = stride
    self.W = initializer.W(n_out_channels, n_in_channels, b_size)
    self.b = initializer.b(n_out_channels)
    self.n_in_channels = n_in_channels
    self.n_out_channels = n_out_channels
    self.n_out = None

  def _window(self):
    """Strided slice of the shifted-input buffer holding the output positions."""
    return slice(self.b_size-1-self.pa, self.n_in+self.pa, self.stride)
        
  def forward(self, X):
    """Convolve ``X`` and return an array of shape (n_samples, n_out_channels, n_out).

    ``X`` is reshaped to (n_samples, n_in_channels, n_features), where
    n_samples is ``X.shape[0]`` and n_features is ``X.shape[-1]``.
    """
    self.n_samples = X.shape[0]
    self.n_in = X.shape[-1]
    self.n_out = output_size_calc(self.n_in, self.b_size, self.pa, self.stride)
    X = X.reshape(self.n_samples, self.n_in_channels, self.n_in)
    # Left-pad with b_size-1 zeros and keep b_size rolled copies, so that
    # X1[n, c, i, t] is the sample filter tap i sees at buffer position t.
    self.X = np.pad(X, ((0,0), (0,0), ((self.b_size-1), 0)))
    self.X1 = np.zeros((self.n_samples, self.n_in_channels, self.b_size, self.n_in+(self.b_size-1)))
    for i in range(self.b_size):
        self.X1[:, :, i] = np.roll(self.X, -i, axis=-1)
    A = np.sum(self.X1[:, np.newaxis, :, :, self._window()]*self.W[:, :, :, np.newaxis], axis=(2, 3)) + self.b.reshape(-1,1)
    return A
  
  def backward(self, dA):
    """Compute ``dW``/``dB``, run the optimizer, and return the input gradient.

    ``dA`` has shape (n_samples, n_out_channels, n_out); the returned
    gradient has shape (n_samples, n_in_channels, n_features). Must be
    called after ``forward``.
    """
    self.dW = np.sum(dA[:, :, np.newaxis, np.newaxis]*self.X1[:, np.newaxis, :, :, self._window()], axis=(0, -1))
    self.dB = np.sum(dA, axis=(0, -1))
    # Output k reads padded-input positions k*stride + i for tap i, so each
    # tap scatters its share of dA back onto exactly those positions. The
    # previous roll-based version assumed stride 1 and no padding.
    n_out = dA.shape[-1]
    dX_padded = np.zeros((self.n_samples, self.n_in_channels, self.n_in + 2*self.pa))
    for i in range(self.b_size):
        dX_padded[:, :, i:i + self.stride*n_out:self.stride] += np.einsum('oc,nok->nck', self.W[:, :, i], dA)
    # Drop the gradient that fell on the virtual padding samples.
    dX = dX_padded[:, :, self.pa:self.pa + self.n_in]
    self.optimizer.update(self)
    return dX

In [184]:
# 3-tap filter, 2 input channels -> 3 output channels, no padding, stride 2.
model_2 = AdvConv1d(3, SimpleInitializer(0.01), SGD(0.01), 2, 3, 0, 2)
x = np.array([[1, 2, 3, 4], [2, 3, 4, 5]])
model_2.W = np.ones((3, 2, 3), dtype=float)
model_2.b = np.array([1, 2, 3], dtype=float)
# Run the strided layer itself (this cell previously called model_1).
# AdvConv1d expects a leading sample axis, so the single (channels, features)
# sample is passed as shape (1, 2, 4). With stride 2 only one window fits.
model_2_forward = model_2.forward(x[np.newaxis])
model_2_forward

array([[[16.],
        [17.],
        [18.]]])

#### [Problem 8] Learning and estimation

In [185]:
class SGD:
  """Plain stochastic gradient descent without batch-size scaling.

  ``update`` modifies ``layer.W`` and ``layer.b`` in place using the
  gradients stored in ``layer.dW`` and ``layer.dB``; it returns nothing.
  """

  def __init__(self, lr):
      self.lr = lr

  def update(self, layer):
      """Take one gradient step on the layer's weights and bias."""
      weight_step = self.lr * layer.dW
      layer.W -= weight_step
      bias_step = self.lr * layer.dB
      layer.b -= bias_step

In [186]:
import math
class HeInitializer:
    """He Gaussian initializer with ``sigma = sqrt(2 / n_nodes1)``.

    Mirrors ``XavierInitializer``: ``W`` must be called before ``b`` because
    the bias draw reuses the stored ``sigma``.
    """

    def W(self, n_nodes1, n_nodes2):
        self.sigma = math.sqrt(2 / n_nodes1)
        return self.sigma * np.random.randn(n_nodes1, n_nodes2)

    def b(self, n_nodes2):
        return self.sigma * np.random.randn(n_nodes2)


class ScratchCNNClassifier:
    """Small CNN classifier: strided 1-D convolution -> FC -> FC -> softmax.

    Parameters
    ----------
    num_epoch : int
        Number of passes over the training data.
    lr : float
        Learning rate handed to every optimizer instance.
    batch_size : int
        Mini-batch size.
    n_features, n_nodes1 : int
        Stored for reference only; the first FC layer is sized from the
        convolution output length.
    n_nodes2 : int
        Width of the hidden fully connected layer.
    n_output : int
        Number of classes.
    verbose : bool
        Stored; not used by the training loop.
    Activater : class
        Activation layer class. ``ReLU`` selects He initialization; every
        other activation uses Xavier initialization.
    Optimizer : class
        Optimizer class instantiated as ``Optimizer(lr)`` for each layer.
    """
    
    def __init__(self, num_epoch=10, lr=0.01, batch_size=20, n_features=784, n_nodes1=400, n_nodes2=200, n_output=10, verbose=True, Activater=Tanh, Optimizer=SGD):
        self.num_epoch = num_epoch
        self.lr = lr
        self.verbose = verbose  
        self.batch_size = batch_size 
        self.n_features = n_features 
        self.n_nodes1 = n_nodes1
        self.n_nodes2 = n_nodes2 
        self.n_output = n_output 
        self.Activater = Activater
        if Activater == ReLU:
            self.Initializer = HeInitializer
        else:
            # Sigmoid, Tanh and any other activation: previously an unknown
            # activation left the initializer unset and fit() crashed later.
            self.Initializer = XavierInitializer
        self.Optimizer = Optimizer
    
    def fit(self, X, y, X_val=None, y_val=None):
        """Train on ``X`` (n_samples, n_features) with one-hot targets ``y``.

        Records the per-batch loss in ``self.loss`` and the full-data loss
        before training and after every epoch in ``self.loss_epoch``.
        ``X_val``/``y_val`` only set the ``val_enable`` flag.
        """
        self.val_enable = False
        if X_val is not None:
            self.val_enable = True
        self.AdvConv1d = AdvConv1d(b_size=7, initializer=SimpleInitializer(0.01), optimizer=self.Optimizer(self.lr), n_in_channels=1, n_out_channels=1, pa=3, stride=2)
        self.AdvConv1d.n_out = output_size_calc(X.shape[-1], self.AdvConv1d.b_size, self.AdvConv1d.pa, self.AdvConv1d.stride)
        self.activation1 = self.Activater()
        self.FC2 = FC(1*self.AdvConv1d.n_out, self.n_nodes2, self.Initializer(), self.Optimizer(self.lr))
        self.activation2 = self.Activater()
        self.FC3 = FC(self.n_nodes2, self.n_output, self.Initializer(), self.Optimizer(self.lr))
        self.activation3 = Softmax()
        
        self.loss = []
        self.loss_epoch = [self.activation3.loss_func(y, self.forward_propagation(X))]
        
        for epoch in range(self.num_epoch):
            # A different seed per epoch gives a different batch order each
            # time; the default seed repeated the same order every epoch.
            get_mini_batch = GetMiniBatch(X, y, batch_size=self.batch_size, seed=epoch)
            self.iter = len(get_mini_batch)
            for mini_X, mini_y in get_mini_batch:
                self.forward_propagation(mini_X)
                self.back_propagation(mini_X, mini_y)
                self.loss.append(self.activation3.loss)
            self.loss_epoch.append(self.activation3.loss_func(y, self.forward_propagation(X)))
        
    def predict(self, X):
        """Return the index of the most probable class for every row of ``X``."""
        return np.argmax(self.forward_propagation(X), axis=1)
    
    def forward_propagation(self, X):
        """Run the network forward and return the softmax probabilities."""
        A1 = self.AdvConv1d.forward(X)
        # The convolution has a single output channel; drop that axis for the FC layers.
        A1 = A1.reshape(A1.shape[0], A1.shape[-1])
        Z1 = self.activation1.forward(A1)
        A2 = self.FC2.forward(Z1)
        Z2 = self.activation2.forward(A2)
        A3 = self.FC3.forward(Z2)
        Z3 = self.activation3.forward(A3)
        return Z3
        
    def back_propagation(self, X, y_true):
        """Backpropagate from the softmax output; each layer updates itself."""
        dA3 = self.activation3.backward(y_true) 
        dZ2 = self.FC3.backward(dA3)
        dA2 = self.activation2.backward(dZ2)
        dZ1 = self.FC2.backward(dA2)
        dA1 = self.activation1.backward(dZ1)
        # Restore the channel axis expected by the convolution layer.
        dA1 = dA1[:, np.newaxis]
        dZ0 = self.AdvConv1d.backward(dA1) 

In [187]:
# importing dependencies
import matplotlib.pyplot as plt
from keras.datasets import mnist
import numpy as np
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# get subsets
(X_train, y_train), (X_test, y_test) = mnist.load_data()

# flattening the subsets
X_train = X_train.reshape(-1,784)
X_test = X_test.reshape(-1,784)

# pre processing: convert to float and divide by 255
# (the np.float alias was removed in NumPy 1.24; np.float64 is the same dtype)
X_train = X_train.astype(np.float64)
X_test = X_test.astype(np.float64)
X_train /= 255
X_test /= 255

# splitting our subsets into train and validation subsets
# NOTE(review): the X_val / y_val produced here are overwritten by the second
# split below, so this 20% hold-out is never used.
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2)

# .toarray() yields a dense matrix on every scikit-learn version; the
# `sparse=False` argument was removed in scikit-learn 1.4.
enc = OneHotEncoder(handle_unknown='ignore')
y_train_one_hot = enc.fit_transform(y_train[:, np.newaxis]).toarray()
# Encode the test labels (this line previously encoded y_val under the test name).
y_test_one_hot = enc.transform(y_test[:, np.newaxis]).toarray()

X_train_, X_val, y_train_, y_val = train_test_split(X_train, y_train_one_hot, test_size=0.2)

In [190]:
# Train the scratch CNN on the one-hot training split, then predict the test digits.
model_3 = ScratchCNNClassifier(
    num_epoch=20,
    lr=0.01,
    batch_size=20,
    n_features=784,
    n_nodes1=400,
    n_nodes2=400,
    n_output=10,
    verbose=True,
    Activater=Tanh,
    Optimizer=SGD,
)
model_3.fit(X_train_, y_train_)
y_pred = model_3.predict(X_test)

In [191]:
# Compare the integer test labels with the predicted class indices.
accuracy_score(y_test, y_pred)

0.9758