In [7]:
!pip install mne --quiet

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#imports
import os
import numpy as np
import mne

In [None]:
stage_map = {
    "W":0,
    "1":1,
    "2":2,
    "3":3,
    "4":3,
    "R":4
}

In [None]:
def load_and_segment(path_psg,path_hyp,subject_id):
    raw = mne.io.read_raw_edf(path_psg, preload=True, stim_channel=None, verbose = False)
    raw.pick_types(eeg=True,eog=True,emg=True)

    sfreq = raw.info["sfreq"]
    sig_eeg = raw.get_data(picks="eeg")
    sig_eog = raw.get_data(picks="eog")
    sig_emg = raw.get_data(picks="emg")

    ann = mne.read_annotations(path_hyp)

    Xe,Xo,Xm,labs,subs = [],[],[],[],[]

    epoch_len = int(30*sfreq) #30 second windows
    for onset,desc in zip(ann.onset, ann.description):
        char = desc.split()[-1]
        if char in stage_map: #skipping M
            start = int(onset * sfreq)

            if start + epoch_len <= sig_eeg.shape[1]:
                seg_eeg = sig_eeg[:, start:start + epoch_len]
                seg_eog = sig_eog[:, start:start + epoch_len]
                seg_emg = sig_emg[:, start:start + epoch_len]

                Xe.append(seg_eeg.T)
                Xo.append(seg_eog.T)
                Xm.append(seg_emg.T)
                #turns to  3000,n_ch
                labs.append(stage_map[char])
                subs.append(subject_id)

    return np.array(Xe), np.array(Xo), np.array(Xm), np.array(labs), np.array(subs)


In [None]:
base_dir = "/content/drive/My Drive/sleep-edf"
data_dir = base_dir
all_Xe, all_Xo, all_Xm, all_y,all_subj = [], [], [], [], []

max_files = 5
file_c = 0

for subset in ["sleep-cassette","sleep-telemetry"]:
    subdir = os.path_join(data_dir, subset)
    for fname in os.listdir(subdir):
        if fname.endswith("PSG.edf"):
            psg_path = os.path.join(subdir, fname)
            prefix = fname.replace("-PSG.edf", "")
            hyp_files = [f for f in os.listdir(subdir) if f.startswith(prefix[:-1]) and f.endswith("Hypnogram.edf")]

            if not hyp_files:
                continue

            hyp_path = os.path.join(subdir, hyp_files[0])
            subj_str = prefix[3:5] if subset == "sleep-cassette" else prefix[3:5]
            subj_id = int(subj_str)

            Xe,Xo, Xm,labs,subs = load_and_segment(psg_path, hyp_path, subj_id)
            all_Xe.append(Xe);all_Xo.append(Xo);all_Xm.append(Xm);all_y.append(labs);all_subj.append(subs)
            
            file_c += 1
            if file_c >= max_files:
                break
    if file_c >= max_files:
        break

X_eeg, X_eog,X_emg, y, subjects = np.vstack(all_Xe),np.vstack(all_Xo),np.vstack(all_Xm),np.concatenate(all_y),np.concatenate(all_subj)

print(f"Total epochs: {len(y)} (EEG shape: {X_eeg.shape}, EOG shape: {X_eog.shape}, EMG shape: {X_emg.shape})")
print(f"class distrib: {np.bincount(y)}")
print(f"# of subjects: {len(set(subjects))}")

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
unique_subj, train_subj,test_subj = np.unique(subjects), train_test_split(np.unique(subjects), test_size=0.2, random_state=42)

train_mask = np.isin(subjects, train_subj)
test_mask = np.isin(subjects, test_subj)

Xe_train, Xe_test, Xo_train, Xo_test, Xm_train,Xm_test,y_train, y_test = X_eeg[train_mask], X_eeg[test_mask], X_eog[train_mask], X_eog[test_mask], X_emg[train_mask], X_emg[test_mask], y[train_mask], y[test_mask]

print(f"Train subj: {len(train_subj)},test subj: {len(test_subj)}")
print(f"Train epochs: {len(y_train)}, Test epochs: {len(y_test)}")
print(f"Train class #: {np.bincount(y_train)}")

In [None]:
!pip install tensorflow --quiet

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv1D,BatchNormalization,Activation,MaxPooling1D,GlobalAveragePooling1D,Dense,concatenate

In [None]:
n_eeg, n_eog,n_emg = Xe_train.shape[2], Xo_train.shape[2], Xm_train.shape[2]
input_eeg,input_eog,input_emg = Input(shape=(3000, n_eeg)), Input(shape=(3000, n_eog)), Input(shape=(3000, n_emg))

def build_branch(input_tensor,filters =16):
    x = Conv1D(filters, 64, padding='same')(input_tensor)
    x = BatchNormalization()(x); x = Activation('relu')(x)
    x = MaxPooling1D(4)(x)
    x = Conv1D(filters*2, 32, padding='same')(x)
    x = BatchNormalization()(x); x = Activation('relu')(x)
    x = MaxPooling1D(4)(x)
    x =GlobalAveragePooling1D()(x)
    return x

branch_eeg,branch_eog,branch_emg = build_branch(input_eeg, filters=16), build_branch(input_eog, filters=8), build_branch(input_emg, filters=8)
merged = concatenate([branch_eeg, branch_eog, branch_emg])
output = Dense(64, activation='softmax')(merged)
output = Dense(5, activation='softmax')(output)

model = Model(inputs=[input_eeg, input_eog, input_emg], outputs=output)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model.summary()

In [None]:
perm = np.random.RandomState(0).permutation(len(y_train))
Xe_train, Xo_train, Xm_train, y_train = Xe_train[perm], Xo_train[perm], Xm_train[perm], y_train[perm]
history = model.fit(
    [Xe_train,Xo_train,Xm_train], y_train,
    epochs=20,batch_size=64,validation_split=0.1, verbose=2
)

In [None]:
from sklearn.metrics import confusion_matrix, accuracy_score

In [None]:
y_pred = model.predict([Xe_test,Xo_test,Xm_test]).argmax(axis=1)
acc = accuracy_score(y_test, y_pred)
cm = confusion_matrix(y_test, y_pred)
print(f"Test accuracy: {acc:.4f}")
print("Confusion Matrix: (rows = true, cols=predicted:\n)",cm)

In [None]:
Xo_test_zero = np.zeros_like(Xo_test)
y_pred_no_eog = model.predict([Xe_test,Xo_test_zero,Xm_test]).argmax(axis=1)
acc_no_eog = accuracy_score(y_test, y_pred_no_eog)
print(f"Test accuracy without EOG: {acc_no_eog:.4f}")
cm_no_eog = confusion_matrix(y_test, y_pred_no_eog)
print("Confusion Matrix without EOG:\n)",cm_no_eog)