In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pickle
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split

In [None]:
# Mount Google Drive inside the Colab VM at /content/gdrive.
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
# Switch the working directory to the Drive root so relative file names
# opened after this point are resolved against it.
import os, sys
os.chdir("/content/gdrive/MyDrive")
os.getcwd()  # last expression of the cell: echoes the new working directory

'/content/gdrive/MyDrive'

In [None]:
# Load the image arrays from the .npy files and give every split the layout
# (num_samples, channels=1, 100, 100).
# The sample count is inferred with -1 instead of being hard-coded, so the
# cell keeps working if a split is regenerated with a different size.
train = np.load("train_file.npy").reshape(-1, 1, 100, 100)
valid = np.load("valid_file.npy").reshape(-1, 1, 100, 100)
test = np.load("test_file.npy").reshape(-1, 1, 100, 100)

In [None]:
# Read the integer class label of every sample and one-hot encode it.
NUM_CLASSES = 50


def _read_labels(path):
  """Return the integer labels stored in the second whitespace-separated
  field of every non-blank line of `path`."""
  labels = []
  with open(path, "r") as f:
    for line in f:
      fields = line.split()
      if not fields:
        # Tolerate blank lines (e.g. a trailing newline at end of file).
        continue
      labels.append(int(fields[1]))
  return labels


# Indexing the identity matrix with the label list yields one one-hot row per
# sample, i.e. arrays of shape (num_samples, NUM_CLASSES).
train_labels = np.eye(NUM_CLASSES)[_read_labels("train.txt")]
valid_labels = np.eye(NUM_CLASSES)[_read_labels("val.txt")]
test_labels = np.eye(NUM_CLASSES)[_read_labels("test.txt")]

In [None]:
# Sanity check: show the shape of each image split after reshaping.
for images in (train, valid, test):
  print(images.shape)


(63325, 1, 100, 100)
(450, 1, 100, 100)
(450, 1, 100, 100)


In [None]:
# Convert every split to float32 and divide by 255
# (presumably 8-bit pixel intensities, giving values in [0, 1] - TODO confirm).
train = train.astype("float32")
train /= 255
valid = valid.astype("float32")
valid /= 255
test = test.astype("float32")
test /= 255

In [None]:
class Conv2D:
  """2-D convolution layer that processes one sample at a time and applies a
  plain SGD update to its parameters inside `backward`.

  Weights have shape (F, C, K, K) and the bias has shape (F, 1), where F is
  the number of filters, C the number of input channels and K the side of
  the square kernel.
  """

  def __init__(self, inputs_channel, num_filters, kernel_size, padding, stride, learning_rate, name):
    """Create the layer.

    inputs_channel: number of input channels C.
    num_filters:    number of filters F (output channels).
    kernel_size:    side K of the square kernel.
    padding:        number of zero pixels added on every border.
    stride:         step between neighbouring windows.
    learning_rate:  step size used by the update in `backward`.
    name:           prefix of the keys returned by `extract`.
    """
    self.F = num_filters
    self.K = kernel_size
    self.C = inputs_channel

    self.weights = np.zeros((self.F, self.C, self.K, self.K))
    self.bias = np.zeros((self.F, 1))
    # Gaussian initialisation with std = sqrt(1 / (C*K*K)).
    for i in range(0, self.F):
      self.weights[i,:,:,:] = np.random.normal(loc=0, scale=np.sqrt(1./(self.C*self.K*self.K)), size=(self.C, self.K, self.K))

    self.p = padding
    self.s = stride
    self.lr = learning_rate
    self.name = name

  def zero_padding(self, inputs, size):
    """Return the 2-D array `inputs` surrounded by `size` zeros on each side."""
    w, h = inputs.shape[0], inputs.shape[1]
    out = np.zeros((2 * size + w, 2 * size + h))
    out[size:w+size, size:h+size] = inputs
    return out

  def forward(self, inputs):
    """Convolve one sample.

    inputs:  array of shape (C, W, H).
    returns: feature maps of shape (F, WW, HH) with
             WW = (W + 2p - K) / s + 1 and likewise for HH.
    The padded input is cached on `self.inputs` for `backward`.
    """
    C = inputs.shape[0]
    W = inputs.shape[1] + 2*self.p
    H = inputs.shape[2] + 2*self.p
    self.inputs = np.zeros((C, W, H))
    for c in range(C):
      self.inputs[c,:,:] = self.zero_padding(inputs[c,:,:], self.p)
    WW = int((W - self.K)/self.s + 1)
    HH = int((H - self.K)/self.s + 1)
    feature_maps = np.zeros((self.F, WW, HH))

    # One row per filter, so all filters are applied to a patch with a single
    # matrix-vector product instead of a Python loop over filters.
    flat_weights = self.weights.reshape(self.F, -1)
    flat_bias = self.bias.reshape(-1)
    for w in range(WW):
      ws = w*self.s  # the stride scales the input offset, not the output index
      for h in range(HH):
        hs = h*self.s
        patch = self.inputs[:, ws:ws+self.K, hs:hs+self.K]
        feature_maps[:, w, h] = flat_weights.dot(patch.reshape(-1)) + flat_bias

    return feature_maps

  def backward(self, dy):
    """Back-propagate through the layer and update its parameters.

    dy:      gradient w.r.t. the output, shape (F, WW, HH).
    returns: gradient w.r.t. the unpadded input, shape (C, W, H).
    Must be called after `forward` for the same sample, because it reads the
    padded input cached there.
    """
    dx = np.zeros(self.inputs.shape)
    dw = np.zeros(self.weights.shape)

    F, WW, HH = dy.shape
    for w in range(WW):
      ws = w*self.s
      for h in range(HH):
        hs = h*self.s
        grad = dy[:, w, h]  # one value per filter, shape (F,)
        patch = self.inputs[:, ws:ws+self.K, hs:hs+self.K]
        dw += grad[:, None, None, None] * patch[None, :, :, :]
        # Sum over filters of grad[f] * weights[f] -> shape (C, K, K).
        dx[:, ws:ws+self.K, hs:hs+self.K] += np.tensordot(grad, self.weights, axes=1)

    db = np.sum(dy, axis=(1, 2)).reshape(self.bias.shape)

    # dx above was accumulated with the pre-update weights.
    self.weights -= self.lr * dw
    self.bias -= self.lr * db

    # Drop the gradient of the zero border so dx matches the layer's input.
    if self.p > 0:
      dx = dx[:, self.p:-self.p, self.p:-self.p]
    return dx

  def extract(self):
    """Return the parameters as a dict keyed by '<name>.weights' / '<name>.bias'."""
    return {self.name+'.weights':self.weights, self.name+'.bias':self.bias}

  def feed(self, weights, bias):
    """Replace the layer's parameters with the given arrays."""
    self.weights = weights
    self.bias = bias

class Maxpooling2D:
  """Max-pooling layer that processes one sample of shape (C, W, H)."""

  def __init__(self, pool_size, stride, name):
    """pool_size: side of the square window; stride: step between windows;
    name: identifier of the layer (stored only)."""
    self.pool = pool_size
    self.s = stride
    self.name = name

  def forward(self, inputs):
    """Return the per-window maxima, shape (C, new_width, new_height).

    When both W and H are odd, one row and one column of zeros are appended
    first so the last row/column is still covered by a window. The (possibly
    padded) input is cached on `self.inputs` for `backward`.
    """
    C, W, H = inputs.shape
    if (W%2) == 1 and (H%2) == 1:
      input_alt = np.zeros((C,W+1,H+1))
      input_alt[:,0:W,0:H] = inputs
      self.inputs = input_alt
      C, W, H = input_alt.shape
      self.org_odd = True
    else:
      self.inputs = inputs
      self.org_odd = False

    new_width = int((W - (self.pool-1)-1)/self.s + 1)
    new_height = int((H - (self.pool-1)-1)/self.s + 1)
    out = np.zeros((C, new_width, new_height))
    for c in range(C):
      for w in range(new_width):
        for h in range(new_height):
          out[c, w, h] = np.max(self.inputs[c, w*self.s:w*self.s+self.pool, h*self.s:h*self.s+self.pool])
    return out

  def backward(self, dy):
    """Route each output gradient to the arg-max position of its window.

    dy:      gradient w.r.t. the output of `forward`, shape (C, new_width, new_height).
    returns: gradient w.r.t. the original (unpadded) input.
    """
    C, W, H = self.inputs.shape
    dx = np.zeros(self.inputs.shape)
    # Walk the *output* grid and derive each window from the stride, exactly
    # like forward does; this stays in bounds for any input size and stride.
    _, out_w, out_h = dy.shape
    for c in range(C):
      for w in range(out_w):
        ws = w*self.s
        for h in range(out_h):
          hs = h*self.s
          window = self.inputs[c, ws:ws+self.pool, hs:hs+self.pool]
          idx, idy = np.unravel_index(np.argmax(window), window.shape)
          # += so overlapping windows (stride < pool) accumulate correctly.
          dx[c, ws+idx, hs+idy] += dy[c, w, h]
    if self.org_odd:
      # Remove the gradient of the zero row/column added in forward.
      dx = dx[:,0:(W-1),0:(H-1)]

    return dx

  def extract(self):
    """The layer has no parameters; returns None."""
    return

class Dense:
  """Fully connected layer for a single row-vector sample, with an SGD
  update performed inside `backward`."""

  def __init__(self, num_inputs, num_outputs, learning_rate, name):
    """Weights (num_inputs, num_outputs) start as small uniform values in
    [0, 0.01); the bias (num_outputs, 1) starts at zero."""
    self.weights = np.random.rand(num_inputs, num_outputs) * 0.01
    self.bias = np.zeros((num_outputs, 1))
    self.lr = learning_rate
    self.name = name

  def forward(self, inputs):
    """Return inputs . weights + bias^T and cache inputs/output on the layer."""
    self.inputs = inputs
    projected = np.dot(self.inputs, self.weights)
    self.output = projected + self.bias.T
    return self.output

  def backward(self, dy):
    """Update the parameters from the output gradient `dy` and return the
    gradient w.r.t. the input."""
    # The rest of the method works on a column-oriented gradient, so a
    # gradient whose first dimension matches the input's is transposed.
    grad = dy.T if dy.shape[0] == self.inputs.shape[0] else dy

    weight_grad = grad.dot(self.inputs)
    bias_grad = np.sum(grad, axis=1, keepdims=True)
    # Computed before the update below, i.e. with the pre-update weights.
    input_grad = np.dot(grad.T, self.weights.T)

    self.weights -= self.lr * weight_grad.T
    self.bias -= self.lr * bias_grad

    return input_grad

  def extract(self):
    """Return the parameters as a dict keyed by '<name>.weights' / '<name>.bias'."""
    params = {}
    params[self.name+'.weights'] = self.weights
    params[self.name+'.bias'] = self.bias
    return params

  def feed(self, weights, bias):
    """Replace the layer's parameters with the given arrays."""
    self.weights = weights
    self.bias = bias

class Flatten:
  """Reshapes a (C, W, H) sample into a (1, C*W*H) row vector and back."""
  def __init__(self):
    pass
  def forward(self, inputs):
    """Remember the 3-D input shape and return the sample as one row."""
    channels, width, height = inputs.shape
    self.C, self.W, self.H = channels, width, height
    return inputs.reshape(1, channels*width*height)
  def backward(self, dy):
    """Restore the gradient to the shape seen by the last forward call."""
    original_shape = (self.C, self.W, self.H)
    return dy.reshape(*original_shape)
  def extract(self):
    """The layer has no parameters; returns None."""
    return

class ReLu:
  """Rectified linear activation: max(0, x) element-wise."""
  def __init__(self):
    pass
  def forward(self, inputs):
    """Cache the input and return it with negative entries replaced by 0."""
    self.inputs = inputs
    rectified = np.maximum(0, inputs)
    self.outputs = rectified
    return rectified
  def backward(self, dy):
    """Pass the gradient through where the cached input was strictly
    positive and block it (0) everywhere else."""
    positive = self.inputs > 0
    return np.where(positive, dy, 0)
  def extract(self):
    """The layer has no parameters; returns None."""
    return

class Sigmoid:
  """Activation layer computing x * sigmoid(x) = x / (1 + exp(-x)).

  NOTE: despite the class name this is the SiLU / swish function, not the
  plain logistic sigmoid; the forward formula is kept as is and `backward`
  implements the matching derivative.
  """
  def __init__(self):
    pass
  def forward(self, inputs):
    """Cache the input and return x / (1 + exp(-x)) element-wise."""
    self.inputs = inputs
    self.outputs = inputs/(1+np.exp(-inputs))
    return self.outputs
  def backward(self, dy):
    """Return dy * f'(x) for the input cached by the last forward call.

    With s = sigmoid(x):  f(x) = x*s  =>  f'(x) = s + x*s*(1 - s).
    Written in terms of s (instead of exp(-x)/(1+exp(-x))**2) so that very
    negative inputs give 0 rather than inf/inf = nan.
    """
    sig = 1 / (1 + np.exp(-self.inputs))
    f_prime_x = sig + self.inputs * sig * (1 - sig)
    dx = dy * f_prime_x
    return dx
  def extract(self):
    """The layer has no parameters; returns None."""
    pass

class Softmax:
  """Softmax output layer whose `backward` takes the one-hot label and
  returns (probabilities - label)."""
  def __init__(self):
    pass
  def forward(self, inputs):
    """Return the softmax of `inputs` (float32) and cache it on `self.out`."""
    # Subtracting the maximum leaves the result mathematically unchanged but
    # keeps exp() from overflowing to inf (which would turn the output to nan).
    shifted = inputs - np.max(inputs)
    exp = np.exp(shifted, dtype="float32")
    self.out = exp/np.sum(exp)
    return self.out
  def backward(self, dy):
    """`dy` is the one-hot label vector; returns out^T - label as a column
    of shape (num_classes, 1)."""
    return self.out.T - dy.reshape(dy.shape[0],1)
  def extract(self):
    """The layer has no parameters; returns None."""
    return
    
def cross_entropy(inputs, labels):
  """Cross-entropy loss of one sample.

  inputs: predicted probabilities as a row, broadcastable against (1, n).
  labels: 1-D label vector of length n (one-hot in this notebook).
  Returns -log(sum(labels * inputs) + 1e-7); the small constant keeps the
  logarithm finite when that sum is 0.
  """
  num_classes = labels.shape[0]
  label_row = labels.reshape(1, num_classes)
  picked = (label_row * inputs).sum()
  return -np.log(picked + 1e-7)

In [None]:
class NN:
  """Fully connected baseline: Dense(10000->512) -> ReLu -> Dense(512->50) -> Softmax.

  Every layer processes one sample at a time and updates its own parameters
  inside `backward`, so training is per-sample SGD.
  """

  def __init__(self):
    lr = 0.01
    self.layers = []
    self.layers.append(Dense(num_inputs=10000, num_outputs=512, learning_rate=lr, name='fc1'))
    # The activation class defined in this notebook is spelled `ReLu`.
    self.layers.append(ReLu())
    self.layers.append(Dense(num_inputs=512, num_outputs=50, learning_rate=lr, name='fc2'))
    self.layers.append(Softmax())
    self.lay_num = len(self.layers)

  def forward(self, x):
    """Run one sample through every layer and return the final output."""
    output = x
    for layer in self.layers:
      output = layer.forward(output)
    return output

  def backward(self, y):
    """Back-propagate from the label `y` through the layers in reverse
    order; each layer updates itself. Returns the gradient w.r.t. the input."""
    for layer in reversed(self.layers):
      y = layer.backward(y)
    return y

  def predict(self, data, label):
    """Return the NUMBER (as float) of samples in `data` whose arg-max
    prediction equals the arg-max of the matching row of `label`."""
    total_acc = 0
    for i in range(data.shape[0]):
      output = self.forward(data[i])
      if np.argmax(output) == np.argmax(label[i]):
        total_acc += 1
    return float(total_acc)

  def extract(self, weights_file):
    """Pickle the list of per-layer `extract()` results to `weights_file`."""
    obj = [layer.extract() for layer in self.layers]
    with open(weights_file, 'wb') as handle:
      pickle.dump(obj, handle, protocol=pickle.HIGHEST_PROTOCOL)

  def train(self, training_data, training_label, batch_size, epoch, weights_file = None):
    """Train with per-sample SGD on an 80/20 train/validation split.

    training_data:  samples, first axis indexes the sample.
    training_label: one-hot labels, one row per sample.
    batch_size:     number of samples per chunk of the epoch loop.
    epoch:          number of passes over the training split.
    weights_file:   if given, parameters are pickled there after training.

    After each epoch the training and validation accuracy (fractions in
    [0, 1]) are appended to `self.acc` and `self.vacc`.
    """
    self.acc = []
    self.vacc = []
    X_train, X_valid, y_train, y_valid = train_test_split(training_data, training_label, test_size=0.2, stratify=training_label)
    n_train = X_train.shape[0]
    n_valid = X_valid.shape[0]

    for e in range(epoch):
      epoch_correct = 0
      epoch_loss = 0.0
      for batch_index in range(0, n_train, batch_size):
        # Slicing clips at the end of the array, so the last (possibly
        # shorter) batch needs no special case. Labels must come from
        # y_train: it is the array that was split together with X_train.
        data = X_train[batch_index:batch_index+batch_size]
        label = y_train[batch_index:batch_index+batch_size]
        for x, y in zip(data, label):
          output = self.forward(x)
          epoch_loss += cross_entropy(output, y)
          if np.argmax(output) == np.argmax(y):
            epoch_correct += 1
          # The layers cache only the most recent sample, so the backward
          # pass has to follow that sample's forward pass immediately.
          self.backward(y)

      loss = epoch_loss / n_train
      training_acc = float(epoch_correct) / float(n_train)
      valid_acc = self.predict(X_valid, y_valid) / float(n_valid)
      self.acc.append(training_acc)
      self.vacc.append(valid_acc)
      print('=== Epoch: {0:d}/{1:d} === Iter:{2:d} === Loss: {3:.2f} ====== TAcc: {4:.2f} ====== VAcc: {5:.2f} ======'.format(e+1,epoch,n_train,loss,training_acc, valid_acc))

    if (weights_file is not None):
      self.extract(weights_file)

  def acc_plot(self):
    """Plot the per-epoch training and validation accuracy recorded by `train`."""
    epochs = range(1, len(self.acc) + 1)

    # Draw the two curves
    plt.plot(epochs, self.acc, 'b', label='Training accuracy')
    plt.plot(epochs, self.vacc, 'r', label='Validation accuracy')

    # Set the title and the x-/y-axis labels
    plt.title('Training and validation accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()

In [None]:
nn = NN()

# Flatten every image into a (1, num_pixels) row vector for the dense network.
# Sample and pixel counts are taken from the arrays instead of hard-coded.
train_nn = train.reshape(train.shape[0], 1, -1)
valid_nn = valid.reshape(valid.shape[0], 1, -1)
test_nn = test.reshape(test.shape[0], 1, -1)

nn.train(train_nn, train_labels, 128, 5)
nn.acc_plot()

# predict() returns the number of correctly classified samples.
print(nn.predict(valid_nn, valid_labels))
print(nn.predict(test_nn, test_labels))

In [None]:
class Lenet5:
  """LeNet-5 style CNN: three Conv2D + ReLu stages (the first two followed
  by 2x2 max pooling), then Dense(120->84) -> ReLu -> Dense(84->50) -> Softmax.

  Every layer processes one sample at a time and updates its own parameters
  inside `backward`, so training is per-sample SGD.
  """

  def __init__(self):
    lr = 0.01
    self.layers = []
    self.layers.append(Conv2D(inputs_channel=1, num_filters=6, kernel_size=5, padding=0, stride=1, learning_rate=lr, name='conv1'))
    self.layers.append(ReLu())
    self.layers.append(Maxpooling2D(pool_size=2, stride=2, name='maxpool2'))
    self.layers.append(Conv2D(inputs_channel=6, num_filters=16, kernel_size=5, padding=0, stride=1, learning_rate=lr, name='conv3'))
    self.layers.append(ReLu())
    self.layers.append(Maxpooling2D(pool_size=2, stride=2, name='maxpool4'))
    self.layers.append(Conv2D(inputs_channel=16, num_filters=120, kernel_size=22, padding=0, stride=1, learning_rate=lr, name='conv5'))
    self.layers.append(ReLu())
    self.layers.append(Flatten())
    self.layers.append(Dense(num_inputs=120, num_outputs=84, learning_rate=lr, name='fc6'))
    self.layers.append(ReLu())
    self.layers.append(Dense(num_inputs=84, num_outputs=50, learning_rate=lr, name='fc7'))
    self.layers.append(Softmax())
    self.lay_num = len(self.layers)

  def forward(self, x):
    """Run one sample through every layer and return the final output."""
    output = x
    for layer in self.layers:
      output = layer.forward(output)
    return output

  def backward(self, y):
    """Back-propagate from the label `y` through the layers in reverse
    order; each layer updates itself. Returns the gradient w.r.t. the input."""
    for layer in reversed(self.layers):
      y = layer.backward(y)
    return y

  def predict(self, data, label):
    """Return the NUMBER (as float) of samples in `data` whose arg-max
    prediction equals the arg-max of the matching row of `label`."""
    total_acc = 0
    for i in range(data.shape[0]):
      output = self.forward(data[i])
      if np.argmax(output) == np.argmax(label[i]):
        total_acc += 1
    return float(total_acc)

  def extract(self, weights_file):
    """Pickle the list of per-layer `extract()` results to `weights_file`."""
    obj = [layer.extract() for layer in self.layers]
    with open(weights_file, 'wb') as handle:
      pickle.dump(obj, handle, protocol=pickle.HIGHEST_PROTOCOL)

  def train(self, training_data, training_label, batch_size, epoch, weights_file = None):
    """Train with per-sample SGD on an 80/20 train/validation split.

    training_data:  samples, first axis indexes the sample.
    training_label: one-hot labels, one row per sample.
    batch_size:     number of samples per chunk of the epoch loop.
    epoch:          number of passes over the training split.
    weights_file:   if given, parameters are pickled there after training.

    After each epoch the training and validation accuracy (fractions in
    [0, 1]) are appended to `self.acc` and `self.vacc`.
    """
    self.acc = []
    self.vacc = []
    X_train, X_valid, y_train, y_valid = train_test_split(training_data, training_label, test_size=0.2, stratify=training_label)
    n_train = X_train.shape[0]
    n_valid = X_valid.shape[0]

    for e in range(epoch):
      epoch_correct = 0
      epoch_loss = 0.0
      for batch_index in range(0, n_train, batch_size):
        # Slicing clips at the end of the array, so the last (possibly
        # shorter) batch needs no special case. Labels must come from
        # y_train: it is the array that was split together with X_train.
        data = X_train[batch_index:batch_index+batch_size]
        label = y_train[batch_index:batch_index+batch_size]
        for x, y in zip(data, label):
          output = self.forward(x)
          epoch_loss += cross_entropy(output, y)
          if np.argmax(output) == np.argmax(y):
            epoch_correct += 1
          # The layers cache only the most recent sample, so the backward
          # pass has to follow that sample's forward pass immediately.
          self.backward(y)

      loss = epoch_loss / n_train
      training_acc = float(epoch_correct) / float(n_train)
      valid_acc = self.predict(X_valid, y_valid) / float(n_valid)
      self.acc.append(training_acc)
      self.vacc.append(valid_acc)
      print('=== Epoch: {0:d}/{1:d} === Iter:{2:d} === Loss: {3:.2f} ====== TAcc: {4:.2f} ====== VAcc: {5:.2f} ======'.format(e+1,epoch,n_train,loss,training_acc, valid_acc))

    if (weights_file is not None):
      self.extract(weights_file)

  def acc_plot(self):
    """Plot the per-epoch training and validation accuracy recorded by `train`."""
    epochs = range(1, len(self.acc) + 1)

    # Draw the two curves
    plt.plot(epochs, self.acc, 'b', label='Training accuracy')
    plt.plot(epochs, self.vacc, 'r', label='Validation accuracy')

    # Set the title and the x-/y-axis labels
    plt.title('Training and validation accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()


In [None]:
net = Lenet5()
net.train(train, train_labels, 128, 5)
# acc_plot must be *called*; a bare `net.acc_plot` only references the
# method and draws nothing.
net.acc_plot()
# predict() returns the number of correctly classified samples.
print(net.predict(valid, valid_labels))
print(net.predict(test, test_labels))

In [None]:
class Lenet5_imp:
  """LeNet-5 variant with 3x3 convolutions, an extra leading convolution and
  the notebook's `Sigmoid` activation layer in place of ReLu.

  Every layer processes one sample at a time and updates its own parameters
  inside `backward`, so training is per-sample SGD.
  """

  def __init__(self):
    lr = 0.01
    self.layers = []
    self.layers.append(Conv2D(inputs_channel=1, num_filters=6, kernel_size=3, padding=0, stride=1, learning_rate=lr, name='conv1'))
    self.layers.append(Conv2D(inputs_channel=6, num_filters=16, kernel_size=3, padding=0, stride=1, learning_rate=lr, name='conv2'))
    self.layers.append(Sigmoid())
    self.layers.append(Maxpooling2D(pool_size=2, stride=2, name='maxpool3'))
    self.layers.append(Conv2D(inputs_channel=16, num_filters=16, kernel_size=3, padding=0, stride=1, learning_rate=lr, name='conv4'))
    self.layers.append(Sigmoid())
    self.layers.append(Maxpooling2D(pool_size=2, stride=2, name='maxpool5'))
    self.layers.append(Conv2D(inputs_channel=16, num_filters=120, kernel_size=23, padding=0, stride=1, learning_rate=lr, name='conv6'))
    self.layers.append(Sigmoid())
    self.layers.append(Flatten())
    self.layers.append(Dense(num_inputs=120, num_outputs=84, learning_rate=lr, name='fc7'))
    self.layers.append(Sigmoid())
    self.layers.append(Dense(num_inputs=84, num_outputs=50, learning_rate=lr, name='fc8'))
    self.layers.append(Softmax())
    self.lay_num = len(self.layers)

  def forward(self, x):
    """Run one sample through every layer and return the final output."""
    output = x
    for layer in self.layers:
      output = layer.forward(output)
    return output

  def backward(self, y):
    """Back-propagate from the label `y` through the layers in reverse
    order; each layer updates itself. Returns the gradient w.r.t. the input."""
    for layer in reversed(self.layers):
      y = layer.backward(y)
    return y

  def predict(self, data, label):
    """Return the NUMBER (as float) of samples in `data` whose arg-max
    prediction equals the arg-max of the matching row of `label`."""
    total_acc = 0
    for i in range(data.shape[0]):
      output = self.forward(data[i])
      if np.argmax(output) == np.argmax(label[i]):
        total_acc += 1
    return float(total_acc)

  def extract(self, weights_file):
    """Pickle the list of per-layer `extract()` results to `weights_file`."""
    obj = [layer.extract() for layer in self.layers]
    with open(weights_file, 'wb') as handle:
      pickle.dump(obj, handle, protocol=pickle.HIGHEST_PROTOCOL)

  def train(self, training_data, training_label, batch_size, epoch, weights_file = None):
    """Train with per-sample SGD on an 80/20 train/validation split.

    training_data:  samples, first axis indexes the sample.
    training_label: one-hot labels, one row per sample.
    batch_size:     number of samples per chunk of the epoch loop.
    epoch:          number of passes over the training split.
    weights_file:   if given, parameters are pickled there after training.

    After each epoch the training and validation accuracy (fractions in
    [0, 1]) are appended to `self.acc` and `self.vacc`.
    """
    self.acc = []
    self.vacc = []
    X_train, X_valid, y_train, y_valid = train_test_split(training_data, training_label, test_size=0.2, stratify=training_label)
    n_train = X_train.shape[0]
    n_valid = X_valid.shape[0]

    for e in range(epoch):
      epoch_correct = 0
      epoch_loss = 0.0
      for batch_index in range(0, n_train, batch_size):
        # Slicing clips at the end of the array, so the last (possibly
        # shorter) batch needs no special case. Labels must come from
        # y_train: it is the array that was split together with X_train.
        data = X_train[batch_index:batch_index+batch_size]
        label = y_train[batch_index:batch_index+batch_size]
        for x, y in zip(data, label):
          output = self.forward(x)
          epoch_loss += cross_entropy(output, y)
          if np.argmax(output) == np.argmax(y):
            epoch_correct += 1
          # The layers cache only the most recent sample, so the backward
          # pass has to follow that sample's forward pass immediately.
          self.backward(y)

      loss = epoch_loss / n_train
      training_acc = float(epoch_correct) / float(n_train)
      valid_acc = self.predict(X_valid, y_valid) / float(n_valid)
      self.acc.append(training_acc)
      self.vacc.append(valid_acc)
      print('=== Epoch: {0:d}/{1:d} === Iter:{2:d} === Loss: {3:.2f} ====== TAcc: {4:.2f} ====== VAcc: {5:.2f} ======'.format(e+1,epoch,n_train,loss,training_acc, valid_acc))

    if (weights_file is not None):
      self.extract(weights_file)

  def acc_plot(self):
    """Plot the per-epoch training and validation accuracy recorded by `train`."""
    epochs = range(1, len(self.acc) + 1)

    # Draw the two curves
    plt.plot(epochs, self.acc, 'b', label='Training accuracy')
    plt.plot(epochs, self.vacc, 'r', label='Validation accuracy')

    # Set the title and the x-/y-axis labels
    plt.title('Training and validation accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()


In [None]:
# Train the modified LeNet, plot its accuracy curves, then print what
# predict() returns for the validation and the test split.
net2 = Lenet5_imp()
net2.train(train, train_labels, 128, 5)
net2.acc_plot()
for images, labels in ((valid, valid_labels), (test, test_labels)):
  print(net2.predict(images, labels))