In [88]:
import numpy as np
import h5py
import matplotlib.pyplot as plt

In [89]:
# Load and open the file containing the data
myFile = h5py.File('data-Mini Project 2.h5', 'r+')

# List all groups in the .h5 file
print(f"Keys: {myFile.keys()}")

Keys: <KeysViewHDF5 ['trX', 'trY', 'tstX', 'tstY']>


In [90]:
# print the information about the keys in the data
for key in myFile.keys():
    print(myFile[key])

<HDF5 dataset "trX": shape (3000, 150, 3), type "<f8">
<HDF5 dataset "trY": shape (3000, 6), type "<f8">
<HDF5 dataset "tstX": shape (600, 150, 3), type "<f8">
<HDF5 dataset "tstY": shape (600, 6), type "<f8">


In [91]:
# Extract the data from the file as numpy arrays

n1 = myFile.get('trX')  # trX is the training data
trX = np.array(n1)
print(f"trX shape: {trX.shape}, dtype: {trX.dtype}")

n1 = myFile.get('trY')  # trY is the training labels
trY = np.array(n1)
print(f"trY shape: {trY.shape}, dtype: {trY.dtype}")

n1 = myFile.get('tstX') # tstX is the test data
tstX = np.array(n1)
print(f"tstX shape: {tstX.shape}, dtype: {tstX.dtype}")

n1 = myFile.get('tstY') # tstY is the test labels
tstY = np.array(n1)
print(f"tstY shape: {tstY.shape}, dtype: {tstY.dtype}")

trX shape: (3000, 150, 3), dtype: float64
trY shape: (3000, 6), dtype: float64
tstX shape: (600, 150, 3), dtype: float64
tstY shape: (600, 6), dtype: float64


In [92]:
# close the .h5 file
myFile.close()

# shuffle the training and the test data
indexes = np.arange(trX.shape[0])
np.random.shuffle(indexes)
trX = trX[indexes]
trY = trY[indexes]

indexes = np.arange(tstX.shape[0])
np.random.shuffle(indexes)
tstX = tstX[indexes]
tstY = tstY[indexes]

In [93]:
# tanh activation function for the hidden layer
def tanh_activation(x):
    return np.tanh(x)

# tanh derivative
def tanh_derivative(x):
    return 1.0 - np.tanh(x) ** 2

# sigmoid activation function for the output layer
def sigmoid_activation(x):
    return 1 / (1 + np.exp(-x))

# sigmoid derivative
def sigmoid_derivative(x):
    return x * (1 - x)


In [110]:
class RNN:
    def __init__(self, trX, trY, tstX, tstY, N, learning_rate, mini_batch_size, num_epochs):
        # initialize the data
        self.trX = trX  # 3000 x 150 x 3
        self.trY = trY  # 3000 x 6
        self.tstX = tstX    # 600 x 150 x 3
        self.tstY = tstY    # 600 x 6
        
        # add the bias to the data
        self.trX = np.concatenate((self.trX, np.ones((self.trX.shape[0], self.trX.shape[1], 1))), axis=2)   # 3000 x 150 x 4
        self.tstX = np.concatenate((self.tstX, np.ones((self.tstX.shape[0], self.tstX.shape[1], 1))), axis=2)   # 600 x 150 x 4

        # initialize the hyperparameters
        self.N = N
        self.learning_rate = learning_rate
        self.mini_batch_size = mini_batch_size
        self.num_epochs = num_epochs

        # initialize the weights and biases
        self.Whh = np.random.uniform(-0.1, 0.1, (self.N, self.N))   # N x N
        self.W1h = np.random.uniform(-0.1, 0.1, (self.N, 3+1))  # N x 4
        self.Who = np.random.uniform(-0.1, 0.1, (6, self.N+1))  # 6 x (N+1)

    # forward pass
    def forward_pass(self, x):
        # x is a 150x4 vector where the first 3 elements are the sensor data and the last element is the bias with 150 time steps
        # initialize hidden layer
        h = np.zeros((self.N, 1))   # N x 1
        # initialize hidden layer output
        h_out = np.zeros((self.N, 1))   # N x 1
        # initialize output layer
        y = np.zeros((6, 1))
        # initialize hidden states
        hidden_states = []
        hidden_states.append(np.zeros((self.N, 1))) # initial state
        
        # initialize the predictions
        preds = []

        # loop over the time steps
        for t in range(x.shape[0]):
            # update the hidden layer
            h = np.matmul(self.Whh, h) + np.matmul(self.W1h, x[t].reshape(-1, 1))
            # update the hidden layer output
            h_out = tanh_activation(h)
            hidden_states.append(h_out)
            # add the bias to the hidden layer output
            h_out = np.concatenate((h_out, np.ones((1, 1))), axis=0)
            # update the output layer
            y = np.matmul(self.Who, h_out)
            # apply the sigmoid activation function to the output layer
            y = sigmoid_activation(y)
            preds.append(y)

        return y, hidden_states, preds
    
    # multi category cross entropy loss function
    def loss_function(self, y, pred):
        loss = np.sum(y * np.log(pred) + (1 - y) * np.log(1 - pred))
        return -loss

    # backpropagation
    def backpropagation(self, batch_size):

        for epoch in range(self.num_epochs):
            for i in range(0, self.trX.shape[0], batch_size):
                x_batch = np.array_split(self.trX, batch_size)[i]
                y_batch = np.array_split(self.trY, batch_size)[i]
                
                # initialize the gradients
                dW1h = np.zeros_like(self.W1h)
                dWhh = np.zeros_like(self.Whh)
                dWho = np.zeros_like(self.Who)
                
                dH = np.zeros((self.N, 1)) # initialize the gradient of the hidden layer
                
                previous_state = np.zeros((self.N, 1)) # initialize the previous state as a zero vector (h_-1 = 0)
                
                for t in range(self.trX.shape[1]):
                    ara = np.matmul(self.Whh, previous_state) + np.matmul(self.W1h, x_batch[t].T)
                    previous_state = tanh_activation(ara)
                    # add bias to previous state
                    previous_state = np.concatenate((previous_state, np.ones((1, 150))), axis=0)
                    ara = np.matmul(self.Who, previous_state)
                    pred = sigmoid_activation(ara)
                    d_o = ( pred - y_batch[t].reshape(-1, 1) ) * sigmoid_derivative(pred)
                    dWho += np.matmul(d_o, previous_state.T)
                    dH = np.matmul(self.Who.T, d_o)[:-1] * tanh_derivative(previous_state)[:-1]
                    dWhh += np.matmul(dH, previous_state[:-1].T)
                    dW1h += np.matmul(dH, x_batch[t].reshape(-1, 1).T)
                    
                    if t == self.trX.shape[1] - 1:
                        correctPred = np.sum(np.argmax(pred, axis=0) == np.argmax(y_batch[t].reshape(-1, 1), axis=0))
                
                # update the weights and biases
                self.Whh -= self.learning_rate * dWhh / trX.shape[1]
                self.W1h -= self.learning_rate * dW1h / trX.shape[1]
                self.Who -= self.learning_rate * dWho / trX.shape[1]
                
            accuracy = correctPred / trX.shape[0]
            print(f"Epoch: {epoch}, Accuracy: {accuracy}")
            
            
            

In [111]:
# initialize the network
N = 50
learning_rate = 0.05
mini_batch_size = 30
num_epochs = 50
rnn = RNN(trX, trY, tstX, tstY, N, learning_rate, mini_batch_size, num_epochs)
rnn.backpropagation(mini_batch_size)






ValueError: matmul: Input operand 1 has a mismatch in its core dimension 0, with gufunc signature (n?,k),(k,m?)->(n?,m?) (size 1 is different from 150)

In [None]:
sddsd = np.concatenate((hidden_states[149], np.ones((1, 1))), axis=0)
print(f"sddsd: {sddsd}, shape: {sddsd.shape}")

sddsd: [[ 0.02326866]
 [ 0.00384134]
 [-0.18629913]
 [ 0.05566894]
 [-0.1628338 ]
 [-0.09714913]
 [ 0.09012098]
 [ 0.04816209]
 [ 0.11316556]
 [ 0.1126785 ]
 [ 0.05520445]
 [-0.01201024]
 [ 0.02801943]
 [-0.02045145]
 [-0.03128348]
 [ 0.09867042]
 [ 0.02498328]
 [-0.24751347]
 [ 0.0124767 ]
 [ 0.00815191]
 [ 0.08548786]
 [-0.10555946]
 [-0.18262522]
 [-0.00644319]
 [-0.0428866 ]
 [ 0.0815097 ]
 [-0.2047452 ]
 [ 0.02128189]
 [ 0.07166018]
 [ 0.06903144]
 [ 0.02238825]
 [-0.09805824]
 [ 0.05349618]
 [ 0.04149045]
 [-0.07361007]
 [ 0.08843927]
 [-0.02802942]
 [-0.0581105 ]
 [-0.05802994]
 [ 0.01777098]
 [ 0.06871799]
 [-0.1156942 ]
 [ 0.00690742]
 [ 0.06683557]
 [ 0.05482055]
 [-0.00556718]
 [ 0.04620477]
 [ 0.07555757]
 [-0.01169871]
 [ 0.01891714]
 [ 1.        ]], shape: (51, 1)


In [None]:
# implement the training loop and test the network on the test data
# initialize the training loss
tr_loss = np.zeros((num_epochs, 1))
# initialize the test loss
tst_loss = np.zeros((num_epochs, 1))

# initialize the network
N = 50
learning_rate = 0.05
mini_batch_size = 30
num_epochs = 50
rnn = RNN(trX, trY, tstX, tstY, N, learning_rate, mini_batch_size, num_epochs)

# loop over the epochs
for epoch in range(num_epochs):
    # initialize the training loss for this epoch
    tr_loss_epoch = 0
    # initialize the test loss for this epoch
    tst_loss_epoch = 0

    # loop over the training examples
    for i in range(trX.shape[0]):
        # forward pass
        y_hat = rnn.forward_pass(trX[i, :, :])
        # backward pass
        rnn.backward_pass(trX[i, :, :], trY[i, :, :], y_hat)
        # update the weights and biases
        Who = Who - learning_rate * rnn.delta_o * rnn.h.T
        W1h = W1h - learning_rate * rnn.delta_h * rnn.h_out.T
        Whh = Whh - learning_rate * rnn.delta_h * rnn.h.T

        # calculate the training loss for this example
        tr_loss_epoch = tr_loss_epoch + np.sum(np.square(trY[i, :, :] - y_hat))
        # calculate the test loss for this example
        tst_loss_epoch = tst_loss_epoch + np.sum(np.square(tstY[i, :, :] - y_hat))

    # calculate the average training loss for this epoch
    tr_loss[epoch] = tr_loss_epoch / trX.shape[0]
    # calculate the average test loss for this epoch
    tst_loss[epoch] = tst_loss_epoch / tstX.shape[0]

    # print the training and test loss for this epoch
    print(f"Epoch: {epoch}, Training Loss: {tr_loss[epoch]}, Test Loss: {tst_loss[epoch]}")

ValueError: shapes (50,4) and (150,1) not aligned: 4 (dim 1) != 150 (dim 0)

In [None]:
# backward pass
    def backward_pass(self, x, y, pred):
        # initialize the error in the output layer
        delta_o = np.zeros((6, 1))
        # initialize the error in the hidden layer
        delta_h = np.zeros((self.N, 1))

        # loop over the time steps
        for t in reversed(range(x.shape[1])):
            # calculate the error in the output layer
            delta_o = (pred[:, t].reshape(-1, 1) - y[:, t].reshape(-1, 1)) * pred[:, t].reshape(-1, 1) * (1 - pred[:, t].reshape(-1, 1))
            # calculate the error in the hidden layer
            delta_h = np.dot(self.Who.T) * delta_o + np.dot(self.Whh.T, delta_h)

# Backpropagation Through Time (BPTT) function
def bptt(trX, trY, W1h, Whh, Who, learning_rate, N):
    hidden_states, outputs = forward_pass(trX, W1h, Whh, Who, N)
    training_loss = cross_entropy_loss(outputs, trY)  # Compute training loss
    dL_doutputs = outputs - trY

    dW1h = np.zeros_like(W1h)
    dWhh = np.zeros_like(Whh)
    dWho = np.zeros_like(Who)

    for t in reversed(range(trX.shape[1])):
        dWho += np.dot(dL_doutputs.T, hidden_states)
        dhidden = np.dot(dL_doutputs, Who) * dtanh(hidden_states)  # dtanh for derivative of tanh

        for bptt_step in reversed(range(max(0, t - backprop_truncate), t+1)):
            dWhh += np.dot(dhidden.T, hidden_states)
            Xt_bias = np.hstack((trX[:, bptt_step, :], np.ones((trX.shape[0], 1))))
            dW1h += np.dot(dhidden.T, Xt_bias)
            dhidden = np.dot(dhidden, Whh) * dtanh(hidden_states)  # dtanh for derivative of tanh

    W1h -= learning_rate * dW1h
    Whh -= learning_rate * dWhh
    Who -= learning_rate * dWho

    return W1h, Whh, Who, training_loss