In [None]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import keras_tuner as kt
from tqdm import tqdm
import os

import scipy as sc
from scipy import integrate

In [None]:
#loading the trajectories and their corresponding labels. Every trajectory file was split into two parts
#(suffixes _1 and _2) to be small enough to upload to GitHub; the two parts are concatenated again below.

#loading the trajectories with one peak
xtrainx1_1 = np.loadtxt('../Data/Xtrainx1_1.csv', delimiter=',')
xtrainx1_2 = np.loadtxt('../Data/Xtrainx1_2.csv', delimiter=',')
#loading the trajectories with two peaks
xtrainx2_1 = np.loadtxt('../Data/Xtrainx2_1.csv', delimiter=',')
xtrainx2_2 = np.loadtxt('../Data/Xtrainx2_2.csv', delimiter=',')
#loading the trajectories with three peaks
xtrainx3_1 = np.loadtxt('../Data/Xtrainx3_1.csv', delimiter=',')
#second part of the three-peak trajectories: this has to be the _2 file. Reading the _1 file a second time would
#duplicate the first half of the data and leave its rows out of step with the labels in Ytrain3.csv
xtrainx3_2 = np.loadtxt('../Data/Xtrainx3_2.csv', delimiter=',')

#stack the two parts of every class back together (rows of part _1 first, then rows of part _2)
xtrainx1 = np.concatenate((xtrainx1_1, xtrainx1_2))
xtrainx2 = np.concatenate((xtrainx2_1, xtrainx2_2))
xtrainx3 = np.concatenate((xtrainx3_1, xtrainx3_2))

#loading the labels for the trajectories with one peak
ytrain1 = np.loadtxt('../Data/Ytrain1.csv', delimiter=',')
#loading the labels for the trajectories with two peaks
ytrain2 = np.loadtxt('../Data/Ytrain2.csv', delimiter=',')
#loading the labels for the trajectories with three peaks
ytrain3 = np.loadtxt('../Data/Ytrain3.csv', delimiter=',')

In [None]:
#Append the row number of every trajectory as one extra, final column. Filtering, shuffling and splitting later
#change the row order, so without this column a trajectory could no longer be traced back to its position in the
#original data. np.arange(...)[:, np.newaxis] is the column vector 0, 1, ..., n-1 that is glued on with axis=1.
xtrainx1_index = np.concatenate((xtrainx1, np.arange(len(xtrainx1))[:, np.newaxis]), axis=1)
xtrainx2_index = np.concatenate((xtrainx2, np.arange(len(xtrainx2))[:, np.newaxis]), axis=1)
xtrainx3_index = np.concatenate((xtrainx3, np.arange(len(xtrainx3))[:, np.newaxis]), axis=1)

In [None]:
#Pull the peak-position columns out of the loaded label arrays: columns 2 and 6 for the two-peak labels and
#columns 2, 6 and 10 for the three-peak labels. np.take with axis=1 selects whole columns and returns a copy.
nu2 = np.take(ytrain2, [2, 6], axis=1)
nu3 = np.take(ytrain3, [2, 6, 10], axis=1)

In [None]:
#helper that turns a block of trajectories into real-valued Fourier features
def _interleave_real_imag(trajectories):
    """Return the FFT of every row with real and imaginary parts interleaved.

    Parameters
    ----------
    trajectories : 2-D array, one trajectory per row.

    Returns
    -------
    2-D float array with twice as many columns as ``trajectories``: column ``2*j`` holds the real part and
    column ``2*j + 1`` the imaginary part of the j-th Fourier coefficient of the row.
    """
    coeffs = np.fft.fft(trajectories)
    features = np.empty((coeffs.shape[0], 2*coeffs.shape[1]))
    features[:, 0::2] = coeffs.real #even columns: real parts
    features[:, 1::2] = coeffs.imag #odd columns: imaginary parts
    return features


#function to split the data into training, validation and test sets, and calculate their Fourier coefficients along
#with the corresponding labels. The function also appends the last column of X (the tracking index) to each set.
def fouriertrainvaltest(X, Y, Ntrain, Nval, Ntest):
    """Split X/Y into consecutive training, validation and test parts and Fourier-transform the inputs.

    Parameters
    ----------
    X : 2-D array, one trajectory per row. Only the columns 0, 2, 4, ... below column 800 are transformed. The
        last column of X is copied unchanged to the end of every output row; callers use it to carry the original
        row index of the trajectory.
    Y : 2-D array of labels, row-aligned with X.
    Ntrain, Nval, Ntest : number of rows placed in the training, validation and test sets. The sets are taken in
        that order from the top of X without shuffling.

    Returns
    -------
    (xtrain, xval, xtest, Ytrain, Yval, Ytest). Each x array holds the interleaved real/imaginary Fourier
    coefficients followed by one final column containing X[:, -1] for the same rows.
    """
    #row ranges of the three consecutive subsets
    train_rows = slice(0, Ntrain)
    val_rows = slice(Ntrain, Ntrain + Nval)
    test_rows = slice(Ntrain + Nval, Ntrain + Nval + Ntest)

    #The original trajectories contain 800 time steps but we only use 400 of them, we thus take every second point
    time_steps = slice(0, 800, 2)

    subsets = []
    for rows in (train_rows, val_rows, test_rows):
        features = _interleave_real_imag(X[rows, time_steps])
        #append the last column of X so every row can still be identified after filtering and shuffling
        tracking = X[rows, -1].reshape(features.shape[0], 1)
        subsets.append(np.concatenate((features, tracking), axis=1))
    xtrain, xval, xtest = subsets

    #extract the corresponding labels for the training, validation and test sets.
    Ytrain = Y[train_rows, :]
    Yval = Y[val_rows, :]
    Ytest = Y[test_rows, :]

    #Return the transformed training, validation and test sets along with their corresponding labels
    return(xtrain, xval, xtest, Ytrain, Yval, Ytest)

In [None]:
#Build one training/validation/test split up front. Its only purpose here is to fix the array dimensions:
#HyperModel.build reads xtrainf.shape[1] to size the input layer. Note that xtrainx3 is passed without the appended
#index column, so the last column of these arrays is simply the last column of xtrainx3.
xtrainf, xval, xtest, ytrainf, yval, ytest = fouriertrainvaltest(
    X=xtrainx3,
    Y=ytrain3,
    Ntrain=4800,
    Nval=600,
    Ntest=600,
)

In [None]:
#define a HyperModel class for the Keras Tuner to optimise the model's architecture and hyperparameters
class HyperModel(kt.HyperModel):
    """Search space for a fully connected three-class classifier.

    Tuned hyperparameters: 'units_0' (width of the first dense layer), 'layers' (number of additional hidden
    layers), 'units_1' ... 'units_<layers>' (their widths), 'learning_rate' and 'batch_size'.

    Both methods read the notebook-level array ``xtrainf`` (the Fourier training set), so it must exist before
    ``build`` or ``fit`` is called.
    """

    #function to build the model with hyperparameter tuning
    def build(self, hp):
        """Build and compile a model for the hyperparameter values held in ``hp``."""
        #create a sequential model
        model = tf.keras.Sequential()

        #Tune the number of neurons in the first dense layer between 32 and 512, with a step of 32. 'units_0' is the
        #hyperparameter name, and it will be varied during tuning. The input size is the number of columns of
        #xtrainf minus one, because the last column is the tracking column and is not fed to the network.
        model.add(tf.keras.layers.Dense(
            units=hp.Int('units_0', min_value = 32, max_value = 512, step=32),
            input_dim = (xtrainf.shape[1]-1),
            activation='relu'))

        #Tune the number of additional hidden layers between 0 and 10. For each layer, tune the number of neurons
        #between 32 and 512.
        for i in range(hp.Int('layers', 0, 10)):
            model.add(tf.keras.layers.Dense(
                units=hp.Int('units_' + str(i + 1), min_value=32, max_value=512, step=32),
                activation='relu'))

        #Add the output layer with 3 neurons (for classification with three outputs), using the softmax activation
        #function
        model.add(tf.keras.layers.Dense(3,
                activation='softmax'))

        #Tune the learning rate of the Adam optimiser, choosing from [0.01, 0.001, 0.0001, 0.00001, 0.000001]
        hp_learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4, 1e-5, 1e-6])

        #compile the model with the Adam optimiser using the tuned learning rate, categorical cross-entropy loss,
        #and categorical accuracy. The metric is passed as a list: newer Keras versions reject a bare string here.
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate = hp_learning_rate),
                      loss="categorical_crossentropy",
                      metrics = ['categorical_accuracy'])

        #return the constructed model
        return(model)

    #function to fit the model, allowing the batch size to be tuned as well
    def fit(self, hp, model, *args, **kwargs):
        """Fit ``model`` with a tuned batch size; all other arguments are forwarded to ``model.fit``."""
        #size of the training set. This uses xtrainf, the same array build() relies on, rather than the name
        #xtrain, which is not defined on a fresh kernel and later refers to the complete, unsplit data set.
        n_train = xtrainf.shape[0]
        return model.fit(
            *args,
            #Tune the batch size by selecting from [16, 32, 64, half of the training set, or the full training set]
            batch_size=hp.Choice("batch_size", [16, 32, 64, int(n_train/2), n_train]),
            **kwargs,
        )

#Set up the Bayesian-optimisation tuner. No tuner.search(...) call appears in this notebook, so the results queried
#further down presumably come from an earlier search stored under this project name (to be confirmed).
tuner = kt.BayesianOptimization(
    hypermodel=HyperModel(), #search space defined by the HyperModel class
    objective='val_loss', #minimise the validation loss
    max_trials=100, #at most 100 hyperparameter combinations are tried
    project_name='hp_optimisation_RCclassification_onetwoorthreepeaks', #project name
)

In [None]:
#Ask the tuner for its hyperparameter sets (up to 100) and keep the first entry of the returned list
best_hps = tuner.get_best_hyperparameters(num_trials=100)[0]

#report the width of the first dense layer
print(f"""
The hyperparameter search is complete. The optimal number of units in the first densely-connected
layer is {best_hps.get('units_0')}.""")

#report how many additional hidden layers were selected
print(f""" The optimal number of hidden layers is {best_hps.get('layers')}""")

#report the width of every additional hidden layer; they are stored under 'units_1', 'units_2', ...
for layer in range(1, best_hps.get('layers') + 1):
    print(f""" The optimal number of units in layer {layer} is {best_hps.get('units_' + str(layer))}""")

#report the learning rate and the batch size
print(f"""the optimal learning rate for the optimizer is {best_hps.get('learning_rate')} and the optimal batch size is {best_hps.get('batch_size')}""")

In [None]:
#function to categorise the model's predictions into correct and incorrect label categories
def correctandincorrectlabels(x, y, classifier=None):
    """Sort the predictions for ``x`` into nine buckets by (true label, predicted label).

    Parameters
    ----------
    x : 2-D array. All columns except the last are passed to ``predict``; the last column is the tracking value
        that is attached to every stored prediction.
    y : 2-D array of one-hot labels, row-aligned with x. The true label of a row is the position of its maximum.
    classifier : optional object with a ``predict`` method. When omitted, the notebook-level variable ``model``
        is used, which keeps existing two-argument calls working.

    Returns
    -------
    Tuple of nine lists (p1l1, p1l2, p1l3, p2l1, p2l2, p2l3, p3l1, p3l2, p3l3). ``pAlB`` collects the rows whose
    true label is A and whose predicted label is B, so p1l1, p2l2 and p3l3 are the correct classifications.
    Every list element is a (1, n_outputs + 1) array: the predicted probabilities followed by the tracking value.
    """
    if classifier is None:
        #fall back to the model trained at notebook level
        classifier = model

    #predict labels using the trained model (ignoring the last column in x which contains the indices for tracking)
    predictions = classifier.predict(x[:,:-1])

    #buckets[t][p] collects the rows with true class index t and predicted class index p (both 0, 1 or 2)
    buckets = [[[] for _ in range(3)] for _ in range(3)]

    #loop through all trajectories to categorise based on true and predicted labels
    for i in range(x.shape[0]):
        true_cls = y[i,:].argmax()
        pred_cls = predictions[i,:].argmax()

        #only the three known classes are collected; any other index is ignored
        if true_cls < 3 and pred_cls < 3:
            #store the prediction together with the tracking value of the row
            buckets[true_cls][pred_cls].append(
                np.concatenate((predictions[[i],:], x[i,-1].reshape(1,1)), axis=1))

    #return the categorised predictions (9 lists total, 3 categories for each true label)
    return tuple(bucket for row in buckets for bucket in row)

In [None]:
#epsilon values 0, 0.05, ..., 0.45; the stop value 0.45+0.05 is excluded by np.arange
epsilon = np.arange(0, 0.45+0.05, 0.05)

#One entry per epsilon value for every recorded quantity, all initialised to zero and filled in during the sweep.
n_epsilon = len(epsilon)

#classification accuracy and loss on the training set
trainingacc, trainingloss = np.zeros(n_epsilon), np.zeros(n_epsilon)

#classification accuracy and loss on the validation set
valacc, valloss = np.zeros(n_epsilon), np.zeros(n_epsilon)

#classification accuracy and loss on the test set
testacc, testloss = np.zeros(n_epsilon), np.zeros(n_epsilon)

In [None]:
#Sweep over the minimum peak separation epsilon. For every value: keep only the two- and three-peak trajectories
#whose peak positions are all at least epsilon apart, rebuild, shuffle and split the data set, train a fresh model
#with the best hyperparameters, record loss and accuracy, and save the test predictions per category.

#np.savetxt does not create missing folders, so make sure the output folder exists before the sweep starts
os.makedirs('Predictions', exist_ok=True)

#file-name tags of the nine (true label, predicted label) categories, in the order in which
#correctandincorrectlabels returns them
prediction_tags = ['1labelled1', '1labelled2', '1labelled3',
                   '2labelled1', '2labelled2', '2labelled3',
                   '3labelled1', '3labelled2', '3labelled3']

#iterate over all values of epsilon, tracking the progress with tqdm
for i in tqdm(range(len(epsilon))):

    #two peaks: keep the trajectories whose two peak positions differ by at least the current epsilon. A boolean
    #mask keeps the original row order and also copes with the case that no trajectory passes the filter.
    n2 = xtrainx2_index.shape[0]
    keep2 = np.abs(nu2[:n2,0] - nu2[:n2,1]) >= epsilon[i]
    Xtrainx2_filtered_arr = xtrainx2_index[keep2]

    #three peaks: all three pairwise differences between the peak positions must be at least the current epsilon
    n3 = xtrainx3_index.shape[0]
    keep3 = ((np.abs(nu3[:n3,0] - nu3[:n3,1]) >= epsilon[i])
             & (np.abs(nu3[:n3,0] - nu3[:n3,2]) >= epsilon[i])
             & (np.abs(nu3[:n3,1] - nu3[:n3,2]) >= epsilon[i]))
    Xtrainx3_filtered_arr = xtrainx3_index[keep3]

    #create one-hot encoded labels for the three classes. They get their own names so that the label arrays
    #ytrain1, ytrain2 and ytrain3 loaded from file are not overwritten by this loop.
    onehot1 = np.zeros((xtrainx1_index.shape[0], 3))
    onehot1[:,0] = 1 #label for one peak
    onehot2 = np.zeros((Xtrainx2_filtered_arr.shape[0], 3))
    onehot2[:,1] = 1 #label for two peaks
    onehot3 = np.zeros((Xtrainx3_filtered_arr.shape[0], 3))
    onehot3[:,2] = 1 #label for three peaks

    #concatenate the filtered training data and their corresponding labels
    Xtrain = np.concatenate((xtrainx1_index, Xtrainx2_filtered_arr, Xtrainx3_filtered_arr))
    Ytrain = np.concatenate((onehot1, onehot2, onehot3))

    #shuffle the training data and labels with one common permutation so that rows and labels stay aligned
    indices = np.arange(Xtrain.shape[0])
    indices_shuffle = np.random.permutation(indices)
    xtrain = Xtrain[indices_shuffle]
    ytrain = Ytrain[indices_shuffle]

    #determine training, validation and test set sizes: 10% each for validation and test, the rest for training
    Nval = int(xtrain.shape[0]*0.1)
    Ntest = int(xtrain.shape[0]*0.1)
    Ntrain = xtrain.shape[0] - 2*int(xtrain.shape[0]*0.1)

    #split the data into training, validation and test sets containing the Fourier coefficients
    xtrainf, xval, xtest, ytrainf, yval, ytest = fouriertrainvaltest(xtrain, ytrain, Ntrain, Nval, Ntest)

    #build the model using the best hyperparameters
    model = HyperModel().build(best_hps)

    #Fit the model on the training data with validation. The last column of the x arrays is the tracking column
    #and is therefore dropped before the data is passed to the network.
    history = model.fit(xtrainf[:,:-1], ytrainf, epochs = 1000, validation_data = (xval[:,:-1], yval), batch_size = best_hps.get('batch_size'), verbose=0)

    #evaluate the model on the training, validation and test sets and store the loss and classification accuracy
    trainingloss[i], trainingacc[i] = model.evaluate(xtrainf[:,:-1], ytrainf)
    valloss[i], valacc[i] = model.evaluate(xval[:,:-1], yval)
    testloss[i], testacc[i] = model.evaluate(xtest[:,:-1], ytest)

    #save the accuracy and loss results to csv files. This is done in every iteration, so the files always hold
    #the results of all epsilon values finished so far (entries of unfinished values are still zero).
    np.savetxt('traininglossvepsilon.csv', trainingloss, delimiter=',')
    np.savetxt('trainingaccvepsilon.csv', trainingacc, delimiter=',')
    np.savetxt('vallossvepsilon.csv', valloss, delimiter=',')
    np.savetxt('valaccvepsilon.csv', valacc, delimiter=',')
    np.savetxt('testlossvepsilon.csv', testloss, delimiter=',')
    np.savetxt('testaccvepsilon.csv', testacc, delimiter=',')

    #get correct and incorrect label predictions using the defined function
    p1l1, p1l2, p1l3, p2l1, p2l2, p2l3, p3l1, p3l2, p3l3 = correctandincorrectlabels(xtest, ytest)

    #save the predictions for each category to csv files; empty categories are skipped, so no file is written
    #for them
    for tag, bucket in zip(prediction_tags, (p1l1, p1l2, p1l3, p2l1, p2l2, p2l3, p3l1, p3l2, p3l3)):
        if len(bucket)>0:
            np.savetxt('Predictions/predictions{0}_epsilon{1}.csv'.format(tag, epsilon[i]), np.concatenate(bucket), delimiter=',')