In [None]:
!pip install pyswarm

In [None]:
import numpy as np

import pandas as pd

import seaborn as sns

from sklearn.preprocessing import StandardScaler

from sklearn.model_selection import train_test_split

from keras.utils import to_categorical

import joblib

import tensorflow as tf

from tensorflow.keras.models import Sequential

from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout

from tensorflow.keras.utils import to_categorical

from sklearn.preprocessing import StandardScaler

from sklearn.model_selection import train_test_split

from sklearn.metrics import accuracy_score, confusion_matrix

tf.keras.backend.clear_session()


In [None]:
df=pd.read_csv('/kaggle/input/eeg-data/preprocessed_dataset.csv')

sns.countplot(x='label', data=df)



# df.isnull().sum().sum()



encode = ({'NEUTRAL': 0, 'POSITIVE': 1, 'NEGATIVE': 0} )

df_encoded = df.replace(encode)



X=df_encoded.drop(["label"]  ,axis=1)

y = df_encoded.loc[:,'label'].values

X_orig=df_encoded.drop(["label"]  ,axis=1)

y_orig = to_categorical(df_encoded.loc[:,'label'].values)


In [None]:
scaler = StandardScaler()

scaler.fit(X)

X = scaler.transform(X)

y = to_categorical(y)

# print(y.shape)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state = 42)

print(X_train.shape)

X_train = np.reshape(X_train, (X_train.shape[0],1,X.shape[1]))

X_test = np.reshape(X_test, (X_test.shape[0],1,X.shape[1]))

In [None]:
import numpy as np

import tensorflow as tf

from tensorflow.keras.models import Model, Sequential

from tensorflow.keras.layers import Input, Dense, Conv1D, MaxPooling1D, UpSampling1D, Flatten, Dropout, BatchNormalization

from tensorflow.keras.optimizers import Adam

from tensorflow.keras.callbacks import EarlyStopping

from sklearn.metrics import confusion_matrix

from pyswarm import pso  # Install pyswarm for PSO algorithm



# Helper function to add noise

def add_noise(data, noise_factor=0.2):  

    noisy_data = data + noise_factor * np.random.normal(loc=0.0, scale=1.0, size=data.shape)

    return np.clip(noisy_data, 0., 1.)



# Denoising Autoencoder architecture

def get_denoising_autoencoder(input_shape):

    input_layer = Input(shape=input_shape)

    

    x = Conv1D(64, 2, activation='relu', padding='same')(input_layer)

    x = BatchNormalization()(x)  

    x = MaxPooling1D(2, padding='same')(x)

    x = Conv1D(128, 2, activation='relu', padding='same')(x)

    x = BatchNormalization()(x)

    encoded = MaxPooling1D(2, padding='same')(x)



    x = Conv1D(128, 2, activation='relu', padding='same')(encoded)

    x = BatchNormalization()(x)

    x = UpSampling1D(2)(x)

    x = Conv1D(64, 2, activation='relu', padding='same')(x)

    x = BatchNormalization()(x)

    x = UpSampling1D(2)(x)

    decoded = Conv1D(input_shape[-1], 2, activation='sigmoid', padding='same')(x)

    autoencoder = Model(input_layer, decoded)

    return autoencoder



# Classifier model

def get_classifier_model(input_shape, dense_units_1=128, dense_units_2=64, dense_units_3=32, dropout_rate=0.5):

    model = Sequential([

        Flatten(input_shape=input_shape),

        Dense(dense_units_1, activation='relu'),  

        BatchNormalization(),  

        Dropout(dropout_rate),

        Dense(dense_units_2, activation='relu'),

        BatchNormalization(),  

        Dropout(dropout_rate),

        Dense(dense_units_3, activation='relu'),

        Dropout(dropout_rate),

        Dense(2, activation='sigmoid')  # Assuming 3 classes

    ])

    return model



# PSO objective function: optimize learning_rate and number of units in dense layers

def pso_objective(params):

    learning_rate, dense_units_1, dense_units_2, dense_units_3, dropout_rate = params

    

    autoencoder = get_denoising_autoencoder(input_shape)

    autoencoder.compile(optimizer=Adam(learning_rate=0.001), loss='mse')  # Autoencoder not affected by PSO

    early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)



    # Train autoencoder

    autoencoder.fit(X_train_noisy, X_train, epochs=10, batch_size=64, validation_data=(X_test_noisy, X_test), 

                    callbacks=[early_stopping], verbose=0)



    encoder = Model(inputs=autoencoder.input, outputs=autoencoder.layers[4].output)

    X_train_encoded = encoder.predict(X_train)

    X_test_encoded = encoder.predict(X_test)



    classifier = get_classifier_model(X_train_encoded.shape[1:], dense_units_1=int(dense_units_1), 

                                      dense_units_2=int(dense_units_2), dense_units_3=int(dense_units_3), 

                                      dropout_rate=dropout_rate)

    classifier.compile(optimizer=Adam(learning_rate=learning_rate), loss='categorical_crossentropy', metrics=['accuracy'])



    history = classifier.fit(X_train_encoded, y_train, epochs=10, batch_size=64, validation_data=(X_test_encoded, y_test), 

                             callbacks=[early_stopping], verbose=0)



    # Return negative of validation accuracy (PSO minimizes, we want to maximize accuracy)

    val_acc = history.history['val_accuracy'][-1]

    return -val_acc



# PSO bounds: [learning_rate, dense_units_1, dense_units_2, dense_units_3, dropout_rate]

lb = [1e-3, 128, 64, 32, 0.3]  # Lower bounds

ub = [1e-2, 130, 70, 40, 0.5]  # Upper bounds



# Add noise to the training and testing datasets

X_train_noisy = add_noise(X_train)

X_test_noisy = add_noise(X_test)



# Define input shape from the dataset

input_shape = (X_train.shape[1], X_train.shape[2])



# Perform PSO

best_params, best_val = pso(pso_objective, lb, ub, swarmsize=20, maxiter=10)



# Extract best hyperparameters

best_learning_rate, best_dense_units_1, best_dense_units_2, best_dense_units_3, best_dropout_rate = best_params



print(f"Best Hyperparameters:\nLearning Rate: {best_learning_rate}\nDense Units 1: {best_dense_units_1}\nDense Units 2: {best_dense_units_2}\nDense Units 3: {best_dense_units_3}\nDropout Rate: {best_dropout_rate}")



# Retrain the final model with optimal hyperparameters

autoencoder = get_denoising_autoencoder(input_shape)

autoencoder.compile(optimizer=Adam(learning_rate=0.001), loss='mse')

early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)



autoencoder.fit(X_train_noisy, X_train, epochs=50, batch_size=64, validation_data=(X_test_noisy, X_test), 

                callbacks=[early_stopping], verbose=1)



encoder = Model(inputs=autoencoder.input, outputs=autoencoder.layers[4].output)

X_train_encoded = encoder.predict(X_train)

X_test_encoded = encoder.predict(X_test)



classifier = get_classifier_model(X_train_encoded.shape[1:], dense_units_1=int(best_dense_units_1), 

                                  dense_units_2=int(best_dense_units_2), dense_units_3=int(best_dense_units_3), 

                                  dropout_rate=best_dropout_rate)

classifier.compile(optimizer=Adam(learning_rate=best_learning_rate), loss='categorical_crossentropy', metrics=['accuracy'])



classifier.fit(X_train_encoded, y_train, epochs=50, batch_size=64, validation_data=(X_test_encoded, y_test), 

               callbacks=[early_stopping], verbose=1)



# Evaluate the final model

score, acc = classifier.evaluate(X_test_encoded, y_test)

print(f"Final Classifier Accuracy: {acc}")



# Save the best model

classifier.save('best_classifier_model.h5')



# Print confusion matrix

y_pred = classifier.predict(X_test_encoded)

predict_classes = np.argmax(y_pred, axis=1)

expected_classes = np.argmax(y_test, axis=1)

conf_matrix = confusion_matrix(expected_classes, predict_classes)

print("Confusion Matrix:\n", conf_matrix)
