In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Recursively walk the Kaggle input directory and print the full path of every file found.
for root, _subdirs, names in os.walk('/kaggle/input'):
    for full_path in (os.path.join(root, name) for name in names):
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from transformers import InformerModel
from sklearn.metrics import classification_report, balanced_accuracy_score
import numpy as np
import os
from sklearn.decomposition import PCA
from scipy.signal import welch, coherence
from scipy.stats import skew, kurtosis
import pywt

In [None]:
from transformers import InformerConfig, InformerForPrediction

class InformerClassifier(nn.Module):
    """Sequence-level classifier built on a Hugging Face Informer encoder.

    The wrapped ``InformerForPrediction`` model encodes the input window; its
    encoder output is mean-pooled over time and passed through a linear layer
    to produce one logit vector per sample.

    Args:
        input_size: Number of variates per timestep (last axis of ``past_values``).
        prediction_length: Forecast horizon required by ``InformerConfig``; it is
            not used for classification.
        context_length: Number of timesteps fed to the encoder.
        d_model: Transformer hidden size; also the input width of the classifier head.
        num_classes: Number of output classes.
        num_time_features: Width of the last axis of ``past_time_features``.
            Defaults to 1 to match ``EEGDataset`` in this notebook, which supplies a
            single normalized time index per timestep.
        **config_kwargs: Extra keyword arguments forwarded to ``InformerConfig``
            (for example ``lags_sequence``).

    NOTE(review): per the Informer documentation the time axis of ``past_values``
    is expected to have length ``context_length + max(config.lags_sequence)``;
    confirm the dataset's window count satisfies this for the chosen settings.
    """

    def __init__(self, input_size=19, prediction_length=1, context_length=1, d_model=64, num_classes=4,
                 num_time_features=1, **config_kwargs):
        super().__init__()

        # Create Informer configuration
        config = InformerConfig(
            input_size=input_size,
            prediction_length=prediction_length,
            context_length=context_length,
            d_model=d_model,
            num_time_features=num_time_features,
            **config_kwargs,
        )

        # Initialize Informer model with the configuration
        self.informer = InformerForPrediction(config)

        # Classification head
        self.fc = nn.Linear(d_model, num_classes)

    def forward(self, x):
        """Classify a batch.

        Args:
            x: Mapping with the tensors ``"past_values"``, ``"past_time_features"``
                and ``"past_observed_mask"``.

        Returns:
            Logits with one row per sample and ``num_classes`` columns.
        """
        outputs = self.informer(
            past_values=x["past_values"],
            past_time_features=x["past_time_features"],
            past_observed_mask=x["past_observed_mask"],
        )
        # InformerForPrediction returns a Seq2SeqTSPredictionOutput, which has no
        # `last_hidden_state`; the encoder representation is exposed as
        # `encoder_last_hidden_state` (expected shape: batch, encoder_steps, d_model).
        hidden_state = outputs.encoder_last_hidden_state
        # Labels are per sample, so collapse the time axis before classifying.
        pooled = hidden_state.mean(dim=1)
        return self.fc(pooled)


In [None]:
import numpy as np
from scipy.signal import welch, coherence
from scipy.stats import skew, kurtosis
import pywt
from sklearn.decomposition import PCA

# Decomposition depth passed to pywt.wavedec; it yields (level + 1) coefficient arrays.
_WAVELET_LEVEL = 2
# 6 statistical + 5 band-power features, plus mean and std of every wavelet coefficient array.
_FEATURES_PER_CHANNEL = 6 + 5 + 2 * (_WAVELET_LEVEL + 1)
# Frequency bands in Hz as half-open [low, high) intervals: delta, theta, alpha, beta, gamma.
# Alpha ends at 13 Hz so that no PSD bin between the alpha and beta bands is dropped.
_EEG_BANDS = ((0.5, 4), (4, 8), (8, 13), (13, 30), (30, 100))


def _channel_features(window, sampling_rate):
    """Compute the flat feature list for one channel's window of samples."""
    stats = [
        np.mean(window),
        np.var(window),
        np.sqrt(np.mean(window**2)),            # root mean square
        skew(window),
        kurtosis(window),
        np.sum(np.diff(np.sign(window)) != 0),  # sign changes between consecutive samples
    ]

    # Welch PSD with a single segment spanning the whole window; bin spacing is
    # sampling_rate / len(window), so narrow bands may contain no bin at all
    # (e.g. 256 Hz with 50 samples gives 5.12 Hz bins and an empty delta band).
    freqs, psd = welch(window, fs=sampling_rate, nperseg=len(window))
    band_powers = [np.sum(psd[(freqs >= low) & (freqs < high)]) for low, high in _EEG_BANDS]

    coeffs = pywt.wavedec(window, wavelet='db4', level=_WAVELET_LEVEL)
    wavelet_stats = [np.mean(c) for c in coeffs] + [np.std(c) for c in coeffs]

    return stats + band_powers + wavelet_stats


def extracted_features(data, sampling_rate=256, window_size=50, verbose=False):
    """Extract per-window statistical, spectral and wavelet features from a multi-channel signal.

    The signal is cut into consecutive non-overlapping windows of ``window_size``
    samples (trailing samples that do not fill a window are ignored). For every
    window, the features of all channels are concatenated into one row.

    Args:
        data: 2-D array indexed as ``(channel, timestep)``.
        sampling_rate: Sampling frequency in Hz passed to ``scipy.signal.welch``.
        window_size: Number of samples per window; must be positive.
        verbose: When True, print the shape of the returned array.

    Returns:
        Array of shape ``(num_windows, num_channels * 17)``. When the signal is
        shorter than one window the result has zero rows.

    Raises:
        ValueError: If ``window_size`` is not positive.
    """
    if window_size <= 0:
        raise ValueError(f"window_size must be positive, got {window_size}")

    num_channels, timesteps = data.shape
    num_windows = timesteps // window_size  # Number of windows per signal

    if num_windows == 0:
        # reshape(0, -1) cannot infer the column count of an empty array, so build it explicitly.
        features = np.empty((0, num_channels * _FEATURES_PER_CHANNEL))
    else:
        features = np.array([
            [
                _channel_features(data[channel, w * window_size:(w + 1) * window_size], sampling_rate)
                for channel in range(num_channels)
            ]
            for w in range(num_windows)
        ])  # Shape: (num_windows, num_channels, num_features)
        features = features.reshape(num_windows, -1)  # Flatten channels into feature vector

    if verbose:
        print(f"Extracted features shape: {features.shape}")

    return features  # Shape: (num_windows, total_features_per_window)


In [None]:
def load_split_data(split_name, base_data_path, folders, class_mapping, expected_shape=(19, 500)):
    """Load every ``.npy`` recording of one data split and convert it to features.

    Args:
        split_name: Sub-directory of ``base_data_path`` holding the split.
        base_data_path: Root directory of the dataset.
        folders: Class folder names to read inside the split directory.
        class_mapping: Maps each folder name to its integer label.
        expected_shape: Shape a loaded array must have to be used; files with any
            other shape are skipped and counted.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays: the stacked per-file feature arrays and
        the matching labels. Both are empty when nothing could be loaded.
    """
    split_path = os.path.join(base_data_path, split_name)
    X, y = [], []
    skipped = 0
    for folder in folders:
        folder_path = os.path.join(split_path, folder)
        if not os.path.exists(folder_path):
            print(f"Folder {folder_path} does not exist.")
            continue

        # os.listdir returns entries in arbitrary order; sort so the sample order
        # (and any seeded sampling done on it later) is reproducible.
        files = sorted(f for f in os.listdir(folder_path) if f.endswith('.npy'))
        if not files:
            print(f"Folder {folder_path} is empty.")
            continue

        for file in files:
            file_path = os.path.join(folder_path, file)
            try:
                data = np.load(file_path).astype(np.float32)
                if data.shape != tuple(expected_shape):  # Ensure consistent shape
                    skipped += 1
                    continue
                features = extracted_features(data)
                X.append(features)
                y.append(class_mapping[folder])
            except Exception as e:
                # Best effort: report the failing file and keep loading the rest.
                print(f"Error loading {file_path}: {e}")

    if skipped:
        print(f"Skipped {skipped} file(s) in {split_name} whose shape was not {tuple(expected_shape)}.")

    X, y = np.array(X), np.array(y)
    print(f"Loaded {split_name} data shape: {X.shape}, Labels shape: {y.shape}")
    return X, y

In [None]:
# EEGDataset is defined before its first use so the notebook runs top to bottom
# on a fresh kernel.
class EEGDataset(Dataset):
    """Dataset that serves feature sequences in the dict format the Informer model consumes.

    Each item is a dict with ``"past_values"``, ``"past_observed_mask"`` and
    ``"past_time_features"`` (plus ``"labels"`` when labels were given).
    """

    def __init__(self, X, y=None, sampling_rate=256):
        """
        X: (samples, timesteps, features)
        y: Labels, or None for unlabeled data
        sampling_rate: Currently unused; kept so existing callers keep working.
        """
        X = np.asarray(X)
        self.past_values = torch.tensor(X, dtype=torch.float32)  # Shape: (samples, timesteps, features)
        # Every value is treated as observed (mask of ones).
        self.past_observed_mask = torch.ones_like(self.past_values, dtype=torch.float32)
        self.labels = torch.tensor(y, dtype=torch.long) if y is not None else None

        # Generate past_time_features: one normalized time index in [0, 1] per
        # timestep, shared by all samples. Shape: (samples, timesteps, 1)
        num_timesteps = X.shape[1]
        time_index = torch.tensor(np.linspace(0, 1, num_timesteps), dtype=torch.float32)
        self.past_time_features = time_index.unsqueeze(0).repeat(len(X), 1).unsqueeze(-1)

    def __len__(self):
        return len(self.past_values)

    def __getitem__(self, idx):
        data = {
            "past_values": self.past_values[idx],  # (timesteps, features)
            "past_observed_mask": self.past_observed_mask[idx],  # (timesteps, features)
            "past_time_features": self.past_time_features[idx]  # (timesteps, 1)
        }
        if self.labels is not None:
            data["labels"] = self.labels[idx]
        return data


# Load datasets
base_data_path = '/kaggle/input/impppp/Impulse/EEG_Data'
data_splits = ['train_data', 'validation_data']
folders = ['Normal', 'Complex_Partial_Seizures', 'Electrographic_Seizures', 'Video_detected_Seizures_with_no_visual_change_over_EEG']
class_mapping = {folder: idx for idx, folder in enumerate(folders)}

X_train, y_train = load_split_data('train_data', base_data_path, folders, class_mapping)
X_val, y_val = load_split_data('validation_data', base_data_path, folders, class_mapping)

if len(X_train) == 0:
    raise RuntimeError(f"No training samples were loaded from {base_data_path}; check the dataset path.")

# Train on a reproducible random subset of the training data.
SEED = 42
TRAIN_SAMPLE_FRACTION = 0.1
rng = np.random.default_rng(SEED)
# Keep at least one sample so small datasets do not produce an empty training set.
num_sampled = max(1, int(TRAIN_SAMPLE_FRACTION * len(X_train)))
sampled_indices = rng.choice(len(X_train), size=num_sampled, replace=False)
X_train_sampled, y_train_sampled = X_train[sampled_indices], y_train[sampled_indices]
print(f"Shape of X_train_sampled: {X_train_sampled.shape}")


# Create Dataloaders
train_dataset = EEGDataset(X_train_sampled, y_train_sampled)
val_dataset = EEGDataset(X_val, y_val)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

print(f"Data prepared: Train size {len(train_dataset)}, Validation size {len(val_dataset)}")



In [None]:
def _run_batch(model, batch, criterion, device):
    """Move one batch to ``device``, run the model and return (loss, flat_logits, flat_labels)."""
    inputs = {
        key: batch[key].to(device)
        for key in ("past_values", "past_time_features", "past_observed_mask")
    }
    labels = batch['labels'].to(device).reshape(-1)
    logits = model(inputs)
    # Flatten to (N, num_classes) once so the loss and the predictions always
    # line up with the flattened labels; reshape also handles non-contiguous outputs.
    flat_logits = logits.reshape(-1, logits.size(-1))
    return criterion(flat_logits, labels), flat_logits, labels


def train_model(model, train_loader, val_loader, num_epochs=10, learning_rate=1e-3, device='cuda' if torch.cuda.is_available() else 'cpu'):
    """
    Trains the InformerClassifier model.

    After every epoch the mean training loss is printed together with the mean
    validation loss and balanced accuracy. If ``val_loader`` yields no batches,
    validation metrics are skipped for that epoch instead of raising.

    Args:
        model: The model to be trained. It is called with a dict holding
            "past_values", "past_time_features" and "past_observed_mask".
        train_loader: Iterable of training batches (dicts that also hold "labels").
        val_loader: Iterable of validation batches in the same format.
        num_epochs: Number of epochs to train.
        learning_rate: Learning rate for the Adam optimizer.
        device: Device to train on ('cuda' or 'cpu').

    Returns:
        Trained model (left in eval mode once at least one epoch has run).
    """
    # Move model to device
    model = model.to(device)

    # Define optimizer and loss function
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(num_epochs):
        # Training pass
        model.train()
        train_loss = 0.0
        num_train_batches = 0
        for batch in train_loader:
            optimizer.zero_grad()
            loss, _, _ = _run_batch(model, batch, criterion, device)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            num_train_batches += 1

        # Validation pass
        model.eval()
        val_loss = 0.0
        num_val_batches = 0
        all_preds = []
        all_labels = []
        with torch.no_grad():
            for batch in val_loader:
                loss, flat_logits, labels = _run_batch(model, batch, criterion, device)
                val_loss += loss.item()
                num_val_batches += 1

                all_preds.extend(flat_logits.argmax(dim=-1).cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        # Compute metrics; guard the divisions so empty loaders do not raise.
        train_loss /= max(num_train_batches, 1)

        print(f"Epoch {epoch + 1}/{num_epochs}")
        if num_val_batches:
            val_loss /= num_val_batches
            balanced_acc = balanced_accuracy_score(all_labels, all_preds)
            print(f"Train Loss: {train_loss:.4f} | Validation Loss: {val_loss:.4f} | Balanced Accuracy: {balanced_acc:.4f}")
        else:
            print(f"Train Loss: {train_loss:.4f} | Validation skipped: val_loader produced no batches")

    return model


In [None]:
# X_train_sampled is (samples, timesteps, features). Informer's `input_size` is the
# per-timestep feature width (last axis), not the number of timesteps.
_, num_timesteps, num_features = X_train_sampled.shape

# Informer builds lagged copies of the input, so the past window must span
# context_length + max(lags_sequence) timesteps. Derive the context length from
# the library's default lag set rather than hard-coding it.
max_lag = max(InformerConfig().lags_sequence)
context_length = num_timesteps - max_lag
if context_length < 1:
    raise ValueError(
        f"Need more than {max_lag} timesteps per sample for the default Informer lags, got {num_timesteps}."
    )

model = InformerClassifier(input_size=num_features, context_length=context_length)

# Inspect one real batch; there are no top-level `past_values` /
# `past_time_features` variables in this notebook.
sample_batch = next(iter(train_loader))
print(f"past_values shape: {sample_batch['past_values'].shape}")
print(f"past_time_features shape: {sample_batch['past_time_features'].shape}")
trained_model = train_model(model, train_loader, val_loader, num_epochs=10, learning_rate=1e-3)

