In [None]:
import sys

In [None]:
!{sys.executable} -m pip install imbalanced-learn

In [None]:
import os
import numpy as np
import tensorflow as tf
import pickle
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, optimizers, Input, Model
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras.callbacks import Callback

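# Turn off oneDNN custom ops in TensorFlow (TF_ENABLE_ONEDNN_OPTS=0).
# NOTE(review): TensorFlow appears to read this flag when it is first loaded, so setting it
# after `import tensorflow` (above) may have no effect -- confirm, and move it before the import if so.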
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'

In [None]:
# Folder holding the per-patient EMG/EEG pickle files that load_data() opens.
# NOTE: absolute, machine-specific Windows path -- adjust before running elsewhere.
data_dir = 'C:\\Users\\MindRove_BZs\\Diploma\\processed_data_05Hz_no_norm\\'
# Folder scanned by load_classes() for `*.pickle` class-label files.
class_dir = 'C:\\Users\\MindRove_BZs\\Diploma\\classes\\'

def load_data(patient_ids):
    """Load the EMG and EEG pickles of every requested patient from `data_dir`.

    Each pickle is iterated key by key; every entry is stored under the
    combined key ``patient_<id>_<key>``. Float keys holding a whole number
    (e.g. ``3.0``) are turned into ``int`` first, so they appear as ``3`` in
    the combined key.

    Args:
        patient_ids: iterable of patient identifiers used to build file names.

    Returns:
        Tuple ``(emg_data, eeg_data)`` of dicts keyed by the combined key.

    Note:
        ``pickle.load`` can execute arbitrary code; only use trusted files.
    """
    def _merge_file(target, file_name, patient):
        # Read one pickle and copy its entries into `target` under prefixed keys.
        with open(os.path.join(data_dir, file_name), 'rb') as handle:
            frames = pickle.load(handle)
            for raw_key in frames:
                whole_float = isinstance(raw_key, float) and raw_key.is_integer()
                frame_id = int(raw_key) if whole_float else raw_key
                target[f"patient_{patient}_{frame_id}"] = frames[frame_id]

    emg_data, eeg_data = {}, {}
    for patient in patient_ids:
        _merge_file(emg_data, f'normalized_filtered_emg_data_patient_{patient}.pkl', patient)
        _merge_file(eeg_data, f'normalized_filtered_eeg_data_patient_{patient}.pkl', patient)
        print(f"Size of eeg data after patient {patient}: {len(eeg_data)}")

    return emg_data, eeg_data

def load_classes():
    """Collect the class labels stored in every ``*.pickle`` file of `class_dir`.

    The patient identifier is the part of the file name after the last
    underscore, with ``.pickle`` removed. Each entry of a file is stored under
    ``patient_<identifier>_<key>``.

    Returns:
        Dict mapping the combined key to the value stored in the pickle.

    Note:
        ``pickle.load`` can execute arbitrary code; only use trusted files.
    """
    all_classes = {}
    for file_name in os.listdir(class_dir):
        if not file_name.endswith('.pickle'):
            continue
        with open(os.path.join(class_dir, file_name), 'rb') as handle:
            classes_data = pickle.load(handle)
            identifier = file_name.split('_')[-1].replace('.pickle', '')
            all_classes.update(
                {f"patient_{identifier}_{key}": classes_data[key] for key in classes_data}
            )
    return all_classes

In [None]:
def filter_data(emg_data, eeg_data, min_length=30, channels_to_keep=None):
    """Keep frames that exist in both modalities and are long enough, then crop them.

    A frame is kept when it is present in both `emg_data` and `eeg_data` and
    both arrays have at least `min_length` columns. Kept arrays are cropped to
    their first `min_length` columns; the EEG array is additionally reduced to
    the rows listed in `channels_to_keep`.

    Args:
        emg_data: dict mapping frame key -> 2-D array.
        eeg_data: dict mapping frame key -> 2-D array.
        min_length: required (and resulting) number of columns. Defaults to 30.
        channels_to_keep: row indices selected from each EEG array. Defaults
            to rows 0-5 and 7 (row 6 is dropped).

    Returns:
        Tuple ``(filtered_emg_data, filtered_eeg_data)`` sharing the same keys.
    """
    if channels_to_keep is None:
        channels_to_keep = np.r_[0:6, 7:8]

    filtered_emg_data = {}
    filtered_eeg_data = {}
    for frame, emg_frame in emg_data.items():
        # A frame without an EEG counterpart cannot be paired; skip it instead
        # of failing with KeyError.
        if frame not in eeg_data:
            continue
        eeg_frame = eeg_data[frame]
        if emg_frame.shape[1] >= min_length and eeg_frame.shape[1] >= min_length:
            filtered_emg_data[frame] = emg_frame[:, :min_length]
            filtered_eeg_data[frame] = eeg_frame[channels_to_keep][:, :min_length]
    return filtered_emg_data, filtered_eeg_data


def prepare_dataset(emg_data, eeg_data, all_classes, num_classes=None):
    """Build the feature array and one-hot labels for all labelled frames.

    The data is first passed through ``filter_data``. For every key of
    `all_classes` that survived filtering in both modalities, the EMG and EEG
    arrays are stacked along axis 0 to form one sample.

    Args:
        emg_data: dict mapping frame key -> EMG array.
        eeg_data: dict mapping frame key -> EEG array.
        all_classes: dict mapping frame key -> class label.
        num_classes: width of the one-hot encoding. ``None`` (default) lets
            ``to_categorical`` infer it from the labels, as before.

    Returns:
        Tuple ``(X, y)``: stacked samples and their one-hot encoded labels.

    Raises:
        ValueError: if no frame is present in the filtered EMG data, the
            filtered EEG data and `all_classes` at the same time.
    """
    filtered_emg_data, filtered_eeg_data = filter_data(emg_data, eeg_data)
    X = []
    y = []
    for frame, label in all_classes.items():
        if frame in filtered_emg_data and frame in filtered_eeg_data:
            X.append(np.concatenate((filtered_emg_data[frame], filtered_eeg_data[frame]), axis=0))
            y.append(label)

    # Fail early with an actionable message instead of an opaque error from
    # to_categorical on an empty label array.
    if not X:
        raise ValueError(
            "No frame is shared by the filtered EMG data, the filtered EEG data "
            "and the class labels; check that the frame keys match."
        )

    X = np.array(X)
    y = to_categorical(np.array(y), num_classes=num_classes)
    return X, y

In [None]:

class F1Score(tf.keras.metrics.Metric):
    """Streaming F1 metric built from Keras ``Precision`` and ``Recall``.

    Both inner metrics are created with their default arguments and updated
    with the same batches; the result is ``2 * p * r / (p + r + epsilon)``.
    """

    def __init__(self, name='f1_score', **kwargs):
        super().__init__(name=name, **kwargs)
        self.precision = tf.keras.metrics.Precision()
        self.recall = tf.keras.metrics.Recall()

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Forward one batch to the inner precision and recall metrics."""
        self.precision.update_state(y_true, y_pred, sample_weight)
        self.recall.update_state(y_true, y_pred, sample_weight)

    def result(self):
        """Return the F1 value from the accumulated precision and recall."""
        p = self.precision.result()
        r = self.recall.result()
        # epsilon keeps the division defined when precision and recall are both 0.
        return 2 * ((p * r) / (p + r + tf.keras.backend.epsilon()))

    def reset_state(self):
        """Clear the accumulated state of both inner metrics.

        ``reset_state`` is the method name current Keras calls between epochs;
        the inner metrics are reset through whichever of ``reset_state`` /
        ``reset_states`` they provide.
        """
        for metric in (self.precision, self.recall):
            reset = getattr(metric, 'reset_state', None) or metric.reset_states
            reset()

    def reset_states(self):
        """Backward-compatible alias of ``reset_state`` (older Keras name)."""
        self.reset_state()


class AdjustLearningRate(Callback):
    """Callback that switches the optimizer's learning rate at one given epoch.

    Args:
        new_lr: learning rate assigned when the target epoch begins.
        epoch_to_adjust: epoch index (as passed to ``on_epoch_begin``) at
            which the change is applied.
    """

    def __init__(self, new_lr, epoch_to_adjust):
        super().__init__()
        self.epoch_to_adjust = epoch_to_adjust
        self.new_lr = new_lr

    def on_epoch_begin(self, epoch, logs=None):
        """Assign `new_lr` to the optimizer when `epoch` is the target epoch."""
        if epoch != self.epoch_to_adjust:
            return
        previous_lr = self.model.optimizer.learning_rate.numpy()
        self.model.optimizer.learning_rate.assign(self.new_lr)
        print(f"\nEpoch {epoch+1}: Adjusting learning rate from {previous_lr} to {self.new_lr}.")

def cnn_model(lr=1e-5):
    """Build and compile the classifier used by this notebook.

    Despite the name, the network contains no convolutional layers: the
    ``(15, 30, 1)`` input is flattened and passed through six ReLU dense
    layers (1024 down to 32 units) followed by an 8-way softmax.

    Args:
        lr: learning rate given to the Nadam optimizer.

    Returns:
        A compiled Keras ``Model`` (categorical cross-entropy loss; accuracy
        and ``F1Score`` metrics). No callbacks are created here.
    """
    num_classes = 8
    hidden_units = (1024, 512, 256, 128, 64, 32)

    input_tensor = Input(shape=(15, 30, 1))
    features = layers.Flatten()(input_tensor)
    for units in hidden_units:
        features = layers.Dense(units, activation='relu')(features)
    output_tensor = layers.Dense(num_classes, activation='softmax')(features)

    model = Model(input_tensor, output_tensor)
    model.compile(optimizer=optimizers.Nadam(learning_rate=lr),
                  loss="categorical_crossentropy",
                  metrics=["accuracy", F1Score()])

    return model

In [None]:

def plot(loss, val_loss, acc, val_acc):
    """Show training/validation loss and accuracy curves side by side.

    Args:
        loss: per-epoch training loss values.
        val_loss: per-epoch validation loss values.
        acc: per-epoch training accuracy values.
        val_acc: per-epoch validation accuracy values.
    """
    epochs = range(1, len(loss) + 1)

    # (train series, validation series, train label, validation label, title, y-label)
    panels = (
        (loss, val_loss, 'Training loss', 'Validation loss',
         'Training and Validation Loss', 'Loss'),
        (acc, val_acc, 'Training Accuracy', 'Validation Accuracy',
         'Training and Validation Accuracy', 'Accuracy'),
    )

    plt.figure(figsize=(12, 5))
    for position, (train_curve, val_curve, train_label, val_label, title, y_label) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(epochs, train_curve, 'b-', label=train_label)
        plt.plot(epochs, val_curve, 'r-', label=val_label)
        plt.title(title)
        plt.xlabel('Epochs')
        plt.ylabel(y_label)
        plt.legend()

    plt.tight_layout()
    plt.show()

In [None]:
def plot_confusion_matrix(cm, classes, title='Confusion Matrix', cmap=plt.cm.Blues):
    """Draw a confusion matrix on the current matplotlib figure.

    Each cell is annotated with its integer count; the text is white on cells
    above half of the matrix maximum and black otherwise. The caller is
    responsible for creating the figure and calling ``plt.show()``.

    Args:
        cm: 2-D array of integer counts (rows are true labels).
        classes: tick labels used on both axes.
        title: figure title.
        cmap: colormap passed to ``imshow``.
    """
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()

    positions = np.arange(len(classes))
    plt.xticks(positions, classes, rotation=45)
    plt.yticks(positions, classes)

    half_max = cm.max() / 2.
    n_rows, n_cols = cm.shape
    for row in range(n_rows):
        for col in range(n_cols):
            count = cm[row, col]
            text_color = "white" if count > half_max else "black"
            plt.text(col, row, format(count, 'd'),
                     horizontalalignment="center",
                     color=text_color)

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()

In [None]:
def ensure_one_hot_encoding(y, num_classes=None):
    """Return `y` one-hot encoded, converting it only when it is 1-D.

    Args:
        y: array of labels. A 1-D array is treated as class indices and
            encoded with ``to_categorical``; any other shape is returned
            unchanged.
        num_classes: width of the encoding. ``None`` (default) derives it as
            ``max(y) + 1``; pass an explicit value so that separately encoded
            arrays (e.g. train and test) get the same width.

    Returns:
        The one-hot encoded array, or `y` itself when no conversion is needed.
    """
    if len(y.shape) == 1:
        if num_classes is None:
            # Cast to a Python int: float labels would otherwise yield a float
            # class count, which is not a valid array dimension.
            num_classes = int(np.max(y)) + 1
        y = to_categorical(y, num_classes=num_classes)
    return y

In [None]:

def visualize_results(y_true, y_pred_classes, patient_ids, base_folder="C:\\Users\\MindRove_BZs\\Pictures\\"):
    """Show stored frame images for up to three correct and three incorrect predictions.

    Args:
        y_true: 1-D array of true class indices.
        y_pred_classes: 1-D array of predicted class indices (same length).
        patient_ids: sequence of patient ids; the patient for position ``idx``
            is taken as ``patient_ids[idx % len(patient_ids)]``.
        base_folder: prefix prepended to the per-patient image folder name.

    Returns:
        None. Images are displayed with matplotlib; a message is printed when
        no image (or no folder mapping) exists for a selected sample.

    NOTE(review): the position inside `y_true` is used both as the frame
    number in the file name and to pick the patient via modulo. If the arrays
    come from a shuffled split, this presumably does not match the original
    frame/patient -- confirm with the caller.
    """
    # `glob` is not imported at the top of the notebook; import it here so the
    # function does not fail with NameError.
    import glob

    # Recording-folder prefix per patient id (folder is "<prefix>_patient_<id>").
    folder_prefixes = {
        5: "0507",
        6: "0517",
        7: "0522",
        8: "0523",
        9: "05231",
        10: "05232",
        11: "05233",
    }

    correct_indices = np.where(y_true == y_pred_classes)[0]
    incorrect_indices = np.where(y_true != y_pred_classes)[0]

    def display_image(frame_number, patient_id, correct=True):
        # Show the first image matching the frame number, or report why not.
        prefix = folder_prefixes.get(patient_id)
        if prefix is None:
            # Previously this case left `folder_name` unbound and crashed.
            print(f"No image folder known for patient {patient_id}")
            return

        folder_name = f"{base_folder}{prefix}_patient_{patient_id}"
        file_name = f"frame_{frame_number}_*.png"
        image_files = glob.glob(os.path.join(folder_name, file_name))
        if not image_files:
            print(f"No image found for frame {frame_number} and patient {patient_id}")
            return

        img = plt.imread(image_files[0])
        plt.figure()
        plt.imshow(img)
        predicted_class = y_pred_classes[frame_number]
        truth = y_true[frame_number]
        plt.title(f"{'Correct' if correct else 'Incorrect'} Prediction for Patient {patient_id}\nPredicted: {predicted_class}, Truth: {truth}")
        plt.show()

    for idx in correct_indices[:3]:
        display_image(idx, patient_ids[idx % len(patient_ids)], correct=True)

    for idx in incorrect_indices[:3]:
        display_image(idx, patient_ids[idx % len(patient_ids)], correct=False)

In [None]:

def main():
    """Run the full pipeline: load data, train the model, evaluate and visualize.

    Steps: load EMG/EEG data and labels for patients 5-11, build the dataset,
    split 80/20 (fixed seed), train for 3 epochs, print a classification
    report, plot the confusion matrix and the training curves, save the model
    and show a few example predictions.
    """
    patient_ids = range(5, 12)
    emg_data, eeg_data = load_data(patient_ids)
    print(f"Number of frames in EMG data: {len(emg_data)}")
    print(f"Number of frames in EEG data: {len(eeg_data)}")

    frame_classes = load_classes()
    print(f"Number of frames in all classes: {len(frame_classes)}")

    X, y = prepare_dataset(emg_data, eeg_data, frame_classes)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    print(f"X_train shape: {X_train.shape}, y_train shape: {y_train.shape}")

    # Add the trailing channel axis expected by the model input (15, 30, 1).
    X_train = X_train.reshape((-1, 15, 30, 1))
    X_test = X_test.reshape((-1, 15, 30, 1))

    y_train_labels = np.argmax(y_train, axis=1)

    present_classes = np.unique(y_train_labels)
    class_weights = compute_class_weight('balanced', classes=present_classes, y=y_train_labels)
    print("Class weights:", class_weights)
    # Key the weights by the actual class label, not by position, so the
    # mapping stays right when a class is absent from the training split.
    # NOTE(review): this dict is computed but not passed to `model.fit`
    # (`class_weight=`), so training is currently unweighted -- confirm intent.
    class_weights_dict = {int(label): weight for label, weight in zip(present_classes, class_weights)}

    y_train = ensure_one_hot_encoding(y_train)
    y_test = ensure_one_hot_encoding(y_test)

    print(f"y_train shape (after check): {y_train.shape}")
    print(f"y_test shape (after check): {y_test.shape}")

    model = cnn_model(lr=1e-4)

    history = model.fit(X_train, y_train, epochs=3, validation_data=(X_test, y_test))

    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_test, axis=1)

    # Separate name: `all_classes` was previously reused for both the label
    # dict and this list.
    class_labels = [0, 1, 2, 3, 4, 5, 6, 7]

    print(f"Length of all_classes: {len(class_labels)}")
    print(f"Content of all_classes: {class_labels}")
    class_names = [str(c) for c in class_labels]

    # Passing `labels` keeps the report and the matrix at 8 classes even when
    # a class does not occur in the test split; without it the report raises
    # on the target_names mismatch and the matrix no longer matches the ticks.
    print("Classification Report:")
    print(classification_report(y_true, y_pred_classes, labels=class_labels, target_names=class_names))

    cm = confusion_matrix(y_true, y_pred_classes, labels=class_labels)
    plt.figure(figsize=(10, 10))
    plot_confusion_matrix(cm, classes=class_names)
    plt.show()

    evaluation_results = model.evaluate(X_test, y_test, verbose=2)
    # evaluate() returns loss followed by the compiled metrics; only the first two are reported.
    test_loss, test_accuracy = evaluation_results[:2]
    print(f"Test Loss: {test_loss}, Test Accuracy: {test_accuracy}")

    model.save('C:\\Users\\MindRove_BZs\\Diploma\\models\\model_results.h5')
    plot(history.history['loss'], history.history['val_loss'], history.history['accuracy'], history.history['val_accuracy'])

    visualize_results(y_true, y_pred_classes, list(patient_ids))

# Run the whole pipeline when this cell is executed.
main()