In [5]:
import os
import numpy as np
import scipy as sp
import robot_data_treatment

In [6]:
class DataGenerator:
    """
    A class for generating input and output examples for a character-level language model.
    """

    def __init__(self, path):
        """
        Initializes a DataGenerator object.

        Args:
            path (str): The path to the text file containing the training data.
        """
        self.path = path

        # Read in data from file and convert to lowercase
        with open(path) as f:
            data = f.read().lower()

        # Create list of unique characters in the data
        self.chars = list(set(data))

        # Create dictionaries mapping characters to and from their index in the list of unique characters
        self.char_to_idx = {ch: i for (i, ch) in enumerate(self.chars)}
        self.idx_to_char = {i: ch for (i, ch) in enumerate(self.chars)}

        # Set the size of the vocabulary (i.e. number of unique characters)
        self.vocab_size = len(self.chars)

        # Read in examples from file and convert to lowercase, removing leading/trailing white space
        with open(path) as f:
            examples = f.readlines()
        self.examples = [x.lower().strip() for x in examples]

    def generate_example(self, idx):
        """
        Generates an input/output example for the language model based on the given index.

        Args:
            idx (int): The index of the example to generate.

        Returns:
            A tuple containing the input and output arrays for the example.
        """
        example_chars = self.examples[idx]

        # Convert the characters in the example to their corresponding indices in the list of unique characters
        example_char_idx = [self.char_to_idx[char] for char in example_chars]

        # Add newline character as the first character in the input array, and as the last character in the output array
        X = [self.char_to_idx['\n']] + example_char_idx
        Y = example_char_idx + [self.char_to_idx['\n']]

        return np.array(X), np.array(Y)



In [192]:
class RNN_robot:
    """
    A class used to represent a Recurrent Neural Network (RNN).

    Attributes
    ----------
    hidden_size : int
        The number of hidden units in the RNN.
    vocab_size : int
        The size of the vocabulary used by the RNN.
    sequence_length : int
        The length of the input sequences fed to the RNN.
    learning_rate : float
        The learning rate used during training.
    is_initialized : bool
        Indicates whether the AdamW parameters has been initialized.

    Methods
    -------
    __init__(hidden_size, vocab_size, sequence_length, learning_rate)
        Initializes an instance of the RNN class.

    forward(self, X, a_prev)
     Computes the forward pass of the RNN.

    softmax(self, x)
       Computes the softmax activation function for a given input array.

    backward(self,x, a, y_preds, targets)
        Implements the backward pass of the RNN.

   loss(self, y_preds, targets)
     Computes the cross-entropy loss for a given sequence of predicted probabilities and true targets.

    adamw(self, beta1=0.9, beta2=0.999, epsilon=1e-8, L2_reg=1e-4)
       Updates the RNN's parameters using the AdamW optimization algorithm.

    train(self, generated_names=5)
       Trains the RNN on a dataset using backpropagation through time (BPTT).

   predict(self, start)
        Generates a sequence of characters using the trained self, starting from the given start sequence.
        The generated sequence may contain a maximum of 50 characters or a newline character.

    """

    def __init__(self, trainSet, valSet, hidden_size, sequence_length, learning_rate):
        """
        Initializes an instance of the RNN class.

        Parameters
        ----------
        hidden_size : int
            The number of hidden units in the RNN.
        vocab_size : int
            The size of the vocabulary used by the RNN.
        sequence_length : int
            The length of the input sequences fed to the RNN.
        learning_rate : float
            The learning rate used during training.
        """

        # hyper parameters
        self.hidden_size = hidden_size
        self.trainSet = trainSet
        self.valSet = valSet

        self.sequence_length = sequence_length
        self.learning_rate = learning_rate
        self.X = None
        self.output_size = 3
        self.input_size = 5

        # model parameters
        self.Wax = np.random.uniform(-np.sqrt(1. / self.input_size), np.sqrt(1. / self.input_size), (hidden_size, self.input_size))
        self.Waa = np.random.uniform(-np.sqrt(1. / hidden_size), np.sqrt(1. / hidden_size), (hidden_size, hidden_size))
        self.Wya = np.random.uniform(-np.sqrt(1. / hidden_size), np.sqrt(1. / hidden_size), (self.output_size, hidden_size))
        self.ba = np.zeros((hidden_size, 1))
        self.by = np.zeros((self.output_size, 1))

        # Initialize gradients
        self.dWax, self.dWaa, self.dWya = np.zeros_like(self.Wax), np.zeros_like(self.Waa), np.zeros_like(self.Wya)
        self.dba, self.dby = np.zeros_like(self.ba), np.zeros_like(self.by)

        # parameter update with AdamW
        self.mWax = np.zeros_like(self.Wax)
        self.vWax = np.zeros_like(self.Wax)
        self.mWaa = np.zeros_like(self.Waa)
        self.vWaa = np.zeros_like(self.Waa)
        self.mWya = np.zeros_like(self.Wya)
        self.vWya = np.zeros_like(self.Wya)
        self.mba = np.zeros_like(self.ba)
        self.vba = np.zeros_like(self.ba)
        self.mby = np.zeros_like(self.by)
        self.vby = np.zeros_like(self.by)

    def softmax(self, x):
        """
        Computes the softmax activation function for a given input array.

        Parameters:
            x (ndarray): Input array.

        Returns:
            ndarray: Array of the same shape as `x`, containing the softmax activation values.
        """
        # shift the input to prevent overflow when computing the exponentials
        x = x - np.max(x)
        # compute the exponentials of the shifted input
        p = np.exp(x)
        # normalize the exponentials by dividing by their sum
        return p / np.sum(p)

    def forward(self, X, a_prev):
        """
        Compute the forward pass of the RNN.

        Parameters:
        X (ndarray): Input data of shape (seq_length, input_size)
        a_prev (ndarray): Activation of the previous time step of shape (hidden_size, 1)

        Returns:
        x (dict): Dictionary of input data of shape (seq_length, input_size, 1), with keys from 0 to seq_length-1
        a (dict): Dictionary of hidden activations for each time step, with keys from 0 to seq_length-1
        y_pred (dict): Dictionary of output global positions for each time step, with keys from 0 to seq_length-1
        """
        # Initialize dictionaries to store activations and output probabilities.
        x, a, y_pred = {}, {}, {}

        # Store the input data in the class variable for later use in the backward pass.
        self.X = X
        #print("input: ", self.X)
        # Set the initial activation to the previous activation.
        a[-1] = np.copy(a_prev)
        # iterate over each time step in the input sequence
        for t in range(len(self.X)):

            # get the input at the current time step
            x[t] = np.zeros((self.input_size,1))
            #if (self.X[t] != None):
            #    x[t][self.X[t]] = 1
            x[t] = np.array(X[t])
            x[t] = x[t].reshape((self.input_size,1))
            # compute the hidden activation at the current time step
#            print(f"{self.Wax.shape=}")
#            print(f"{x[t].shape=}")
#            print(f"{self.Waa.shape=}")
#            print(f"{a[t - 1].shape=}")
#            print(f"{self.ba.shape=}")
#            print("\n\n")
            a[t] = np.tanh(np.dot(self.Wax, x[t]) + np.dot(self.Waa, a[t - 1]) + self.ba)
            #print("a[t] shape:", a[t].shape)
            # compute the output probabilities at the current time step
            #print("self.Wya shape ", self.Wya.shape)
#            print(f"{a[t].shape=}")
#            print(f"{self.by.shape=}")
#            print(f"{self.Wya.shape=}")
            y_pred[t] = self.softmax(np.dot(self.Wya, a[t]) + self.by)
#            print(f"{y_pred[t].shape=}")
            #print("y[t] shape ", y_pred[t].shape)
            # add an extra dimension to X to make it compatible with the shape of the input to the backward pass
        # return the input, hidden activations, and output probabilities at each time step
 #       print(f"{y_pred[0].shape=}")
        return x, a, y_pred

    def backward(self,x, a, y_preds, targets):
        """
        Implement the backward pass of the RNN.

        Args:
        x -- (dict) of input characters (as one-hot encoding vectors) for each time-step, shape (vocab_size, sequence_length)
        a -- (dict) of hidden state vectors for each time-step, shape (hidden_size, sequence_length)
        y_preds -- (dict) of output probability vectors (after softmax) for each time-step, shape (vocab_size, sequence_length)
        targets -- (list) of integer target characters (indices of characters in the vocabulary) for each time-step, shape (1, sequence_length)

        Returns:
        None

        """
        # Initialize derivative of hidden state for the last time-step
        da_next = np.zeros_like(a[0])

        # Loop through the input sequence backwards
        for t in reversed(range(len(self.X))):
            #print(t)
            #FIXME: FIX THIS DERIVATIVE
            #dy_preds[targets[t]](1/(len(self.X)-t))*np.square(y_preds[i] - targets[i])
            # Calculate derivative of output probability vector
            dy_preds = np.copy(y_preds[t])
            for i in range(len(dy_preds)):
                #print(f"{y_preds[i]=}")
                #print(f"{targets[i]=}")
                dy_preds[i] = (1/(len(self.X)-t))*np.square(y_preds[i][0] - targets[t][i])
            #print(dy_preds)
            #print(len(dy_preds))
            #print(len(y_preds))
            #print(targets[t])
            #print("all targets")
            #print(targets)
            #for ()

            for index in range(len(dy_preds)):
                dy_preds[index] = (2.0/(self.sequence_length-t))*(dy_preds[index] - targets[t][index])

            #dy_preds[targets[t]] -= 1

            # Calculate derivative of hidden state
            da = np.dot(self.Waa.T, da_next) + np.dot(self.Wya.T, dy_preds)
#            print(self.Waa.T.shape, da_next.shape)
#            print(self.Wya.T.shape, dy_preds.shape)
#            print("bbb")
            dtanh = (1 - np.power(a[t], 2))
#            print(self.dba.shape)
            da_unactivated = dtanh * da

            # Calculate gradients
            self.dba += da_unactivated
            self.dWax += np.dot(da_unactivated, x[t].T)
            self.dWaa += np.dot(da_unactivated, a[t - 1].T)

            # Update derivative of hidden state for the next iteration
            da_next = da_unactivated

            # Calculate gradient for output weight matrix
            self.dWya += np.dot(dy_preds, a[t].T)

            # clip gradients to avoid exploding gradients
            for grad in [self.dWax, self.dWaa, self.dWya, self.dba, self.dby]:
                np.clip(grad, -1, 1, out=grad)

    def loss(self, y_preds, targets):
        """
        Computes the cross-entropy loss for a given sequence of predicted probabilities and true targets.

        Parameters:
            y_preds (ndarray): Array of shape (sequence_length, 1) containing the predicted velocities for each time step.
            targets (ndarray): Array of shape (sequence_length, 1) containing the true targets for each time step.

        Returns:
            float: Cross-entropy loss.
        """

        sum = 0
        for i in range(len(y_preds)):
            diff = y_preds[i] - targets[i].reshape((3,1))
            #print("aaaaaaaa ", diff)
            prod = np.dot(diff, diff.T)
            sum += np.sum((1/(len(y_preds)-i))*prod)
        #print("\nsum ", sum)
        return sum

    def adamw(self, beta1=0.9, beta2=0.999, epsilon=1e-8, L2_reg=1e-4):
        """
        Updates the RNN's parameters using the AdamW optimization algorithm.
        """
        # AdamW update for Wax
        self.mWax = beta1 * self.mWax + (1 - beta1) * self.dWax
        self.vWax = beta2 * self.vWax + (1 - beta2) * np.square(self.dWax)
        m_hat = self.mWax / (1 - beta1)
        v_hat = self.vWax / (1 - beta2)
        self.Wax -= self.learning_rate * (m_hat / (np.sqrt(v_hat) + epsilon) + L2_reg * self.Wax)

        # AdamW update for Waa
        self.mWaa = beta1 * self.mWaa + (1 - beta1) * self.dWaa
        self.vWaa = beta2 * self.vWaa + (1 - beta2) * np.square(self.dWaa)
        m_hat = self.mWaa / (1 - beta1)
        v_hat = self.vWaa / (1 - beta2)
        self.Waa -= self.learning_rate * (m_hat / (np.sqrt(v_hat) + epsilon) + L2_reg * self.Waa)

        # AdamW update for Wya
        self.mWya = beta1 * self.mWya + (1 - beta1) * self.dWya
        self.vWya = beta2 * self.vWya + (1 - beta2) * np.square(self.dWya)
        m_hat = self.mWya / (1 - beta1)
        v_hat = self.vWya / (1 - beta2)
        self.Wya -= self.learning_rate * (m_hat / (np.sqrt(v_hat) + epsilon) + L2_reg * self.Wya)

        # AdamW update for ba
        self.mba = beta1 * self.mba + (1 - beta1) * self.dba
        self.vba = beta2 * self.vba + (1 - beta2) * np.square(self.dba)
        m_hat = self.mba / (1 - beta1)
        v_hat = self.vba / (1 - beta2)
        self.ba -= self.learning_rate * (m_hat / (np.sqrt(v_hat) + epsilon) + L2_reg * self.ba)

        # AdamW update for by
        self.mby = beta1 * self.mby + (1 - beta1) * self.dby
        self.vby = beta2 * self.vby + (1 - beta2) * np.square(self.dby)

    def sample(self):
        """
        Sample a sequence of characters from the RNN.

        Args:
            None

        Returns:
            list: A list of integers representing the generated sequence.
        """
        # initialize input and hidden state
        x = np.zeros((self.vocab_size, 1))
        a_prev = np.zeros((self.hidden_size, 1))

        # create an empty list to store the generated character indices
        indices = []

        # idx is a flag to detect a newline character, initialize it to -1
        idx = -1

        # generate sequence of characters
        counter = 0
        max_chars = 50 # maximum number of characters to generate
        newline_character = self.data_generator.char_to_idx['\n'] # the newline character

        while (idx != newline_character and counter != max_chars):
            # compute the hidden state
            a = np.tanh(np.dot(self.Wax, x) + np.dot(self.Waa, a_prev) + self.ba)

            # compute the output probabilities
            y = self.softmax(np.dot(self.Wya, a) + self.by)

            # sample the next character from the output probabilities
            idx = np.random.choice(list(range(self.vocab_size)), p=y.ravel())

            # set the input for the next time step
            x = np.zeros((self.vocab_size, 1))
            x[idx] = 1

            # store the sampled character index in the list
            indices.append(idx)

            # update the previous hidden state
            a_prev = a

            # increment the counter
            counter += 1

        # return the list of sampled character indices
        return indices


    def train(self, generated_names=5):
        """
        Train the RNN on a dataset using backpropagation through time (BPTT).

        Args:
        - generated_names: an integer indicating how many example names to generate during training.

        Returns:
        - None
        """

        iter_num = 0
        threshold = 5 # stopping criterion for training
        smooth_loss = 100#-np.log(1.0 / self.data_generator.vocab_size) * self.sequence_length  # initialize loss

        self.trainSet
        self.valSet

        while (iter_num < len(self.trainSet)):
            a_prev = np.zeros((self.hidden_size, 1))
            #idx = iter_num % self.vocab_size
            # get a batch of inputs and targets
            #inputs, targets = self.data_generator.generate_example(idx)
            inputs = self.trainSet[iter_num][0]
            #print("aaaaaaaa ", inputs)
            targets = self.trainSet[iter_num][1]

            # forward pass
            x, a, y_pred  = self.forward(inputs, a_prev)

            # backward pass
            self.backward(x, a, y_pred, targets)

            # calculate and update loss
            loss = self.loss(y_pred, targets)
            self.adamw()
            smooth_loss = smooth_loss * 0.999 + loss * 0.001

            # update previous hidden state for the next batch
            a_prev = a[len(self.X) - 1]
            # print progress every 500 iterations
            if iter_num % 50 == 0:
                print("\n\niter :%d, loss:%f\n" % (iter_num, smooth_loss))
                #for i in range(generated_names):
                    #sample_idx = self.sample()
                    #txt = ''.join(self.data_generator.idx_to_char[idx] for idx in sample_idx)
                #    txt = txt.title()  # capitalize first character
                #    print ('%s' % (txt, ), end='')
            iter_num += 1

    def predict(self, start, num_sequences=1):
        """
        Generate a sequence of characters using the trained self, starting from the given start sequence.
        The generated sequence may contain a maximum of 50 characters or a newline character.

        Args:
        - start: a string containing the start sequence

        Returns:
        - txt: a string containing the generated sequence
        """

        # Initialize input vector and previous hidden state
        x = np.zeros((self.input_size, 1))
        x = start[0].reshape((self.input_size,1))
        a_prev = np.zeros((self.hidden_size, 1))

        y_preds = []
        # Generate sequence
        for counter in range(num_sequences):
            # Compute next hidden state and predicted character
            a = np.tanh(np.dot(self.Wax, x) + np.dot(self.Waa, a_prev) + self.ba)
            y_pred = self.softmax(np.dot(self.Wya, a) + self.by)
            y_preds.append(y_pred)

            # Update input vector, previous hidden state, and indices
            x = np.zeros((self.input_size, 1))
            x = start[counter].reshape((self.input_size,1))
            a_prev = a

        return y_preds

In [193]:
#data_generator = DataGenerator('dinos.txt')
_, x0, y0 = robot_data_treatment.dataGet("../data/quadrado_opt_1_1.csv")
_, x1, y1 = robot_data_treatment.dataGet("../data/quadrado_opt_1_2.csv")
_, x2, y2 = robot_data_treatment.dataGet("../data/quadrado_opt_1_3.csv")
_, x3, y3 = robot_data_treatment.dataGet("../data/quadrado_opt_1_4.csv")
_, x4, y4 = robot_data_treatment.dataGet("../data/quadrado_opt_1_5.csv")
_, x5, y5 = robot_data_treatment.dataGet("../data/quadrado_opt_2_1.csv")
_, x6, y6 = robot_data_treatment.dataGet("../data/quadrado_opt_2_2.csv")
_, x7, y7 = robot_data_treatment.dataGet("../data/quadrado_opt_2_3.csv")
_, x8, y8 = robot_data_treatment.dataGet("../data/quadrado_opt_2_4.csv")
_, x9, y9 = robot_data_treatment.dataGet("../data/quadrado_opt_2_5.csv")

x0 = x0 + x1 + x2 + x3 + x4 + x5 + x6 + x7 + x8 + x9
y0 = y0 + y1 + y2 + y3 + y4 + y5 + y6 + y7 + y8 + y9

sequenceLength = 10
trainTuple, valTuple, testTuple = robot_data_treatment.createTestTrainSets(x0, y0, sequenceLength, trainRatio=0.8, valRatio=0.1, testRatio=0.1)


In [194]:
rnn = RNN_robot(trainTuple, valTuple, hidden_size=200, sequence_length=sequenceLength, learning_rate=1e-3)
rnn.train()



iter :0, loss:99.900049



iter :50, loss:95.366202



iter :100, loss:91.059809



iter :150, loss:86.862969



iter :200, loss:82.908413



iter :250, loss:79.083875



iter :300, loss:75.490720



iter :350, loss:72.137276



iter :400, loss:69.018533



iter :450, loss:65.924898



iter :500, loss:63.081980



iter :550, loss:60.305169



iter :600, loss:57.590756



iter :650, loss:55.009548



iter :700, loss:52.660325



iter :750, loss:50.404504



iter :800, loss:48.206727



iter :850, loss:46.149161



iter :900, loss:44.199572



iter :950, loss:42.418750



iter :1000, loss:40.549766



iter :1050, loss:38.899486



iter :1100, loss:37.362322



iter :1150, loss:35.826491



iter :1200, loss:34.458299



iter :1250, loss:33.010498



iter :1300, loss:31.749008



iter :1350, loss:30.573779



iter :1400, loss:29.497132



iter :1450, loss:28.493527



iter :1500, loss:27.400683



In [199]:
test = testTuple[14] # random input
input = test[0]
output = test[1]
for i in range(len(input)-1):
    est1, est2 = rnn.predict([input[i], input[i+1]], 2)
    gt1 = output[i].reshape(3,1)
    gt2 = output[i+1].reshape(3,1)
    diff1 = gt1 - est1
    diff2 = gt2 - est2
    print(diff1, diff2)

[[-0.3938943 ]
 [ 0.46838724]
 [-0.0167895 ]] [[-0.2727314 ]
 [ 0.36737851]
 [-0.0167895 ]]
[[-0.3964885 ]
 [ 0.49113561]
 [-0.0167895 ]] [[-0.2719689 ]
 [ 0.42392613]
 [ 0.15241288]]
[[-0.39572552]
 [ 0.54768275]
 [ 0.15241288]] [[-0.27091121]
 [ 0.37448657]
 [ 0.15241288]]
[[-0.3946684 ]
 [ 0.49824376]
 [ 0.15241288]] [[-0.27278621]
 [ 0.36988657]
 [ 0.15241288]]
[[-0.39654277]
 [ 0.49364313]
 [ 0.15241288]] [[-0.26693166]
 [ 0.37579187]
 [ 0.06995833]]
[[-0.39068832]
 [ 0.49954853]
 [ 0.06995833]] [[-0.26511395]
 [ 0.37130229]
 [ 0.06995833]]
[[-0.38887048]
 [ 0.49505881]
 [ 0.06995833]] [[-0.26230626]
 [ 0.33137921]
 [ 0.01682197]]
[[-0.38606286]
 [ 0.45513581]
 [ 0.01682197]] [[-0.26256376]
 [ 0.47739588]
 [ 0.08552197]]
[[-0.38632043]
 [ 0.60115255]
 [ 0.08552197]] [[-0.26147285]
 [ 0.4860777 ]
 [ 0.08552197]]
