<a href="https://colab.research.google.com/github/Wxyxixixi/comp5329/blob/minibatch/MLP_4layers.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!pip install -U -q PyDrive
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials
import h5py
import numpy as np
import time


auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

#download files from google drive
download = drive.CreateFile({'id':'1fGT2vvmDcGJkh_Z2QZ-9DqCs_zNXgAU0'})
download.GetContentFile('train_128.h5')
download = drive.CreateFile({'id':'1T5dR0YruZ9yGOiEkq-Dj0Xj6PzdnRJSw'})
download.GetContentFile('train_label.h5')
download = drive.CreateFile({'id':'1X-xKBwhtWu17e7rrro1esNtsbGgmsUlr'})
download.GetContentFile('test_128.h5')

#read files
train_data = h5py.File('train_128.h5','r')
label_data = h5py.File('train_label.h5','r')
test_data = h5py.File('test_128.h5','r')

#get raw data
input_data = train_data['data'][:]  # training data 
output_data = label_data['label'][:] # training data label
test_data = test_data['data'][:] # testing data

In [0]:
class Activation(object):
    ''' Three activation functions are defined here: tanh, sigmoid and leaky relu '''
    def __tanh(self, x):
        return np.tanh(x)

    def __tanh_deriv(self, a):
        # a = np.tanh(x)
        return 1.0 - a**2
    def __logistic(self, x):
        return 1.0 / (1.0 + np.exp(-x))

    def __logistic_deriv(self, a):
        # a = logistic(x)
        return  a * (1 - a )

    def __relu(self,x,alpha=0.05):
        return np.where(x>=0, x, 0)

    def __relu_deriv(self,a,alpha=0.05):
        # a = relu(x)
        return np.where(a > 0, 1, 0.001)

    def __init__(self,activation='tanh'):
        if activation == 'logistic':
            self.f = self.__logistic
            self.f_deriv = self.__logistic_deriv
        elif activation == 'tanh':
            self.f = self.__tanh
            self.f_deriv = self.__tanh_deriv
        elif activation == 'relu':
            self.f = self.__relu
            self.f_deriv = self.__relu_deriv
        

In [0]:
class HiddenLayer(object):
    def __init__(self, n_in, n_out,
                 activation_last_layer='tanh', activation='tanh', W=None, b=None):

        self.input = None
        self.outputD = n_out
        self.activation = Activation(activation).f

        # activation deriv of last layer
        self.activation_deriv = None
        if activation_last_layer:
            self.activation_deriv = Activation(activation_last_layer).f_deriv

        self.W = np.random.uniform(
            low=-np.sqrt(6. / (n_in + n_out)),
            high=np.sqrt(6. / (n_in + n_out)),
            size=(n_in, n_out)
        )
        if activation == 'logistic':
            self.W *= 4

        if activation == 'relu':
            self.W = np.random.uniform(
                low=-np.sqrt(6. / n_in),
                high=np.sqrt(6. / n_in),
                size=(n_in, n_out)
            )

        self.b = np.zeros(n_out, )

        # initialize parameters for momentum
        self.Vp = np.zeros(self.W.shape)
        self.V = np.zeros(self.W.shape)

        self.grad_W = np.zeros(self.W.shape)
        self.grad_b = np.zeros(self.b.shape)

    def forward(self, input, dropout=0.2):
        '''
        :type input: numpy.array
        :param input: a symbolic tensor of shape (n_in,)
        '''

        ''' randomly generate a random sample from an array with input size for dropout'''
        # index = np.random.choice(np.arange(input.size), replace=False, size=int(input.size * dropout))
        # input[index] = 0

        lin_output = np.dot(input, self.W) + self.b
        self.output = (
            lin_output if self.activation is None
            else self.activation(lin_output)
        )
        self.input = input
        return self.output

    def backward(self, delta, output_layer=False):
        self.grad_W = np.atleast_2d(self.input).T.dot(np.atleast_2d(delta))
        for i in range(len(delta[0])):
            self.grad_b[i] = delta.T[i].mean()
        if self.activation_deriv:
            delta = delta.dot(self.W.T) * self.activation_deriv(self.input)
        return delta

    '''generate batches avoiding training all data for a single training time'''

    def generate_batches(X, y, batch_size):

        rand = np.random.choice(len(y), len(y), replace=False)
        X_shuffled = X[rand]
        y_shuffled = np.array(y)[rand.astype(int)]
        batches = [(X_shuffled[i:i + batch_size, :], y_shuffled[i:i + batch_size]) for i in
                   range(0, len(y), batch_size)]
        return batches

    def softmax(x):
        # x is one dimensional
        if x.ndim == 1:
            return np.exp(x) / np.sum(np.exp(x))

        result = np.zeros(x.shape)
        sum = np.zeros(len(x))
        for i in range(len(x)):
            result[i] = np.exp(x[i])
            sum[i] = result[i].sum()

        for i in range(len(result)):
            result[i] = result[i] / sum[i]

        return result

    def crossentropy_loss(y, y_hat, epsilon=1e-3):
        loss = 0
        y_softmax = HiddenLayer.softmax(y_hat)
        if y_softmax.ndim == 1:
            return np.log(y_softmax[y[0]])
        # y_hat = np.clip(y_hat, epsilon, 1-epsilon)
        N = y_hat.shape[0]
        # add a super small number 1e-5 avoiding the log0
        for i in range(N):
            loss = loss + np.log(y_softmax[i][y[i]])
        # loss = -np.sum(np.log(y_hat+1e-5))/N
        loss = - loss / N
        return loss

In [0]:
class MLP:

    def __init__(self, layers, activation, learning_rate, momentum, weight_decay, epochs, batch_size, dropout):
        """
        :param layers: A list containing the number of units in each layer.
        Should be at least two values
        :param activation: The activation function to be used. Can be
        "logistic" or "tanh"
        """
        ### initialize parameters
        self.layers = []
        self.params = []
        self.activation = activation
        self.learning_rate = learning_rate
        self.momentum = momentum
        self.weight_decay = weight_decay
        self.epochs = epochs
        self.batch_size = batch_size
        self.dropout = dropout
        self.output = layers[-1]

        for i in range(len(layers) - 1):
            self.layers.append(HiddenLayer(layers[i], layers[i + 1], activation[i], activation[i + 1]))

    def forward(self, input):
        for layer in self.layers:
            output = layer.forward(input)
            input = output
        return output

    def criterion_CrossEntropyLoss(self, y, y_hat, epsilon):
        activation_deriv = Activation(self.activation[-1]).f_deriv
        error = np.zeros(y_hat.shape)
        for i in range(len(y_hat)):
            for j in range(len(y_hat[0])):
                error[i][j] = -y_hat[i][j]
            error[i][y[i]] = 1-(-1)*error[i][y[i]]

        # error = y - y_hat
        loss = HiddenLayer.crossentropy_loss(y=y, y_hat=y_hat, epsilon=epsilon)
        # calculate the delta of the output layer
        delta = -error * activation_deriv(y_hat)
        # return loss and delta
        return loss, delta

    def backward(self, delta):
        delta = self.layers[-1].backward(delta, output_layer=True)
        for layer in reversed(self.layers[:-1]):
            delta = layer.backward(delta)

    # update weight and bias with momentum
    def update(self, lr):
        for layer in self.layers:
            layer.W -= lr * layer.grad_W
            layer.b -= lr * layer.grad_b

    def get_grads(self):
        layer_grad_W = []
        layer_grad_b = []
        for j in range(len(self.layers)):
            layer_grad_W.append(self.layers[j].grad_W)
            layer_grad_b.append(self.layers[j].grad_b)
        return layer_grad_W, layer_grad_b

    def batch_update(self, dW, db):
        for j in range(len(self.layers)):
            V = self.momentum * self.layers[j].Vp - self.learning_rate * self.weight_decay * self.layers[
                j].W - self.learning_rate * dW[j]
            self.layers[j].Vp = self.layers[j].V
            self.layers[j].V = V
            self.layers[j].W += self.layers[j].V
            self.layers[j].b -= self.learning_rate * db[j]

    def fit(self, X, y):

        X = np.array(X)
        y = np.array(y)
        to_return = np.zeros(self.epochs)

        size = np.ceil(X.shape[0] / batch_size)
        loss = np.zeros(int(size))

        for k in range(self.epochs):
            # applying mini batch
            batches = HiddenLayer.generate_batches(X, y, self.batch_size)
            loss = np.zeros(len(batches))
            i = 0
            for batch in batches:
                X_batch = np.array(batch[0])
                Y_batch = np.array(batch[1])
                y_hat = np.zeros((len(X_batch), self.output))

                #for i in range(X_batch.shape[0]):
                    # forward pass
                y_hat = self.forward(X_batch)
                    # backward pass
                loss[k], delta = self.criterion_CrossEntropyLoss(Y_batch, y_hat, epsilon=1e-3)
                self.backward(delta)
                self.update(self.learning_rate)
            # layer_grad_W, layer_grad_b = self.get_grads()
            #     dW.append(layer_grad_W)
            #     db.append(layer_grad_b)
            #     loss[i] = np.mean(loss)
            #     i += 1
            #     gradients_W = {}
            #     gradients_b = {}
            #     for i in range(len(self.layers)):  # could replace with len(self.layers)
            #         gradients_W[i] = np.array([j[i] for j in dW]).mean(axis=0)
            #         gradients_b[i] = np.array([j[i] for j in db]).mean(axis=0)
            #     DW = [i for j, i in gradients_W.items()]
            #     Db = [i for j, i in gradients_b.items()]
            #     # update weights with batch gradient
            #     self.batch_update(DW, Db)
            # print("第%d个epoch"%k)
        to_return = np.mean(loss)
        return to_return

    def predict(self, x):
        data = np.array(x)
        value = self.forward(data)
        #print("1", value.shape)
        value = np.argmax(value,axis=1)
        #print(value.shape)
        return value

In [0]:
def train_val_split(data, label, ratio=0.75, shuffle=False,):
    # ratio为train/validation,默认75%
    # shuffle为是否要打乱顺序，默认否
    # train,train_label为训练数据
    # val, val_label为验证数据
    if shuffle:
        state = np.random.get_state()
        np.random.shuffle(data)
        np.random.set_state(state)
        np.random.shuffle(label)
    train = data[0: int(len(data)*ratio)]
    val = data[int(len(data)*ratio):]
    train_label = label[0: int(len(label)*ratio)]
    val_label = label[int(len(label)*ratio):]
    return train, val, train_label, val_label


def accuracy(truelabel, prdlabel):
    # assume truelabel[i] respect to prdlabel[i]
    count = 0
    value = np.argmax(np.array(prdlabel),axis=1)
    for i in range(len(truelabel)):
        if truelabel[i] == value[i]:
            count = count + 1
    return count/len(truelabel)


# def accuracy2()
#     y_train_max = np.argmax(np.array(y_train),axis=1)
#     yhat_train_max = np.argmax(yhat_train,axis=1)
#     accuracy_train = (np.sum(y_train_max == yhat_train_max)) / (y_train.shape[0])
#     return accuracy_train()

In [0]:
# parameter setting
layers = [128,50,40,10]
activation = [None,'tanh','tanh','relu']
learning_rate=0.01
momentum=0.9
weight_decay=0.05
epochs=100
batch_size=128
dropout = 0.2

In [0]:
input_data, val_data, output_data, val_label = train_val_split(input_data,output_data)
nn = MLP(layers, activation,learning_rate, momentum, weight_decay, epochs, batch_size, dropout)

In [55]:
start =time.clock()

crossentropy = nn.fit(input_data, output_data)
print('loss : %f'%crossentropy)
result = nn.predict(val_data)

accuracy =accuracy(val_label, result)
print("accuarcy:%.4f"%accuracy)
end = time.clock()
print('Running time: %s Seconds'%(end-start))



IndexError: ignored

In [0]:
'''
This code refers to COMP5329 MLP-v2 tutorial
'''