In [22]:
import numpy as np
import pyedflib
import os
import zipfile
from tqdm import tqdm
import pandas as pd

# Location of the zipped EDF recordings and the folder they are unpacked into.
zip_path = 'EEG_DATASETS.zip'
extract_path = './edf_data'

# Unpack only once: an existing folder is treated as an already-extracted archive.
if not os.path.exists(extract_path):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)
        print(f"Extracted files to {extract_path}")

# Build the file list a single time (it used to be built and announced twice).
# os.listdir() returns entries in arbitrary order, so the list is sorted to keep
# the row order of the resulting dataset identical between runs.
edf_files = sorted(
    os.path.join(extract_path, f)
    for f in os.listdir(extract_path)
    if f.endswith('.edf')
)
print(f"Processing {len(edf_files)} EEG files...")

# Per-file accumulators: one feature array, one label array and one metadata
# dict are appended for every recording that is processed successfully.
all_features = []
all_labels = []
file_info = []

from scipy.signal import butter, filtfilt
import numpy as np

# Define bandpass filter function
def bandpass_filter(data, low=0.5, high=40, fs=256, order=5):
    """Apply a zero-phase Butterworth band-pass filter along axis 1.

    The filter is designed as second-order sections and applied with
    ``sosfiltfilt``. A band-pass of this order has poles very close to the
    unit circle when ``low`` is small relative to ``fs``; the transfer-function
    (``b, a``) form loses precision there, while cascaded biquads stay stable.

    Args:
        data: 2-D array; filtering runs along axis 1.
        low: Lower cut-off frequency, in the same units as ``fs``.
        high: Upper cut-off frequency, in the same units as ``fs``.
        fs: Sampling frequency of ``data``.
        order: Butterworth order handed to ``scipy.signal.butter``.

    Returns:
        The filtered signal, same shape as ``data``.

    Raises:
        ValueError: If the cut-offs do not satisfy ``0 < low < high < fs / 2``.
    """
    # Imported here so the function does not depend on the notebook's import cell.
    from scipy.signal import sosfiltfilt

    nyq = 0.5 * fs
    if not 0 < low < high < nyq:
        raise ValueError(
            f"Band edges must satisfy 0 < low < high < fs/2; "
            f"got low={low}, high={high}, fs={fs}"
        )

    # butter() expects cut-offs normalised to the Nyquist frequency.
    sos = butter(order, [low / nyq, high / nyq], btype='band', output='sos')
    return sosfiltfilt(sos, data, axis=1)

# Define sliding window feature extraction function
def sliding_window_features(data, window_size=256, step_size=128):
    """Compute per-channel mean and standard deviation over sliding windows.

    Args:
        data: 2-D array of shape (num_channels, num_samples).
        window_size: Window length in samples.
        step_size: Distance between consecutive window starts, in samples.

    Returns:
        Array of shape (num_windows, 2 * num_channels). Each row holds the
        channel means followed by the channel standard deviations of one
        window. A signal shorter than ``window_size`` yields an array of shape
        (0, 2 * num_channels), so the result can always be stacked with the
        results of other recordings that have the same channel count.
    """
    num_channels, num_samples = data.shape
    features = [
        np.concatenate([np.mean(window, axis=1), np.std(window, axis=1)])
        for window in (
            data[:, start:start + window_size]
            for start in range(0, num_samples - window_size + 1, step_size)
        )
    ]
    # The explicit reshape keeps the result 2-D even when no window fits.
    return np.asarray(features).reshape(len(features), 2 * num_channels)

# Process each file
for i, filename in enumerate(tqdm(edf_files)):
    try:
        # Load EEG data
        with pyedflib.EdfReader(filename) as f:
            n_channels = f.signals_in_file
            signal_labels = f.getSignalLabels()
            # The rate and sample count of channel 0 are used for the whole file.
            # NOTE(review): assumes all channels share them — confirm for this dataset.
            sampling_rate = int(f.getSampleFrequency(0))

            # Read all channels
            data = np.zeros((n_channels, f.getNSamples()[0]))
            for ch in range(n_channels):
                data[ch, :] = f.readSignal(ch)

        # Preprocess and extract features
        # NOTE(review): the window length is the extractor's default (in samples)
        # and does not follow sampling_rate — confirm this is intended.
        filtered_data = bandpass_filter(data, fs=sampling_rate)
        features = sliding_window_features(filtered_data)

        # A recording that yields no window contributes nothing and would break
        # the stacking of the per-file arrays later on, so it is skipped.
        if features.shape[0] == 0:
            print(f"Skipping {filename}: recording is shorter than one window")
            continue

        # Create labels (0 = normal, 1 = seizure)
        # Files with 'sz' in name contain seizures. Only the base name is
        # checked, so a directory component containing "sz" cannot flag every file.
        # NOTE(review): the recorded run of this cell reported 0 seizure samples,
        # so this naming rule may not match the dataset — verify.
        is_seizure = 1 if 'sz' in os.path.basename(filename).lower() else 0
        labels = np.full(features.shape[0], is_seizure)

        all_features.append(features)
        all_labels.append(labels)

        file_info.append({
            'filename': filename,
            'n_channels': n_channels,
            'n_windows': features.shape[0],
            'has_seizure': is_seizure,
            'duration_hours': data.shape[1] / (sampling_rate * 3600)
        })

    except Exception as e:
        # Best effort: a file that cannot be processed is reported and skipped.
        print(f"Error processing {filename}: {e}")

# Stack the per-file arrays into one feature matrix and one label vector.
X = np.vstack(all_features)
y = np.concatenate(all_labels)

# Labels are 0/1, so their sum is the number of seizure samples.
n_samples, n_features = X.shape
n_seizure = np.sum(y)

print("Dataset created:")
print(f"Total samples: {n_samples}")
print(f"Features per sample: {n_features}")
print(f"Seizure samples: {n_seizure}")
print(f"Normal samples: {len(y) - n_seizure}")

# Persist the arrays and the per-file metadata for the later cells.
np.save('eeg_features_complete.npy', X)
np.save('eeg_labels_complete.npy', y)
file_info_frame = pd.DataFrame(file_info)
file_info_frame.to_csv('file_info.csv', index=False)

print("Complete dataset saved!")

Processing 39 EEG files...
Processing 39 EEG files...


100%|██████████| 39/39 [02:22<00:00,  3.65s/it]


Dataset created:
Total samples: 273605
Features per sample: 46
Seizure samples: 0
Normal samples: 273605
Complete dataset saved!


In [24]:
from scipy.signal import welch

def sliding_window_features(data, window_size=256, step_size=128, fs=256):
    """Extract time- and frequency-domain features from sliding windows.

    For every window and channel the features are, in this order: mean,
    standard deviation, and the Welch band power in the delta (0.5-4 Hz),
    theta (4-8 Hz), alpha (8-13 Hz), beta (13-30 Hz) and gamma (30-50 Hz)
    bands. Band edges are inclusive below and exclusive above.

    Band power is the PSD integrated over frequency, so its scale does not
    depend on the frequency resolution. With the defaults (1 Hz resolution) the
    values equal those of a unit-spacing integration.

    NOTE(review): this definition replaces the earlier function of the same
    name once the cell runs; features extracted before that used the earlier one.

    Args:
        data: 2-D array of shape (num_channels, num_samples).
        window_size: Window length in samples.
        step_size: Distance between consecutive window starts, in samples.
        fs: Sampling frequency passed to ``scipy.signal.welch``.

    Returns:
        Array of shape (num_windows, 7 * num_channels); the seven features of a
        channel are contiguous and channels follow each other in order. A
        signal shorter than ``window_size`` yields shape (0, 7 * num_channels).
    """
    # scipy's trapezoid avoids np.trapz, which is deprecated in NumPy 2.
    from scipy.integrate import trapezoid

    bands = ((0.5, 4), (4, 8), (8, 13), (13, 30), (30, 50))
    num_channels, num_samples = data.shape
    features = []
    for start in range(0, num_samples - window_size + 1, step_size):
        window = data[:, start:start + window_size]

        # One Welch call covers all channels; psd has shape (num_channels, n_freqs).
        freqs, psd = welch(window, fs, axis=-1)
        band_power = [
            trapezoid(psd[:, in_band], freqs[in_band], axis=-1)
            for in_band in ((freqs >= lo) & (freqs < hi) for lo, hi in bands)
        ]

        # Rows are channels, columns the seven features; ravel() lays the
        # features out channel by channel.
        per_channel = np.column_stack(
            [np.mean(window, axis=1), np.std(window, axis=1)] + band_power
        )
        features.append(per_channel.ravel())

    # The explicit reshape keeps the result 2-D even when no window fits.
    return np.asarray(features).reshape(len(features), 7 * num_channels)


In [26]:
from sklearn.preprocessing import StandardScaler

# Reload the dataset written earlier in the notebook.
X = np.load('eeg_features_complete.npy')
y = np.load('eeg_labels_complete.npy')

# Standardise the features, then store the scaled copy.
# NOTE(review): the scaler is fitted on the full dataset, before any
# train/test split.
scaler = StandardScaler()
scaler.fit(X)
X_scaled = scaler.transform(X)
np.save('eeg_features_scaled.npy', X_scaled)


In [28]:
# Reshape to (samples, 1, features, 1); -1 lets NumPy infer the sample count.
X_scaled = np.reshape(X_scaled, (-1, 1, X.shape[1], 1))


In [38]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from keras.saving import register_keras_serializable

# Number of windows every recording is cut or zero-padded to.
MAX_TIMESTEPS = 500
def pad_or_truncate(data, max_len):
    """Force the first axis of ``data`` to exactly ``max_len`` entries.

    Longer inputs are cut after ``max_len`` entries; shorter inputs are
    zero-padded at the end of the first axis. All other axes are untouched, so
    arrays of any rank >= 1 are accepted.

    Args:
        data: Array-like with at least one dimension.
        max_len: Target length of the first axis.

    Returns:
        Array whose first axis has length ``max_len``.
    """
    data = np.asarray(data)
    length = data.shape[0]
    if length > max_len:
        return data[:max_len]
    # Pad only the end of axis 0; every remaining axis gets (0, 0).
    pad_width = [(0, max_len - length)] + [(0, 0)] * (data.ndim - 1)
    return np.pad(data, pad_width=pad_width, mode='constant')

# Bring every recording to exactly MAX_TIMESTEPS windows so they stack into one array.
# NOTE(review): relies on all_features / all_labels still being the per-file
# collections built earlier in this kernel session, and rebinds both names.
uniform_features = [
    pad_or_truncate(feature, MAX_TIMESTEPS) for feature in all_features
]
all_features = np.array(uniform_features)

# One label per recording: the first entry of a label sequence, or the value itself.
clean_labels = [
    int(label[0]) if isinstance(label, (list, np.ndarray)) else int(label)
    for label in all_labels
]
all_labels = np.array(clean_labels)

# Standardise with a single mean and standard deviation taken over the whole array.
feature_mean = all_features.mean()
feature_std = all_features.std()
all_features = (all_features - feature_mean) / feature_std

#  DEFINE MODEL
from keras.saving import register_keras_serializable

@register_keras_serializable()
class SeizurePredictionModel(tf.keras.Model):
    """Conv1D feature extractor followed by a bidirectional LSTM classifier.

    The network ends in a single sigmoid unit. ``input_shape`` is forwarded to
    the first convolution and echoed back by ``get_config`` so that the model
    can be rebuilt with ``from_config``.
    """

    def __init__(self, input_shape=None, **kwargs):
        """Create the layers.

        Args:
            input_shape: Shape handed to the first ``Conv1D`` layer.
            **kwargs: Forwarded unchanged to ``tf.keras.Model``.
        """
        super().__init__(**kwargs)
        # Stored under a trailing-underscore name and returned by get_config().
        self.input_shape_ = input_shape

        # Two convolution + pooling stages, then dropout.
        conv_stack = [
            layers.Conv1D(64, kernel_size=3, activation='relu', input_shape=input_shape),
            layers.MaxPooling1D(pool_size=2),
            layers.Conv1D(128, kernel_size=3, activation='relu'),
            layers.MaxPooling1D(pool_size=2),
            layers.Dropout(0.3),
        ]
        self.cnn = models.Sequential(conv_stack)
        self.lstm = layers.Bidirectional(layers.LSTM(64))
        self.dense = layers.Dense(64, activation='relu')
        self.out = layers.Dense(1, activation='sigmoid')

    def call(self, x):
        """Run the forward pass: convolutions, LSTM, dense layer, sigmoid output."""
        conv_out = self.cnn(x)
        sequence_summary = self.lstm(conv_out)
        hidden = self.dense(sequence_summary)
        return self.out(hidden)

    def get_config(self):
        """Return the base configuration extended with ``input_shape``."""
        config = super().get_config()
        config["input_shape"] = self.input_shape_
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the model by passing ``config`` as keyword arguments."""
        return cls(**config)


In [39]:
import warnings

import numpy as np
from sklearn.model_selection import train_test_split

# With a single class in the labels any constant prediction scores 100 %, so
# the accuracy reported below would say nothing about the model. Make that
# situation visible instead of silently training on it.
if np.unique(all_labels).size < 2:
    warnings.warn(
        "all_labels contains a single class; the reported accuracy is not meaningful."
    )

# Hold out 20 % of the recordings; the fixed seed keeps the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(all_features, all_labels, test_size=0.2, random_state=42)

# Per-sample shape taken from axes 1 and 2 of the training array.
input_shape = (X_train.shape[1], X_train.shape[2])

model = SeizurePredictionModel(input_shape=input_shape)
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
# A further 20 % of the training split is held back by Keras for validation.
history = model.fit(X_train, y_train, epochs=10, batch_size=32, validation_split=0.2)


Epoch 1/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 16s/step - accuracy: 0.7083 - loss: 0.6636 - val_accuracy: 1.0000 - val_loss: 0.3745
Epoch 2/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 457ms/step - accuracy: 1.0000 - loss: 0.3976 - val_accuracy: 1.0000 - val_loss: 0.2199
Epoch 3/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 687ms/step - accuracy: 1.0000 - loss: 0.2421 - val_accuracy: 1.0000 - val_loss: 0.1272
Epoch 4/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 508ms/step - accuracy: 1.0000 - loss: 0.1463 - val_accuracy: 1.0000 - val_loss: 0.0742
Epoch 5/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 486ms/step - accuracy: 1.0000 - loss: 0.0873 - val_accuracy: 1.0000 - val_loss: 0.0438
Epoch 6/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 549ms/step - accuracy: 1.0000 - loss: 0.0521 - val_accuracy: 1.0000 - val_loss: 0.0263
Epoch 7/10
[1m1/1[0m [32m━━━━━━━━━━━━━

In [40]:
# Score the held-out split; the result unpacks into the loss and the accuracy metric.
evaluation = model.evaluate(X_test, y_test)
test_loss, test_accuracy = evaluation
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 183ms/step - accuracy: 1.0000 - loss: 0.0055
Test Loss: 0.0055
Test Accuracy: 1.0000


In [41]:
# The ".keras" suffix selects the native Keras format; the explicit
# `save_format` argument is deprecated in Keras 3 and only triggers a warning.
model.save("seizure_model.keras")
print("Model saved successfully as 'seizure_model.keras'")




Model saved successfully as 'seizure_model.keras'
