In [104]:
import os
import pyarrow.parquet as pq
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

import tensorflow as tf

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Concatenate, Flatten, Dropout,BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import LearningRateScheduler

from scipy import signal
from skimage.transform import resize
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

In [105]:
from tensorflow.keras import backend as K
# Reset Keras' global state left over from models built earlier in this kernel session.
K.clear_session()

In [106]:
def read_data(data_folder, num_files=None):
    """
    Load EEG recordings stored as .npy files together with their label tables.

    Parameters:
    - data_folder (str): Folder containing the 'npy_train' and 'npy_test' subfolders.
    - num_files (int or None): Maximum number of training recordings (and rows of
      'train.csv') to load. If None, everything is loaded. The test folder is
      always read in full.

    Returns:
    - train_eeg (List[np.ndarray]): One array per training recording, NaNs replaced by 0.
    - test_eeg (List[np.ndarray]): One array per test recording, NaNs replaced by 0.
    - train_labels (pd.DataFrame): Contents of 'train.csv' (first num_files rows).
    - test_labels (pd.DataFrame): Contents of 'test.csv'.

    Notes:
    - 'train.csv' and 'test.csv' are read from the current working directory,
      not from data_folder.
    - Recordings are loaded in sorted filename order so that repeated runs pick
      the same files. NOTE(review): nothing here checks that this order matches
      the row order of 'train.csv' -- confirm before pairing arrays with labels.
    """
    train_eeg_folder = os.path.join(data_folder, 'npy_train')
    test_eeg_folder = os.path.join(data_folder, 'npy_test')

    def read_npy_folder(folder_path, n_files=None):
        # Filter to .npy files *before* truncating, otherwise unrelated files
        # in the folder would eat into the requested count. os.listdir returns
        # entries in arbitrary order, hence the explicit sort.
        npy_names = sorted(
            name for name in os.listdir(folder_path) if name.endswith('.npy')
        )
        if n_files is not None:
            npy_names = npy_names[:n_files]
        arrays = [np.load(os.path.join(folder_path, name)) for name in npy_names]
        print(f"Read {len(arrays)} files from {folder_path}.")
        return arrays

    # Read EEG data
    train_eeg = read_npy_folder(train_eeg_folder, num_files)
    test_eeg = read_npy_folder(test_eeg_folder)

    # Replace NaN with 0 (np.nan_to_num also maps +/-inf to large finite values).
    # Applied to both splits so the test arrays are as model-safe as the train arrays.
    train_eeg = [np.nan_to_num(array) for array in train_eeg]
    test_eeg = [np.nan_to_num(array) for array in test_eeg]

    # Load the label tables
    train_labels = pd.read_csv('train.csv', nrows=num_files)
    test_labels = pd.read_csv('test.csv')

    return train_eeg, test_eeg, train_labels, test_labels

In [107]:
def single_vis(f, visualization_type):
    """
    Plot the columns of a DataFrame either as stacked traces or as an image.

    Parameters:
    - f (pd.DataFrame): Table whose columns are the channels to draw.
    - visualization_type (str): 'eeg' draws every column as a line, each shifted
      vertically so the traces do not overlap; 'spectrogram' draws every column
      except 'time' as one row of a heat-map.

    Raises:
    - ValueError: If `visualization_type` is neither 'eeg' nor 'spectrogram'.

    Returns:
    None
    """
    if visualization_type not in ('eeg', 'spectrogram'):
        raise ValueError("Invalid visualization type. Use 'eeg' or 'spectrogram'.")

    if visualization_type == 'eeg':
        trace_names = list(f.columns)
        plt.figure(figsize=(40, 10))

        # Vertical gap between consecutive traces: 120% of the largest value
        # found in any column.
        spacing = f[trace_names].max().max() * 1.2

        for position, name in enumerate(trace_names):
            plt.plot(f.index, f[name] + position * spacing, label=name)

        plt.title('EEG Channels Visualization')
        plt.xlabel('Sample')
        plt.ylabel('Amplitude')
        plt.legend()
        plt.show()
        return

    # visualization_type == 'spectrogram'
    bin_names = [name for name in f.columns if name != 'time']
    plt.figure(figsize=(40, 10))

    # Rows = samples, columns = spectrogram channels; transposed for display so
    # that each channel becomes one horizontal band of the image.
    image = np.zeros((len(f), len(bin_names)))
    for position, name in enumerate(bin_names):
        image[:, position] = f[name].to_numpy()

    plt.imshow(image.T, aspect='auto', cmap='viridis', interpolation='nearest')
    plt.title('Spectrogram Visualization')
    plt.xlabel('Sample')
    plt.ylabel('Channel')
    plt.show()


In [108]:
# Shared settings for the preprocessing helpers, the model and data loading.
desired_length = 100      # rows kept per recording by the preprocess_* helpers
num_features = 32         # upper bound on EEG columns kept by preprocess_eeg
num_frequency_bins = 32   # upper bound on columns kept by preprocess_spectrogram
num_classes = 6           # width of the one-hot targets
num_files = 100           # number of training recordings / label rows to load

In [109]:

def preprocess_eeg(X_train_eeg, target_shape=(desired_length, num_features)):
    """Crop one EEG recording to at most ``target_shape`` and cast it to float32.

    Args:
        X_train_eeg: 2-D array-like (rows x columns) holding one recording.
        target_shape: ``(max_rows, max_columns)`` to keep. Defaults to
            ``(desired_length, num_features)``.

    Returns:
        np.ndarray of dtype float32. Inputs smaller than ``target_shape`` are
        returned at their own size; nothing is padded.
    """
    # Honour the caller-supplied target_shape instead of the module-level
    # constants, so a non-default shape actually takes effect.
    max_rows, max_cols = target_shape[0], target_shape[1]
    eeg_array = np.asarray(X_train_eeg)[:max_rows, :max_cols].astype(np.float32)
    return eeg_array



def preprocess_spectrogram(spectrogram_df, target_shape=(desired_length, num_frequency_bins)):
    """Crop a spectrogram table to at most ``target_shape`` and return float32.

    ``target_shape`` is ``(max_rows, max_columns)``; inputs smaller than that
    are returned at their own size (no padding).
    """
    row_limit = target_shape[0]
    col_limit = target_shape[1]
    cropped = np.array(spectrogram_df)[:row_limit, :col_limit]
    return cropped.astype(np.float32)

def create_model(input_shape_eeg, input_shape_spectrogram, num_classes=6):
    """Build a stacked-LSTM classifier for EEG sequences.

    Args:
        input_shape_eeg: shape of the EEG array *including* the leading example
            axis, e.g. ``(n_examples, timesteps, channels)``. Only
            ``input_shape_eeg[1:]`` is used as the per-sample input shape.
        input_shape_spectrogram: accepted for interface compatibility but
            unused -- the model has no spectrogram branch.
        num_classes: size of the final softmax layer.

    Returns:
        An uncompiled ``tf.keras.Sequential`` model.
    """
    model = tf.keras.Sequential([
        tf.keras.layers.LSTM(units=64, input_shape=input_shape_eeg[1:], return_sequences=True),
        BatchNormalization(),
        Dropout(0.4),
        tf.keras.layers.LSTM(units=64),
        BatchNormalization(),
        Dropout(0.4),
        Dense(32, activation='relu'),
        # Output width follows num_classes rather than a hard-coded 6.
        Dense(num_classes, activation='softmax')
        ])
    return model

def lr_schedule(epoch, lr):
    """Return ``lr`` scaled by 0.9 on epochs 10, 20, 30, ...; otherwise unchanged."""
    is_decay_epoch = epoch > 0 and epoch % 10 == 0
    return lr * 0.9 if is_decay_epoch else lr

In [110]:
# Load the EEG arrays and label tables, limited to num_files training recordings.
train, test, train_labels, test_labels = read_data('data', num_files=num_files)

Read 100 files from data/npy_train.
Read 1 files from data/npy_test.


In [111]:

# Hold out 20% of the recordings (and their label rows) for evaluation.
X_train, X_test, y_train, y_test = train_test_split(
    train,
    train_labels,
    test_size=0.2,
    random_state=42,
    shuffle=True,
)

In [112]:
# Join all training recordings end-to-end along the first axis.
# This rebinds X_train from the list returned by train_test_split to a single array,
# so the cell is not safe to run twice without re-running the split cell first.
X_train= np.concatenate(X_train, axis=0)

In [113]:
# One training example per label row. Taking the count from y_train (instead of
# int(num_files * 0.8)) keeps features and labels the same length even when
# train_test_split rounds differently or fewer files were found on disk.
num_examples = len(y_train)

# Equal-sized windows; samples that do not fill a whole window are dropped.
samples_per_example = X_train.shape[0] // num_examples

# NOTE(review): X_train is one stream made by concatenating every training
# recording. Unless each recording has exactly samples_per_example rows, a window
# can straddle two recordings and no longer lines up with its label row --
# confirm the files are equal length, or crop/pad per recording instead.
usable_samples = num_examples * samples_per_example
X_train_split = X_train[:usable_samples].reshape(
    num_examples, samples_per_example, *X_train.shape[1:]
)

In [114]:
# Expected layout: (num_examples, samples_per_example, channels).
X_train_split.shape

(80, 13625, 20)

In [115]:
# Convert the label table to a plain array so its columns can be addressed by position.
y_train = np.array(y_train)

In [116]:
# Column 8 of the label table is used as the class label.
# NOTE(review): positional lookup -- presumably the consensus-label column of
# train.csv; confirm against the CSV header.
labels = y_train[:, 8]

# Map label values to integer ids. LabelEncoder numbers the classes in sorted
# label order, so id k corresponds to label_encoder.classes_[k].
label_encoder = LabelEncoder()
encoded_labels = label_encoder.fit_transform(labels)

# Convert the integer ids to one-hot rows of width num_classes
one_hot_labels = to_categorical(encoded_labels, num_classes=num_classes)

# y_train is rebound here from the raw label table to the float32 one-hot
# targets, so this cell cannot be re-run without re-running the cells that
# create y_train. (The former "convert other columns" loop was removed: it
# iterated over this one-hot array, changed nothing, and its column-8 branch
# could never be reached.)
y_train = one_hot_labels.astype('float32')


In [117]:
# float32 copy of the windowed training data, used as the model input
X_train_eeg = X_train_split.astype(np.float32)


input_shape_eeg = X_train_eeg.shape  # full array shape including the leading example axis; create_model drops it via [1:]

num_classes = 6


In [118]:
# Redundant re-assignment: num_classes is already set to 6 in earlier cells.
num_classes=6


In [119]:
# Print the index of every training example that still contains a NaN.
for example_idx, example in enumerate(X_train_eeg):
    if np.isnan(example).any():
        print(example_idx)

In [120]:
# print(X_train_eeg.dtype)
# print(X_train_spectrogram.dtype)
# print(y_train.dtype)
# print(input_shape_eeg[1:])
# print(input_shape_spectrogram[1:])
# print(np.any(np.isnan(X_train_eeg)))
# print(np.any(np.isnan(X_train_spectrogram)))
# print(np.any(np.isinf(X_train_eeg)))
# print(np.any(np.isinf(X_train_spectrogram)))

In [121]:
# Sanity check before training: features and targets should have the same first dimension.
print("Shape of X_train_eeg:", X_train_eeg.shape)
print("Shape of y_train:", y_train.shape)
print(input_shape_eeg)


Shape of X_train_eeg: (80, 13625, 20)
Shape of y_train: (80, 6)
(80, 13625, 20)


In [None]:
# Build the LSTM classifier. create_model's second positional parameter is the
# spectrogram shape, which this EEG-only pipeline does not have, so pass None
# there and give the class count by keyword.
model = create_model(input_shape_eeg, None, num_classes=num_classes)
optimizer = Adam(learning_rate=0.001)
lr_scheduler = LearningRateScheduler(lr_schedule)

model.compile(optimizer=optimizer, loss=tf.keras.losses.KLDivergence(), metrics=['accuracy'])

# Pass the scheduler to fit() -- without it the callback object is never used.
# Note: lr_schedule only decays on epochs 10, 20, ..., so with epochs=10
# (epoch indices 0-9) the learning rate stays constant; raise `epochs` to see it act.
model.fit(X_train_eeg, y_train, epochs=10, batch_size=2, callbacks=[lr_scheduler])


In [None]:
# Print the layer-by-layer summary (output shapes and parameter counts) of the model.
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm (LSTM)                 (None, 13625, 64)         21760     
                                                                 
 batch_normalization (Batch  (None, 13625, 64)         256       
 Normalization)                                                  
                                                                 
 dropout (Dropout)           (None, 13625, 64)         0         
                                                                 
 lstm_1 (LSTM)               (None, 64)                33024     
                                                                 
 batch_normalization_1 (Bat  (None, 64)                256       
 chNormalization)                                                
                                                                 
 dropout_1 (Dropout)         (None, 64)                0

In [None]:
# # X_test = np.concatenate(X_test, axis=0)
# num_examples_test = int(num_files * 0.2)  
# samples_per_example_test = X_test.shape[0] // num_examples_test
# X_test_split = []
# for i in range(num_examples_test):
#     start_index = i * samples_per_example_test
#     end_index = (i + 1) * samples_per_example_test
#     example = X_test[start_index:end_index]
#     X_test_split.append(example)
# X_test_split = np.array(X_test_split)
# X_test_eeg = X_test_split.astype(np.float32)


In [None]:
# Column 8 of the held-out label table is read as the class label (same position as for y_train).
labels_test = y_test.iloc[:, 8]
# transform (not fit_transform): reuse the encoder fitted on the training labels so class ids match.
encoded_labels_test = label_encoder.transform(labels_test)
one_hot_labels_test = to_categorical(encoded_labels_test, num_classes=num_classes)
# y_test is rebound from the label DataFrame to the float32 one-hot array;
# .iloc above will fail if this cell is run a second time.
y_test = one_hot_labels_test.astype('float32')

In [None]:
# X_test is still the Python list of raw recordings returned by train_test_split,
# which Keras cannot consume as one batched input. Crop / zero-pad every
# recording to the (timesteps, channels) window the model was trained on and
# stack them into a single float32 array first.
# NOTE(review): cropping from the start of each recording is a modelling choice --
# confirm it matches how the training windows are meant to be taken.
eval_timesteps, eval_channels = X_train_eeg.shape[1:3]
X_test_eval = np.zeros((len(X_test), eval_timesteps, eval_channels), dtype=np.float32)
for row, recording in enumerate(X_test):
    window = np.nan_to_num(np.asarray(recording, dtype=np.float32))[:eval_timesteps, :eval_channels]
    X_test_eval[row, :window.shape[0], :window.shape[1]] = window

loss, accuracy = model.evaluate(X_test_eval, y_test)


In [None]:
# Crop / zero-pad each held-out recording to the window size the model was
# trained on (X_train_eeg.shape[1:]); the 100-row crop produced by
# preprocess_eeg does not match the model's input length.
sequence_length, n_channels = X_train_eeg.shape[1:3]
X_test_eeg = np.zeros((len(X_test), sequence_length, n_channels), dtype=np.float32)
for row, recording in enumerate(X_test):
    window = np.nan_to_num(np.asarray(recording, dtype=np.float32))[:sequence_length, :n_channels]
    X_test_eeg[row, :window.shape[0], :window.shape[1]] = window
y_pred = model.predict(X_test_eeg)

# The recordings are plain NumPy arrays and carry no ids (the old `.index`
# lookup raised AttributeError). Recover the held-out label rows by repeating
# the split with the same settings, which selects the same rows.
_, held_out_labels = train_test_split(train_labels, test_size=0.2, random_state=42, shuffle=True)
# NOTE(review): assumes the label table has an 'eeg_id' column; falls back to
# the row index if it does not.
if 'eeg_id' in held_out_labels.columns:
    eeg_ids_test = held_out_labels['eeg_id'].tolist()
else:
    eeg_ids_test = held_out_labels.index.tolist()

# LabelEncoder numbers classes in sorted label order, so y_pred[:, k] belongs to
# label_encoder.classes_[k] -- not necessarily to the k-th vote column below.
# Match prediction columns to vote columns by class name ("<label>_vote").
vote_columns = ['seizure_vote', 'lpd_vote', 'gpd_vote', 'lrda_vote', 'grda_vote', 'other_vote']
column_of_vote = {
    f"{str(class_name).lower()}_vote": class_idx
    for class_idx, class_name in enumerate(label_encoder.classes_)
}
if not set(column_of_vote) <= set(vote_columns):
    # Label values do not follow the "<label>_vote" naming; keep the positional mapping.
    column_of_vote = {name: position for position, name in enumerate(vote_columns)}

output_df = pd.DataFrame({'eeg_id': eeg_ids_test})
for vote_name in vote_columns:
    if vote_name in column_of_vote:
        output_df[vote_name] = y_pred[:, column_of_vote[vote_name]]
    else:
        # Class never seen by the encoder: the model has no trained output for it.
        output_df[vote_name] = 0.0


In [None]:
# Display the per-recording vote probabilities.
output_df

In [None]:
# Name of the vote column with the highest probability in each row. Columns are
# selected by name (not position) so the cell can be re-run after
# 'predicted_class' has been added without feeding that text column to idxmax.
vote_column_names = [name for name in output_df.columns if str(name).endswith('_vote')]
output_df['predicted_class'] = output_df[vote_column_names].idxmax(axis=1)

In [None]:
# Show only the id and the winning class for each held-out recording.
print(output_df[['eeg_id', 'predicted_class']])

In [None]:
# Sanity checks on the array that was passed to model.predict.
print("Shape of X_test_eeg:", X_test_eeg.shape)
print("NaN values in X_test_eeg:", np.isnan(X_test_eeg).any())


In [None]:
# One-hot encode the held-out labels with the encoder that was fitted on the
# training labels. Fitting a fresh LabelEncoder on the test labels (as this cell
# used to) can assign different ids to the same class and silently misalign
# targets with the model's outputs.
LABEL_COLUMN = 8  # position of the class label in the raw label table
y_test = np.array(y_test)

if y_test.ndim == 2 and y_test.shape[1] > LABEL_COLUMN:
    # y_test is still the raw label table.
    labelst = y_test[:, LABEL_COLUMN]
    encoded_labels = label_encoder.transform(labelst)

    # Convert labels to categorical one-hot encoding
    one_hot_labels = to_categorical(encoded_labels, num_classes=num_classes)

    # Convert labels to float32
    y_test = one_hot_labels.astype('float32')
else:
    # y_test was already encoded by an earlier cell; just normalise the dtype
    # instead of failing on the missing label column.
    y_test = y_test.astype('float32')

# (The former "convert other columns" loop was removed: it ran over the one-hot
# array, changed nothing, and its column-8 branch could never be reached.)
