In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from io import StringIO
import re
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.inspection import permutation_importance
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.cluster import KMeans
from keras.utils.vis_utils import plot_model
from sklearn.metrics import f1_score, precision_score, recall_score, confusion_matrix, classification_report, adjusted_rand_score as ARS

### Often used parameters

In [None]:
RANDOM_STATE = 404
TOP = 10

## Data Preprocessing

Importing the dataset, removing unnecessary columns and filling up the NaN values.

In [None]:
df = pd.read_csv('UPFALL_dataset.csv')
columns_to_remove = df.filter(regex='TimeStamps|Subject|Trial|Tag|Infrared|BrainSensor|Luminosity').columns.values
for column in columns_to_remove:
    df.pop(column)
df.head()

Combining the column name with first row which holds the unit for the purpose of clarification. This way we remove Unnamed columns and diffirentiate columns between each other on the base of axis.

In [None]:
columns_to_copy = ['AngularVelocity', 'Accelerometer']
column_name = ""
counter = 0

for column in df.columns:
    if any(substring in column for substring in columns_to_copy):
        column_name = column
        df.rename(columns = {column:column + " " + df.iloc[0][counter]}, inplace = True)
    elif "Unnamed" in column:
        df.rename(columns = {column:column_name + " " + df.iloc[0][counter]}, inplace = True)
    counter += 1

df = df.tail(-1)
df = df.astype(float)
df.head()

In [None]:
df = df.dropna()
nan_rows_count = df.isna().sum()
print(nan_rows_count)

Acquiring the labels and removing them from the dataset before normalization. Numbers 1-5 are different types of falls.

In [None]:
labels = df["Activity"]
df.pop("Activity")
num_classes = labels.unique()

Applying the min max normalization on the whole dataframe.

In [None]:
scaler = MinMaxScaler()
df = pd.DataFrame(scaler.fit_transform(df), columns = df.columns)
df.head()

### Feature importance

Dividing into a 50% split for the purpose of obtaining feature importance.

In [None]:
X_train, X_test, y_train, y_test = train_test_split(df, labels, test_size=0.5, random_state=RANDOM_STATE)


Get the top 10 features based on feature importance from either a file if it exists or create new using Random Forest Classifier:

In [None]:
content = open('feature_importance.txt', 'r').read()
if not re.search(r'^\s*$', content):
    # read the file
    with open('feature_importance.txt', 'r') as f:
        feat_importance = [line.strip() for line in f]
else:
    # get feature importance, RF takes around 17 minutes, DT ~3 and GB ~30
    clf = RandomForestClassifier()  # DecisionTreeClassifier, GradientBoostingClassifier,
    clf.fit(X_train, y_train)

    result = permutation_importance(clf, X_train, y_train, n_repeats=5,random_state=RANDOM_STATE)
    feat_importance = result.importances_mean
    feat_importance = pd.DataFrame(feat_importance, index=df.columns, columns=["Importance"])
    feat_importance.sort_values(by='Importance', ascending=True, inplace=True)
    feat_importance.plot(kind='barh', figsize=(8,6))

    # save it to a file to limit time consumption for next run
    f = open("feature_importance.txt", "w")
    for feature in feat_importance:
        f.write(feature+"\n")
    f.close()

    feat_importance.sort_values(by='Importance', ascending=False, inplace=True)
    feat_importance = feat_importance.index.values.take([i for i in range(TOP)]).tolist()


Copy over the dataframe but with the important features only :

In [None]:
realistic = ['RightPocketAccelerometer x-axis (g)','RightPocketAccelerometer y-axis (g)','RightPocketAccelerometer z-axis (g)','WristAccelerometer x-axis (g)', 'WristAccelerometer y-axis (g)', 'WristAccelerometer z-axis (g)']
df_realistic = df[realistic]

In [None]:
df = df[feat_importance]
df.head()

Dividing the data into 60/20/20 split of train/test/validate of the dataset with only the important features.

In [None]:
X_remaining, X_test, y_remaining, y_test = train_test_split(df, labels, test_size=0.2, random_state=RANDOM_STATE)
X_train, X_val, y_train, y_val = train_test_split(X_remaining, y_remaining, test_size=0.25, random_state=RANDOM_STATE)
print(len(X_train))
print(len(X_test))
print(len(X_val))

## Sequential Neural Network

Building the model of neural network using Keras

In [None]:
ANN_model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=X_train.shape[1]),
    tf.keras.layers.Dense(units=256, activation='relu'),
    tf.keras.layers.LayerNormalization(),
    tf.keras.layers.Dropout(0.05),
    tf.keras.layers.Dense(units=128, activation='relu'),
    tf.keras.layers.LayerNormalization(),
    tf.keras.layers.Dropout(0.05),
    tf.keras.layers.Dense(units=64, activation='relu'),
    tf.keras.layers.LayerNormalization(),
    tf.keras.layers.Dropout(0.05),
    tf.keras.layers.Dense(units=num_classes.size+1, activation='softmax')
])

# Compile the model with binary cross-entropy loss and Adam optimizer:

ANN_model.compile(
    optimizer = 'adam',
    loss = tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics = ['accuracy']
)

ANN_model.summary()


Training the model

In [None]:
# Train the model on the training data:
ANN_history = ANN_model.fit(X_train, y_train, validation_data = (X_val, y_val), epochs=300, batch_size=1024)

In [None]:
# summarize history for accuracy
plt.plot(ANN_history.history['accuracy'])
plt.plot(ANN_history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
# summarize history for loss
plt.plot(ANN_history.history['loss'])
plt.plot(ANN_history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
score = ANN_model.evaluate(X_train, y_train)

In [None]:
y_pred1 = ANN_model.predict(X_test)
y_pred = np.argmax(y_pred1, axis=1)

# Print f1, precision, and recall scores
print("Precision score = ", precision_score(y_test, y_pred , average="macro"))
print("Recall score = ", recall_score(y_test, y_pred , average="macro"))
print("F1 score = ", f1_score(y_test, y_pred , average="macro"))

## Recurrent Neural Network

In [None]:
RNN_model = tf.keras.Sequential([
    tf.keras.layers.LSTM(units=64, return_sequences=True, input_shape=(X_train.shape[1],1)),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units=32, return_sequences=True)),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units=16, return_sequences=True)),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.LSTM(units=8),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(num_classes.size + 1, activation='softmax')
])

# Compiling the RNN
RNN_model.compile(
    optimizer='adam', # rmsprop
    loss = tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics = ['accuracy'])

RNN_model.summary()

In [None]:
if not isinstance(X_train, np.ndarray):
    X_train = X_train.to_numpy().reshape(X_train.shape[0],X_train.shape[1],1)

In [None]:
RNN_history = RNN_model.fit(X_train, y_train, validation_data = (X_val, y_val), epochs=300, batch_size=1024)

In [None]:
# summarize history for accuracy
plt.plot(RNN_history.history['accuracy'])
plt.plot(RNN_history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
# summarize history for loss
plt.plot(RNN_history.history['loss'])
plt.plot(RNN_history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
score = RNN_model.evaluate(X_train, y_train)

In [None]:
y_pred1 = RNN_model.predict(X_test)
y_pred = np.argmax(y_pred1, axis=1)

# Print f1, precision, and recall scores
print("Precision score = ", precision_score(y_test, y_pred , average="macro"))
print("Recall score = ", recall_score(y_test, y_pred , average="macro"))
print("F1 score = ", f1_score(y_test, y_pred , average="macro"))


## Particle Swarm Optimization

In [None]:
import math
import numpy as np
class Bounds:
    import numpy as np
    ## The goal of the bounds class is to ensure swarm positions are set within a certain range
    def __init__(self, lower, upper, enforce='Clip'):
        ##The enforce argument is used to determine what happens if a position is out of the boundaries.
        ##Clip sets the position to be either the lower or upper limit, depending on which side of the boundaries the position is.
        ##Resample sets a random position within the boundaries.
        self.lower = np.array(lower)
        self.upper = np.array(upper)
        self.enforce = enforce.lower()

    def Upper(self):
        # Upper boundaries
        return self.upper

    def Lower(self):
        # Lower boundaries
        return self.lower

    def Limits(self, pos):
        # The limits function changes the positions so they are in a certain range in case they are out of the boundaries
        npart, ndim = pos.shape

        for i in range(npart):  ##Looping through every particle
            if self.enforce == "resample":
                for j in range(ndim):  ##Looping through every dimension
                    if (pos[i,j] <= self.lower[j]) or (pos[i,j] >= self.upper[j]):
                        pos[i, j] = self.lower[j] + (self.upper[j] - self.lower[j]) * np.random.random()

            else:
                for j in range(ndim):  ## Looping through every dimension
                    ## In case of enforce=clip, the positions that are out of boundaries are set to lower/upper limit depending on what side of the boundaries the positions are.
                    if pos[i, j] <= self.lower[j]:
                        pos[i, j] = self.lower[j]
                    if pos[i, j] >= self.upper[j]:
                        pos[i, j] = self.upper[j]

        return pos

    def Validate(self, pos):
        ## The validate function simply returns the positions and can be used to validate that the positions are within the boundaries.
        return pos
class QuasiRandomInitializer:
    def Halton(self, i, b):
        f = 1.0
        r = 0
        while i > 0:
            f = f / b
            r = r + f * (i % b)
            i = math.floor(i / float(b))
        return b

    def __init__(self, n_particles=10, n_dimensions=3, bounds=None, k=1, jitter=0.0):
        self.n_particles = n_particles
        self.n_dimensions = n_dimensions
        self.bounds = bounds
        self.k = k
        self.jitter = jitter
        self.primes = [
            2, 3, 5, 7, 11, 13, 17, 19, 23, 29,
            31, 37, 41, 43, 47, 53, 59, 61, 67, 71,
            73, 79, 83, 89, 97, 101, 103, 107, 109, 113,
            127, 131, 137, 139, 149, 151, 157, 163, 167, 173,
            179, 181, 191, 193, 197, 199, 211, 223, 227, 229,
            233, 239, 241, 251, 257, 263, 269, 271, 277, 281,
            283, 293, 307, 311, 313, 317, 331, 337, 347, 349,
            353, 359, 367, 373, 379, 383, 389, 397, 401, 409,
            419, 421, 431, 433, 439, 443, 449, 457, 461, 463,
            467, 479, 487, 491, 499, 503, 509, 521, 523, 541,
            547, 557, 563, 569, 571, 577, 587, 593, 599, 601,
            607, 613, 617, 619, 631, 641, 643, 647, 653, 659]

    def InitializeSwarm(self):
        self.swarm = np.zeros((self.n_particles, self.n_dimensions))
        if self.bounds == None:
            lo = np.zeros(self.n_dimensions)
            hi = np.ones(self.n_dimensions)
        else:
            lo = self.bounds.Lower()
            hi = self.bounds.Upper()
        for i in range(self.n_particles):
            for j in range(self.n_dimensions):
                h = self.Halton(i + self.k, self.primes[j % len(self.primes)])
                q = self.jitter * (np.random.random() - 0.5)
                self.swarm[i, j] = lo[j] + (hi[j] - lo[j])*h + q
        if (self.bounds != None):
            self.swarm = self.bounds.Limits(self.swarm)
        return self.swarm
class LinearInertia:
    def __init__(self, hi=0.9, lo=0.6):
        if hi > lo:
            self.hi = hi
            self.lo = lo
        else:
            self.hi = lo
            self.lo = hi

    def CalculateW(self, w0, iterations, max_iter):
        return self.hi - (iterations / max_iter) * (self.hi - self.lo)
class PSO:
    """Particle swarm optimization"""

    # -----------------------------------------------------------
    #  __init__
    #
    def __init__(self, obj,  # the objective function (subclass Objective)
                 npart=10,  # number of particles in the swarm
                 ndim=3,  # number of dimensions in the swarm
                 max_iter=200,  # maximum number of steps
                 c1=1.49,  # cognitive parameter
                 c2=1.49,  # social parameter
                 #  best if w > 0.5*(c1+c2) - 1:
                 w=0.729,  # base velocity decay parameter
                 inertia=None,  # velocity weight decay object (None == constant)
                 #  Bare-bones from:
                 #    Kennedy, James. "Bare bones particle swarms." In Proceedings of
                 #    the 2003 IEEE Swarm Intelligence Symposium. SIS'03 (Cat. No. 03EX706),
                 #    pp. 80-87. IEEE, 2003.
                 bare=False,  # if True, use bare-bones update
                 bare_prob=0.5,  # probability of updating a particle's component
                 tol=None,  # tolerance (done if no done object and gbest < tol)
                 init=None,  # swarm initialization object (subclass Initializer)
                 done=None,  # custom Done object (subclass Done)
                 ring=False,  # use ring topology if True
                 neighbors=2,  # number of particle neighbors for ring, must be even
                 vbounds=None,  # velocity bounds object
                 bounds=None):  # swarm bounds object

        self.obj = obj
        self.npart = npart
        self.ndim = ndim
        self.max_iter = max_iter
        self.init = init
        self.done = done
        self.vbounds = vbounds
        self.bounds = bounds
        self.tol = tol
        self.c1 = c1
        self.c2 = c2
        self.w = w
        self.bare = bare
        self.bare_prob = bare_prob
        self.inertia = inertia
        self.ring = ring
        self.neighbors = neighbors
        self.initialized = False

        if (ring) and (neighbors > npart):
            self.neighbors = npart

    # -----------------------------------------------------------
    #  Results
    #
    def Results(self):
        """Return the current results"""

        if (not self.initialized):
            return None

        return {
            "npart": self.npart,  # number of particles
            "ndim": self.ndim,  # number of dimensions
            "max_iter": self.max_iter,  # maximum possible iterations
            "iterations": self.iterations,  # iterations actually performed
            "c1": self.c1,  # cognitive parameter
            "c2": self.c2,  # social parameter
            "w": self.w,  # base velocity decay parameter
            "tol": self.tol,  # tolerance value, if any
            "gbest": self.gbest,  # sequence of global best function values
            "giter": self.giter,  # iterations when global best updates happened
            "gpos": self.gpos,  # global best positions
            "gidx": self.gidx,  # particle number for new global best
            "pos": self.pos,  # current particle positions
            "vel": self.vel,  # velocities
            "xpos": self.xpos,  # per particle best positions
            "xbest": self.xbest,  # per particle bests
        }

    # -----------------------------------------------------------
    #  Initialize
    #
    def Initialize(self):
        """Set up the swarm"""

        self.initialized = True
        self.iterations = 0

        self.pos = self.init.InitializeSwarm()  # initial swarm positions
        self.vel = np.zeros((self.npart, self.ndim))  # initial velocities
        self.xpos = self.pos.copy()  # these are the particle bests
        self.xbest = self.Evaluate(self.pos)  # and objective function values

        #  Swarm and particle bests
        self.gidx = []
        self.gbest = []
        self.gpos = []
        self.giter = []

        self.gidx.append(np.argmin(self.xbest))
        self.gbest.append(self.xbest[self.gidx[-1]])
        self.gpos.append(self.xpos[self.gidx[-1]].copy())
        self.giter.append(0)

    # -----------------------------------------------------------
    #  Done
    #
    def Done(self):
        """Check if we are done"""

        if (self.done == None):
            if (self.tol == None):
                print(self.iterations)
                return (self.iterations == self.max_iter)
            else:
                return (self.gbest[-1] < self.tol) or (self.iterations == self.max_iter)
        else:
            return self.done.Done(self.gbest,
                                  gpos=self.gpos,
                                  pos=self.pos,
                                  max_iter=self.max_iter,
                                  iteration=self.iterations)

    # -----------------------------------------------------------
    #  Evaluate
    #
    def Evaluate(self, pos):
        """Evaluate a set of positions"""

        p = np.zeros(self.npart)
        for i in range(self.npart):
            p[i] = self.obj.Evaluate(pos[i])
        return p

    # -----------------------------------------------------------
    #  RingNeighborhood
    #
    def RingNeighborhood(self, n):
        """Return a list of particles in the neighborhood of n"""

        idx = np.array(range(n - self.neighbors // 2, n + self.neighbors // 2 + 1))
        i = np.where(idx >= self.npart)
        if (len(i) != 0):
            idx[i] = idx[i] % self.npart
        i = np.where(idx < 0)
        if (len(i) != 0):
            idx[i] = self.npart + idx[i]

        return idx

    # -----------------------------------------------------------
    #  NeighborhoodBest
    #
    def NeighborhoodBest(self, n):
        """Return neighborhood best for particle n"""

        if (not self.ring):
            return self.gbest[-1], self.gpos[-1]

        # Using a ring, return best known position of the neighborhood
        lbest = 1e9
        lpos=self.pos[0]
        for i in self.RingNeighborhood(n):
            if (self.xbest[i] < lbest):
                lbest = self.xbest[i]
                lpos = self.xpos[i]

        return lbest, lpos

    # -----------------------------------------------------------
    #  BareBonesUpdate
    #
    def BareBonesUpdate(self):
        """Apply a bare-bones update to the positions"""

        pos = np.zeros((self.npart, self.ndim))

        for i in range(self.npart):
            lbest, lpos = self.NeighborhoodBest(i)
            for j in range(self.ndim):
                if (np.random.random() < self.bare_prob):
                    m = 0.5 * (lpos[j] + self.xpos[i, j])
                    s = np.abs(lpos[j] - self.xpos[i, j])
                    pos[i, j] = np.random.normal(m, s)
                else:
                    pos[i, j] = self.xpos[i, j]

        return pos

    # -----------------------------------------------------------
    #  Step
    #
    def Step(self):
        """Do one swarm step"""

        #  Weight for this iteration
        if (self.inertia != None):
            w = self.inertia.CalculateW(self.w, self.iterations, self.max_iter)
        else:
            w = self.w

        if (self.bare):
            #  Bare-bones position update
            self.pos = self.BareBonesUpdate()
        else:
            #  Canonical position/velocity update
            for i in range(self.npart):
                lbest, lpos = self.NeighborhoodBest(i)
                c1 = self.c1 * np.random.random(self.ndim)
                c2 = self.c2 * np.random.random(self.ndim)
                self.vel[i] = w * self.vel[i] + \
                              c1 * (self.xpos[i] - self.pos[i]) + \
                              c2 * (lpos - self.pos[i])

            #  Keep velocities bounded
            if (self.vbounds != None):
                self.vel = self.vbounds.Limits(self.vel)

            #  Update the positions
            self.pos = self.pos + self.vel

        #  Keep positions bounded
        if (self.bounds != None):
            self.pos = self.bounds.Limits(self.pos)

        #  Evaluate the new positions
        p = self.Evaluate(self.pos)

        #  Check if any new particle and swarm bests
        for i in range(self.npart):
            if (p[i] < self.xbest[i]):  # is new position a particle best?
                self.xbest[i] = p[i]  # keep the function value
                self.xpos[i] = self.pos[i]  # and position
            if (p[i] < self.gbest[-1]):  # is new position global best?
                self.gbest.append(p[i])  # new position is new swarm best
                self.gpos.append(self.pos[i].copy())  # keep the position
                self.gidx.append(i)  # particle number
                self.giter.append(self.iterations)  # and when it happened

        self.iterations += 1

    # -----------------------------------------------------------
    #  Optimize
    #
    def Optimize(self):
        """Run a full optimization and return the best"""

        self.Initialize()

        while (not self.Done()):
            self.Step()

        return self.gbest[-1], self.gpos[-1]
class Done:
    """Customize 'done' idea"""

        #-----------------------------------------------------------
        #  __init__
        #
    def __init__(self):
        """Constructor"""

        pass


    #-----------------------------------------------------------
    #  Done
    #
    def Done(self, gbest, gpos=None, pos=None, max_iter=None, iteration=None):
        """Return True if done"""

        return False
class Bounds:
    import numpy as np
    ## The goal of the bounds class is to ensure swarm positions are set within a certain range
    def __init__(self, lower, upper, enforce='Clip'):
        ##The enforce argument is used to determine what happens if a position is out of the boundaries.
        ##Clip sets the position to be either the lower or upper limit, depending on which side of the boundaries the position is.
        ##Resample sets a random position within the boundaries.
        self.lower = np.array(lower)
        self.upper = np.array(upper)
        self.enforce = enforce.lower()

    def Upper(self):
        # Upper boundaries
        return self.upper

    def Lower(self):
        # Lower boundaries
        return self.lower

    def Limits(self, pos):
        # The limits function changes the positions so they are in a certain range in case they are out of the boundaries
        npart, ndim = pos.shape

        for i in range(npart):  ##Looping through every particle
            if self.enforce == "resample":
                for j in range(ndim):  ##Looping through every dimension
                    if (pos[i,j] <= self.lower[j]) or (pos[i,j] >= self.upper[j]):
                        pos[i, j] = self.lower[j] + (self.upper[j] - self.lower[j]) * np.random.random()

            else:
                for j in range(ndim):  ## Looping through every dimension
                    ## In case of enforce=clip, the positions that are out of boundaries are set to lower/upper limit depending on what side of the boundaries the positions are.
                    if pos[i, j] <= self.lower[j]:
                        pos[i, j] = self.lower[j]
                    if pos[i, j] >= self.upper[j]:
                        pos[i, j] = self.upper[j]

        return pos

    def Validate(self, pos):
        ## The validate function simply returns the positions and can be used to validate that the positions are within the boundaries.
        return pos

# PSO for ANN

In [None]:
class Objective:
    def __init__(self): 
        self.positions=[]
        self.metric=[]
        self.counter = 0

    def printstats(self):
        best_idx=np.argmin(self.metric)
        print("Best metric", self.metric[best_idx])
        print("Best position",self.positions[best_idx])
    
    def Evaluate(self,pos):
        h1,h2,h3=pos
        h1,h2,h3=int(h1),int(h2),int(h3)
        model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=X_train.shape[1]),
        tf.keras.layers.Dense(units=h1, activation='relu'),
        tf.keras.layers.LayerNormalization(),
        tf.keras.layers.Dropout(0.05),
        tf.keras.layers.Dense(units=h2, activation='relu'),
        tf.keras.layers.LayerNormalization(),
        tf.keras.layers.Dropout(0.05),
        tf.keras.layers.Dense(units=h3, activation='relu'),
        tf.keras.layers.LayerNormalization(),
        tf.keras.layers.Dropout(0.05),
        tf.keras.layers.Dense(units=num_classes.size+1, activation='softmax')
    ])
        callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10)
    # Compile the model with binary cross-entropy loss and Adam optimizer:

        model.compile(
            #optimizer = 'adam'
            
            loss = tf.keras.losses.SparseCategoricalCrossentropy(),
            metrics = ['accuracy']
        )
        history = model.fit(X_train, y_train, validation_data = (X_val, y_val), epochs=300, batch_size=1000,callbacks=[callback])
        losses=model.evaluate(X_train,y_train)
        metric=losses[0]
        
 

        self.metric.append(metric)
        self.positions.append(pos)
        self.printstats()
        plt.plot(self.metric)
        plt.ylabel("Training categorical cross entropy loss")
        plt.xlabel("Individual")
        plt.savefig(f'pso_plots/ANN_{self.counter}.png')
        self.counter += 1
        plt.show()

In [None]:
obj=Objective()
npart = 10
ndim = 3
m =10
tol = None
b = Bounds([30,30,30], [500,500,500],enforce="resample")
i = QuasiRandomInitializer(npart, ndim, bounds=b)
t = LinearInertia()
swarm = PSO(obj=obj, npart=npart, ndim=ndim, init=i, tol=tol,
max_iter=m, bounds=b)
opt=swarm.Optimize()
res=swarm.Results()
pos = res["gpos"][-1]
g = res["gbest"][-1]

## Optimised ANN (achieved at position: 88,416,398)

In [None]:
ANN_model_oped = tf.keras.Sequential([
    tf.keras.layers.Input(shape=X_train.shape[1]),
    tf.keras.layers.Dense(units=88, activation='relu'),
    tf.keras.layers.LayerNormalization(),
    tf.keras.layers.Dropout(0.05),
    tf.keras.layers.Dense(units=416, activation='relu'),
    tf.keras.layers.LayerNormalization(),
    tf.keras.layers.Dropout(0.05),
    tf.keras.layers.Dense(units=398, activation='relu'),
    tf.keras.layers.LayerNormalization(),
    tf.keras.layers.Dropout(0.05),
    tf.keras.layers.Dense(units=num_classes.size+1, activation='softmax')
])

# Compile the model with binary cross-entropy loss and Adam optimizer:

ANN_model_oped.compile(
    optimizer = 'adam',
    loss = tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics = ['accuracy']
)

ANN_model_oped.summary()

In [None]:
# Train the model on the training data:
ANN_oped_history = ANN_model_oped.fit(X_train, y_train, validation_data = (X_val, y_val), epochs=300, batch_size=1024)

In [None]:
# summarize history for accuracy
plt.plot(ANN_oped_history.history['accuracy'])
plt.plot(ANN_oped_history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
# summarize history for loss
plt.plot(ANN_oped_history.history['loss'])
plt.plot(ANN_oped_history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
y_pred1 = ANN_model_oped.predict(X_test)
y_pred = np.argmax(y_pred1, axis=1)

# Print f1, precision, and recall scores
print("Precision score = ", precision_score(y_test, y_pred , average="macro"))
print("Recall score = ", recall_score(y_test, y_pred , average="macro"))
print("F1 score = ", f1_score(y_test, y_pred , average="macro"))

# PSO for RNN

In [None]:
class Objective:
    def __init__(self): 
        self.positions=[]
        self.metric=[]
        self.counter = 0

    def printstats(self):
        best_idx=np.argmin(self.metric)
        print("Best metric", self.metric[best_idx])
        print("Best position",self.positions[best_idx])
    
    def Evaluate(self,pos):
        h1,h2,h3,h4=pos
        h1,h2,h3,h4=int(h1),int(h2),int(h3),int(h4)
        model = tf.keras.Sequential([
        tf.keras.layers.LSTM(units=h1, return_sequences=True, input_shape=(X_train.shape[1],1)),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units=h2, return_sequences=True)),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units=h3, return_sequences=True)),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.LSTM(units=h4),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(num_classes.size + 1, activation='softmax')
    ])
        callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10)
    # Compile the model with binary cross-entropy loss and Adam optimizer:

        model.compile(
            #optimizer = 'adam'
            
            loss = tf.keras.losses.SparseCategoricalCrossentropy(),
            metrics = ['accuracy']
        )
        history = model.fit(X_train, y_train, validation_data = (X_val, y_val), epochs=300, batch_size=1000,callbacks=[callback])
        losses=model.evaluate(X_train,y_train)
        metric=losses[0]
        
 

        self.metric.append(metric)
        self.positions.append(pos)
        self.printstats()
        plt.plot(self.metric)
        plt.ylabel("Training categorical cross entropy loss")
        plt.xlabel("Individual")
        plt.savefig(f'pso_plots/RNN_{self.counter}.png')
        self.counter += 1
        plt.show()

In [None]:
obj=Objective()
npart = 10
ndim = 4
m =10
tol = None
b = Bounds([30,30,30,30], [500,500,500,500],enforce="resample")
i = QuasiRandomInitializer(npart, ndim, bounds=b)
t = LinearInertia()
swarm = PSO(obj=obj, npart=npart, ndim=ndim, init=i, tol=tol,
max_iter=m, bounds=b)
opt=swarm.Optimize()
res=swarm.Results()
pos = res["gpos"][-1]
g = res["gbest"][-1]

## Optimised RNN (achieved at position: 225,396,203,186)

In [None]:
RNN_model_oped = tf.keras.Sequential([
    tf.keras.layers.LSTM(units=225, return_sequences=True, input_shape=(X_train.shape[1],1)),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units=396, return_sequences=True)),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units=203, return_sequences=True)),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.LSTM(units=186),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(num_classes.size + 1, activation='softmax')
])

# Compiling the RNN
RNN_model_oped.compile(
    optimizer='adam', # rmsprop
    loss = tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics = ['accuracy'])

RNN_model_oped.summary()

In [None]:
if not isinstance(X_train, np.ndarray):
    X_train = X_train.to_numpy().reshape(X_train.shape[0],X_train.shape[1],1)

In [None]:
# Train the model on the training data:
RNN_oped_history = RNN_model_oped.fit(X_train, y_train, validation_data = (X_val, y_val), epochs=300, batch_size=1024)

In [None]:
# summarize history for accuracy
plt.plot(RNN_oped_history.history['accuracy'])
plt.plot(RNN_oped_history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
# summarize history for loss
plt.plot(RNN_oped_history.history['loss'])
plt.plot(RNN_oped_history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
y_pred1 = RNN_model_oped.predict(X_test)
y_pred = np.argmax(y_pred1, axis=1)

# Print f1, precision, and recall scores
print("Precision score = ", precision_score(y_test, y_pred , average="macro"))
print("Recall score = ", recall_score(y_test, y_pred , average="macro"))
print("F1 score = ", f1_score(y_test, y_pred , average="macro"))

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=2e0e26f6-0d34-49f5-8e85-f3cc34f3f75f' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>