In [1]:
import numpy as np
import pandas as pd
# from tqdm import tqdm

In [None]:
class SkipConn(Layer):
    """Layer that merges a main-path input with a skip-connection input.

    Each input is sent through its own weights and bias, the configured
    activation is applied to both pre-activations, and the two activated
    branches are added element-wise to form the output.
    """

    def __init__(self, layer1, layer2, input_size, output_size, activation="ReLU"):
        """
        layer1      : layer whose input is routed around as the skip input
                      (its input size sets the row count of weight2)
        layer2      : layer whose output is the main input of this layer
                      (its output size sets the row count of weight1)
        input_size  : stored on the base class; not used to size the weights
        output_size : number of output neurons
        activation  : "ReLU", "sigmoid" or "tanh"

        NOTE(review): the roles of layer1/layer2 are inferred from forward()
        being called as forward(main input, skip input) -- confirm with callers.
        """
        Layer.__init__(self, input_size, output_size, activation)
        # weight1/bias1 act on A_prev1 (main path), weight2/bias2 on A_prev2 (skip path).
        self.weight1 = np.random.rand(layer2.get_output_size(), output_size) - 0.5
        self.weight2 = np.random.rand(layer1.get_input_size(), output_size) - 0.5
        self.bias1 = np.random.rand(1, output_size) - 0.5
        self.bias2 = np.random.rand(1, output_size) - 0.5

    def _activation_pair(self):
        """Return (activation, derivative) callables for self.activation.

        Raises ValueError when the activation name is not supported.
        """
        table = {
            "ReLU": (self._relu, self._relu_backward),
            "sigmoid": (self._sigmoid, self._sigmoid_backward),
            "tanh": (self._tanh, self._tanh_backward),
        }
        if self.activation not in table:
            raise ValueError(f"Unsupported activation: {self.activation!r}")
        return table[self.activation]

    # returns output for a given input
    def forward(self, A_prev1, A_prev2):
        """
        A_prev1 : Input to the layer that is output of previous layer
        A_prev2 : Input to the layer that is skip connection of past layer

        Returns the element-wise sum of the two activated branches.
        """
        activate, _ = self._activation_pair()

        # Keep inputs and pre-activations around for backward().
        self.linear_cache = (A_prev1, A_prev2)
        self.activation_cache = (
            np.dot(A_prev1, self.weight1) + self.bias1,
            np.dot(A_prev2, self.weight2) + self.bias2,
        )

        main_out = activate(self.activation_cache[0])
        skip_out = activate(self.activation_cache[1])

        assert main_out.shape == skip_out.shape

        return main_out + skip_out

    # computes dE/dW, dE/dB for a given output_error=dE/dY. Returns input_error=dE/dX.
    def backward(self, dA, learning_rate):
        """Update both branches and return (dE/dA_prev1, dE/dA_prev2).

        dA            : gradient of the loss w.r.t. this layer's output
        learning_rate : step size for the parameter update
        """
        _, derivative = self._activation_pair()

        # The output is the sum of the branches, so both receive the same dA.
        dZ1 = derivative(self.activation_cache[0]) * dA
        dZ2 = derivative(self.activation_cache[1]) * dA

        dW1 = np.dot(self.linear_cache[0].T, dZ1)
        dW2 = np.dot(self.linear_cache[1].T, dZ2)

        # Bias gradients are averaged over the batch dimension.
        dB1 = np.sum(dZ1, axis=0, keepdims=True) / dZ1.shape[0]
        dB2 = np.sum(dZ2, axis=0, keepdims=True) / dZ2.shape[0]

        # Input gradients must use the weights from before the update below.
        dA1 = np.dot(dZ1, self.weight1.T)
        dA2 = np.dot(dZ2, self.weight2.T)

        # update parameters
        self.weight1 -= learning_rate * dW1
        self.bias1 -= learning_rate * dB1

        self.weight2 -= learning_rate * dW2
        self.bias2 -= learning_rate * dB2

        return dA1, dA2


In [None]:
import numpy as np
import pandas as pd
from tqdm import tqdm

class Layer:
    """Base class for network layers.

    Stores the layer sizes and the activation name, and provides the
    activation functions together with their derivatives. Subclasses
    supply forward() and backward().
    """

    def __init__(self, input_size, output_size, activation="ReLU"):
        """Record the number of inputs, outputs and the activation name."""
        self.input_size, self.output_size = input_size, output_size
        self.activation = activation

    # --- size accessors -------------------------------------------------
    def get_input_size(self):
        """Return the number of input neurons."""
        return self.input_size

    def get_output_size(self):
        """Return the number of output neurons."""
        return self.output_size

    # --- activation functions --------------------------------------------
    def _relu(self, Z):
        """Element-wise max(0, Z)."""
        return np.maximum(0, Z)

    def _sigmoid(self, Z):
        """Element-wise 1 / (1 + exp(-Z))."""
        denominator = 1 + np.exp(-Z)
        return 1 / denominator

    def _pseudosigmoid(self, Z):
        """Element-wise 1 / (1 + exp(Z))."""
        denominator = 1 + np.exp(Z)
        return 1 / denominator

    def _tanh(self, Z):
        """Element-wise hyperbolic tangent."""
        return np.tanh(Z)

    def forward(self, A_prev):
        """Compute the layer output; must be provided by subclasses."""
        raise NotImplementedError()

    # --- activation derivatives (evaluated at the pre-activation) --------
    def _relu_backward(self, activation_cache):
        """1 where the pre-activation is positive, 0 elsewhere."""
        is_positive = activation_cache > 0
        return np.where(is_positive, 1, 0)

    def _sigmoid_backward(self, activation_cache):
        """s * (1 - s) with s = _sigmoid(activation_cache)."""
        s = self._sigmoid(activation_cache)
        return s * (1 - s)

    def _pseudosigmoid_backward(self, activation_cache):
        """p * (p - 1) with p = _pseudosigmoid(activation_cache)."""
        p = self._pseudosigmoid(activation_cache)
        return p * (p - 1)

    def _tanh_backward(self, activation_cache):
        """1 - tanh(activation_cache) ** 2."""
        t = np.tanh(activation_cache)
        return 1 - t ** 2

    def backward(self, dA, learning_rate):
        """Back-propagate dA and update parameters; must be provided by subclasses."""
        raise NotImplementedError()


class Dense(Layer):
    """Fully connected layer: activation(A_prev . weights + bias)."""

    # input_size = number of input neurons
    # output_size = number of output neurons
    def __init__(self, input_size, output_size, activation="ReLU"):
        """Create weights (input_size, output_size) and bias (1, output_size).

        Both are drawn uniformly from [-0.5, 0.5). The activation name is
        stored by Layer.__init__.
        """
        Layer.__init__(self, input_size, output_size, activation)
        self.weights = np.random.rand(input_size, output_size) - 0.5
        self.bias = np.random.rand(1, output_size) - 0.5

    def summary(self):
        """Print the parameter shapes and the input/output shapes."""
        print("--------------------------------------------------------------")
        print(f"Weights : {self.weights.shape} \t bias : {self.bias.shape}")
        print(f"Input Shape : (None, {self.input_size})\nOutput Shape : (None, {self.output_size})")

    def _activation_pair(self):
        """Return (activation, derivative) callables for self.activation.

        Raises ValueError when the activation name is not supported.
        """
        table = {
            "ReLU": (self._relu, self._relu_backward),
            "sigmoid": (self._sigmoid, self._sigmoid_backward),
            "pseudosigmoid": (self._pseudosigmoid, self._pseudosigmoid_backward),
            "tanh": (self._tanh, self._tanh_backward),
        }
        if self.activation not in table:
            raise ValueError(f"Unsupported activation: {self.activation!r}")
        return table[self.activation]

    # returns output for a given input
    def forward(self, A_prev):
        """Return the activated output for A_prev and cache values for backward().

        A_prev : input batch whose column count equals input_size
        """
        activate, _ = self._activation_pair()

        self.linear_cache = A_prev
        self.activation_cache = np.dot(self.linear_cache, self.weights) + self.bias

        return activate(self.activation_cache)

    # computes dE/dW, dE/dB for a given output_error=dE/dY. Returns input_error=dE/dX.
    def backward(self, dA, learning_rate):
        """Update weights and bias, and return the gradient w.r.t. the layer input.

        dA            : gradient of the loss w.r.t. this layer's output
        learning_rate : step size for the parameter update
        """
        _, derivative = self._activation_pair()

        # Chain rule through the activation: dE/dZ = f'(Z) * dE/dA.
        dZ = derivative(self.activation_cache) * dA

        # dW is summed over the batch; dB is averaged over it.
        dW = np.dot(self.linear_cache.T, dZ)
        dB = np.sum(dZ, axis=0, keepdims=True) / dZ.shape[0]

        # The input gradient flows through dZ (not dA) and must use the
        # weights from before the update below.
        dA_prev = np.dot(dZ, self.weights.T)

        # update parameters
        self.weights -= learning_rate * dW
        self.bias -= learning_rate * dB

        return dA_prev

class SkipConn(Layer):
    """Skip-connection layer that adds the skip input onto the main input.

    The skip input is added to the main input over their common leading
    columns (extra skip columns are dropped, missing ones count as zero).
    The merged input then goes through one set of weights, a bias and the
    configured activation.
    """

    def __init__(self, layer1, layer2, input_size, output_size, activation="ReLU"):
        """
        layer1      : accepted for signature compatibility; not used here
        layer2      : layer whose output is the main input of this layer
                      (its output size sets the row count of the weights)
        input_size  : stored on the base class; not used to size the weights
        output_size : number of output neurons
        activation  : "ReLU", "sigmoid" or "tanh"

        NOTE(review): sizing the weights from layer2's output is inferred from
        forward() using A_prev1's width for the merged input -- confirm.
        """
        Layer.__init__(self, input_size, output_size, activation)
        self.weights = np.random.rand(layer2.get_output_size(), output_size) - 0.5
        self.bias = np.random.rand(1, output_size) - 0.5

    def _activation_pair(self):
        """Return (activation, derivative) callables for self.activation.

        Raises ValueError when the activation name is not supported.
        """
        table = {
            "ReLU": (self._relu, self._relu_backward),
            "sigmoid": (self._sigmoid, self._sigmoid_backward),
            "tanh": (self._tanh, self._tanh_backward),
        }
        if self.activation not in table:
            raise ValueError(f"Unsupported activation: {self.activation!r}")
        return table[self.activation]

    def _merge_inputs(self, A_prev1, A_prev2):
        """Return A_prev1 plus A_prev2 fitted to A_prev1's column count."""
        padded_skip = np.zeros_like(A_prev1)
        width = min(A_prev1.shape[1], A_prev2.shape[1])
        padded_skip[:, :width] = A_prev2[:, :width]
        return A_prev1 + padded_skip

    # returns output for a given input
    def forward(self, A_prev1, A_prev2):
        """
        A_prev1 : Input to the layer that is output of previous layer
        A_prev2 : Input to the layer that is skip connection of past layer

        Returns the activated output of the merged input.
        """
        activate, _ = self._activation_pair()

        self.linear_cache = (A_prev1, A_prev2)
        merged = self._merge_inputs(A_prev1, A_prev2)
        self.activation_cache = np.dot(merged, self.weights) + self.bias

        return activate(self.activation_cache)

    # computes dE/dW, dE/dB for a given output_error=dE/dY. Returns input_error=dE/dX.
    def backward(self, dA, learning_rate):
        """Update weights and bias, and return the gradient w.r.t. the merged input.

        dA            : gradient of the loss w.r.t. this layer's output
        learning_rate : step size for the parameter update

        The returned array has A_prev1's shape.
        """
        _, derivative = self._activation_pair()

        # Chain rule through the activation: dE/dZ = f'(Z) * dE/dA.
        dZ = derivative(self.activation_cache) * dA

        merged = self._merge_inputs(self.linear_cache[0], self.linear_cache[1])

        # dW is summed over the batch; dB is averaged over it.
        dW = np.dot(merged.T, dZ)
        dB = np.sum(dZ, axis=0, keepdims=True) / dZ.shape[0]

        # The input gradient flows through dZ and must use the weights from
        # before the update below.
        dA_prev = np.dot(dZ, self.weights.T)

        # update parameters
        self.weights -= learning_rate * dW
        self.bias -= learning_rate * dB

        return dA_prev


def batch_iterator(X, y, batch_size=2):
    """Yield shuffled mini-batches (X_batch, y_batch) of at most batch_size rows.

    X, y       : arrays indexed by sample along axis 0
    batch_size : number of samples per batch (the last batch may be smaller)

    Raises ValueError if batch_size is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    X, y = shuffle_data(X, y)
    # Count rows after shuffling so the loop matches what is actually sliced.
    n_samples = X.shape[0]
    for begin in range(0, n_samples, batch_size):
        # Slice ends are exclusive, so the end index is begin + batch_size.
        end = min(begin + batch_size, n_samples)
        yield X[begin:end], y[begin:end]


def shuffle_data(X, y, seed=None):
    """Return X and y reordered by the same random permutation of their rows.

    seed : seed for NumPy's global random generator; when None the number
           of rows of X is used, so repeated calls give the same order.

    Note: this reseeds NumPy's global random state on every call.
    """
    if seed is None:
        seed = X.shape[0]

    np.random.seed(seed)

    idx = np.arange(X.shape[0])
    np.random.shuffle(idx)
    return X[idx], y[idx]

class Model:
    """Sequential container that chains layers for prediction and training."""

    def __init__(self):
        """Start with an empty list of layers."""
        self.layers = []

    # add layer to network
    def add(self, layer):
        """Append a layer; layers run in the order they were added."""
        self.layers.append(layer)

    def add_skip_conn(self, layer1, layer2):
        """Placeholder: currently does nothing."""
        pass

    # predict output for given input
    def predict(self, input_data):
        """Run input_data through every layer's forward() and return the result."""
        output = input_data
        for layer in self.layers:
            output = layer.forward(output)

        return output

    def compute_cost(self, y_true, y_pred):
        """Return half of the mean squared difference between y_true and y_pred."""
        return np.mean(np.square(y_true-y_pred)) / 2

    def compute_cost_grad(self, y_true, y_pred):
        """Return (y_pred - y_true) divided by the number of columns of y_true."""
        return (y_pred - y_true)/y_true.shape[1]

    # train the network
    def train(self, x_train, y_train, epochs=50, batch_size=2, learning_rate=0.0001):
        """Train the layers with mini-batch gradient descent.

        x_train, y_train : training inputs and targets, one sample per row
        epochs           : number of passes over the batches
        batch_size       : samples per batch handed to batch_iterator
        learning_rate    : step size passed to every layer's backward()

        Returns a list with the cost of every processed batch, in order.
        """
        loss_list = []

        # training loop
        for _ in tqdm(range(epochs)):
            for x, y in batch_iterator(x_train, y_train, batch_size):
                # forward propagation
                output = self.predict(x)

                # record the batch loss (for display purposes only)
                loss_list.append(self.compute_cost(y, output))

                # backward propagation, last layer first
                grad = self.compute_cost_grad(y, output)
                for layer in reversed(self.layers):
                    grad = layer.backward(grad, learning_rate)

        return loss_list

from keras.datasets import mnist
from keras.utils import to_categorical

# load MNIST from server
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# training data
# flatten each 28x28 image to a 784-vector and scale pixel values to [0, 1]
x_train = x_train.reshape(x_train.shape[0], 28*28)
x_train = x_train.astype('float32')
x_train /= 255

# one-hot encode the training labels
y_train = to_categorical(y_train)

# same for test data
x_test = x_test.reshape(x_test.shape[0], 28*28)
x_test = x_test.astype('float32')
x_test /= 255

# one-hot encode the test labels
y_test = to_categorical(y_test)

# Network
BATCH_SIZE = 2
model = Model()
model.add(Dense(28*28, 50, activation="sigmoid"))
model.add(Dense(50, 10, activation="sigmoid"))

# Model exposes train(); it returns the list of per-batch losses.
history = model.train(x_train[:2000], y_train[:2000], epochs=50, batch_size=BATCH_SIZE, learning_rate=0.1)

y_pred = model.predict(x_test[:10])

print('true values: ')
print(np.argmax(y_test[0:10], axis=1))

print('predicted values: ')
print(np.argmax(y_pred[0:10], axis=1))



class SkipConn(Layer):
    """Layer that merges a main-path input with a skip-connection input.

    Each input is sent through its own weights and bias, the configured
    activation is applied to both pre-activations, and the two activated
    branches are added element-wise to form the output.
    """

    def __init__(self, layer1, layer2, input_size, output_size, activation="ReLU"):
        """
        layer1      : layer whose input is routed around as the skip input
                      (its input size sets the row count of weight2)
        layer2      : layer whose output is the main input of this layer
                      (its output size sets the row count of weight1)
        input_size  : stored on the base class; not used to size the weights
        output_size : number of output neurons
        activation  : "ReLU", "sigmoid", "pseudosigmoid" or "tanh"

        NOTE(review): the roles of layer1/layer2 are inferred from forward()
        being called as forward(main input, skip input) -- confirm with callers.
        """
        Layer.__init__(self, input_size=input_size, output_size=output_size, activation=activation)
        # weight1/bias1 act on A_prev1 (main path), weight2/bias2 on A_prev2 (skip path).
        self.weight1 = np.random.rand(layer2.get_output_size(), output_size) - 0.5
        self.weight2 = np.random.rand(layer1.get_input_size(), output_size) - 0.5
        self.bias1 = np.random.rand(1, output_size) - 0.5
        self.bias2 = np.random.rand(1, output_size) - 0.5

    def _activation_pair(self):
        """Return (activation, derivative) callables for self.activation.

        Raises ValueError when the activation name is not supported.
        """
        table = {
            "ReLU": (self._relu, self._relu_backward),
            "sigmoid": (self._sigmoid, self._sigmoid_backward),
            "pseudosigmoid": (self._pseudosigmoid, self._pseudosigmoid_backward),
            "tanh": (self._tanh, self._tanh_backward),
        }
        if self.activation not in table:
            raise ValueError(f"Unsupported activation: {self.activation!r}")
        return table[self.activation]

    # returns output for a given input
    def forward(self, A_prev1, A_prev2):
        """
        A_prev1 : Input to the layer that is output of previous layer
        A_prev2 : Input to the layer that is skip connection of past layer

        Returns the element-wise sum of the two activated branches.
        """
        activate, _ = self._activation_pair()

        # Keep inputs and pre-activations around for backward().
        self.linear_cache = (A_prev1, A_prev2)
        self.activation_cache = (
            np.dot(A_prev1, self.weight1) + self.bias1,
            np.dot(A_prev2, self.weight2) + self.bias2,
        )

        main_out = activate(self.activation_cache[0])
        skip_out = activate(self.activation_cache[1])

        assert main_out.shape == skip_out.shape

        return main_out + skip_out

    def summary(self):
        """Print the shapes of both branches' parameters and the input/output shapes."""
        print(f"""--------------------------------------------------------------\n
          Weight1 : {self.weight1.shape} \t bias1 : {self.bias1.shape}\n
          Weight2 : {self.weight2.shape} \t bias2 : {self.bias2.shape}\n
          Input Shape : (None, {self.weight1.shape[0]+self.weight2.shape[0]})\n
          Output Shape : (None, {self.output_size})""")

    # computes dE/dW, dE/dB for a given output_error=dE/dY. Returns input_error=dE/dX.
    def backward(self, dA, learning_rate):
        """Update both branches and return (dE/dA_prev1, dE/dA_prev2).

        dA            : gradient of the loss w.r.t. this layer's output
        learning_rate : step size for the parameter update
        """
        _, derivative = self._activation_pair()

        # The output is the sum of the branches, so both receive the same dA.
        dZ1 = derivative(self.activation_cache[0]) * dA
        dZ2 = derivative(self.activation_cache[1]) * dA

        dW1 = np.dot(self.linear_cache[0].T, dZ1)
        dW2 = np.dot(self.linear_cache[1].T, dZ2)

        # Bias gradients are averaged over the batch dimension.
        dB1 = np.sum(dZ1, axis=0, keepdims=True) / dZ1.shape[0]
        dB2 = np.sum(dZ2, axis=0, keepdims=True) / dZ2.shape[0]

        # Input gradients must use the weights from before the update below.
        dA1 = np.dot(dZ1, self.weight1.T)
        dA2 = np.dot(dZ2, self.weight2.T)

        # update parameters
        self.weight1 -= learning_rate * dW1
        self.bias1 -= learning_rate * dB1

        self.weight2 -= learning_rate * dW2
        self.bias2 -= learning_rate * dB2

        return dA1, dA2


# Network with a skip connection: the output of layers[0] feeds both
# layers[1] and, as the skip input, layers[3].
layers = []

layers.append(Dense(28*28, 512, activation="sigmoid"))
layers.append(Dense(512, 256, activation="sigmoid"))
layers.append(Dense(256, 128, activation="sigmoid"))
layers.append(SkipConn(layers[1], layers[2], 0, 64, activation="sigmoid"))
layers.append(Dense(64, 10, activation="sigmoid"))

n_epochs = 20
learning_rate = 0.1

for epoch in range(n_epochs):
    for X, Y in batch_iterator(x_train, y_train):
        # forward propagation
        temp_skip = layers[0].forward(X)
        temp = layers[1].forward(temp_skip)
        temp = layers[2].forward(temp)
        temp = layers[3].forward(temp, temp_skip)
        temp = layers[4].forward(temp)

        # loss calculation
        loss = np.mean(np.square(Y-temp)) / 2

        # Gradient of loss w.r.t temp
        grad = (temp - Y) / Y.shape[1]

        # backward propagation: walk the layers in reverse order
        grad = layers[4].backward(grad, learning_rate)
        grad, grad_skip = layers[3].backward(grad, learning_rate)
        grad = layers[2].backward(grad, learning_rate)
        grad = layers[1].backward(grad, learning_rate)
        # layers[0]'s output went to layers[1] and to the skip branch, so
        # its gradient is the sum of both contributions.
        grad = layers[0].backward(grad+grad_skip, learning_rate)



for layer in layers:
    layer.summary()

