In [2]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import torch.nn.functional as F
import os
import matplotlib.pyplot as plt
import torch.optim as optim

In [117]:
import os
import scipy.io as sio
import torch
from torch.utils.data import Dataset

class MatFileDataset(Dataset):
    def __init__(self, directory):
        self.features = []  # Will store multi-channel fractal features (num_channels, feature_length)
        self.signals = []   # Will store multi-channel EEG signals (num_channels, signal_length)
        self.labels = []    # Labels for each window (before_label, after_label)
        self.num_channels = 4  # Number of channels (4 separate .mat files)
        self.fractal_feature_length = None  # To determine feature length from first valid data

        self._load_data(directory)
        self._validate_labels()

    def _load_data(self, directory):
        """Load all four channels separately and align participant data correctly."""
        # Step 1: Find all .mat files (assuming four files exist, one per channel)
        mat_files = sorted([os.path.join(directory, f) for f in os.listdir(directory) if f.endswith(".mat")])
        if len(mat_files) != self.num_channels:
            raise ValueError(f"Expected {self.num_channels} .mat files, but found {len(mat_files)}")

        # Step 2: Load data from all channels
        all_channels_data = [sio.loadmat(f, struct_as_record=False, squeeze_me=True) for f in mat_files]

        # Step 3: Check if all files contain 'all_window_features'
        for i, mat_data in enumerate(all_channels_data):
            if "all_window_features" not in mat_data:
                raise ValueError(f"Missing 'all_window_features' in file {mat_files[i]}")

        # Step 4: Process participant data by aligning across channels
        num_participants = len(all_channels_data[0]["all_window_features"])
        
        for participant_idx in range(num_participants):
            # Extract participant data from each channel
            participant_windows = [
                mat_data["all_window_features"][participant_idx] for mat_data in all_channels_data
            ]

            # Skip if any channel is missing participant data
            if any(p is None for p in participant_windows):
                continue

            # Step 5: Ensure that windows are aligned across channels
            num_windows = len(participant_windows[0])  # Get window count from first channel
            for win_idx in range(num_windows):
                # Extract corresponding window from each channel
                window_data = [participant_windows[ch][win_idx] for ch in range(self.num_channels)]

                # Extract labels from the first channel only (assuming labels are the same across channels)
                before_label = getattr(window_data[0], "before_label", None)
                after_label = getattr(window_data[0], "after_label", None)

                # Validate labels
                if (before_label is None or after_label is None or 
                    not (0 <= before_label <= 3) or not (0 <= after_label <= 3)):
                    continue  # Skip invalid windows
                
                # Step 6: Extract multichannel fractal features and signals
                # fractal_features = []
                # signals = []
                # for window in window_data:
                #     # Extract raw EEG signals
                #     if hasattr(window, "raw_window_signal") and window.raw_window_signal is not None:
                #         signals.append(torch.tensor(window.raw_window_signal.flatten(), dtype=torch.float32))
                #     else:
                #         signals.append(torch.zeros(1))  # Placeholder if missing

                #     # Extract fractal features
                #     channel_features = []
                #     # if hasattr(window, "hq") and window.Dq is not None:
                #     #     channel_features.extend(window.Dq.flatten())
                #     if hasattr(window, "Dq") and window.Dq is not None:
                #         channel_features.extend(window.Dq.flatten())

                #     fractal_features.append(torch.tensor(channel_features, dtype=torch.float32))
                #     # if torch.isnan(fractal_features[-1]).any():
                #     #     print("here is a nan in {}".format(participant_idx))
                #     #     continue

                # # Ensure all feature lengths are the same
                # if self.fractal_feature_length is None:
                #     self.fractal_feature_length = len(fractal_features[0])

                # # Convert to torch tensors and store
                # self.features.append(torch.stack(fractal_features))  # Shape: (num_channels, feature_length)
                # self.signals.append(torch.stack(signals))  # Shape: (num_channels, signal_length)
                # self.labels.append((int(before_label), int(after_label)))
                # Step 6: Extract multichannel fractal features and signals
                fractal_features = []
                signals = []
                for window in window_data:
                    # Extract raw EEG signals
                    if hasattr(window, "raw_window_signal") and window.raw_window_signal is not None:
                        signals.append(torch.tensor(window.raw_window_signal.flatten(), dtype=torch.float32))
                    else:
                        signals.append(torch.zeros(1))  # Placeholder if missing

                    # Extract fractal features as a 41x41 matrix
                    if hasattr(window, "Dq") and window.Dq is not None and hasattr(window, "hq") and window.hq is not None:
                        Dq = torch.tensor(window.Dq, dtype=torch.float32)  # Shape: (41,)
                        hq = torch.tensor(window.hq, dtype=torch.float32)  # Shape: (41,)
                        
                        # Expand both into 41x41 matrices using outer product
                        fractal_feature_matrix = torch.outer(Dq, hq)  # Shape: (41, 41)
                    else:
                        fractal_feature_matrix = torch.zeros((41, 41), dtype=torch.float32)  # Placeholder if missing
                    
                    fractal_features.append(fractal_feature_matrix)  # Store as 2D matrix

                # Ensure all feature lengths are the same
                if self.fractal_feature_length is None:
                    self.fractal_feature_length = fractal_features[0].shape  # Now (41, 41)

                # Convert to torch tensors and store
                self.features.append(torch.stack(fractal_features))  # Shape: (num_channels, 41, 41)
                self.signals.append(torch.stack(signals))  # Shape: (num_channels, signal_length)
                self.labels.append((int(before_label), int(after_label)))


    def _validate_labels(self):
        """Ensure all labels are valid integers 0-3."""
        valid_before = all(0 <= lbl[0] <= 3 for lbl in self.labels)
        valid_after = all(0 <= lbl[1] <= 3 for lbl in self.labels)
        if not (valid_before and valid_after):
            invalid = [
                (i, lbl) for i, lbl in enumerate(self.labels)
                if not (0 <= lbl[0] <= 3 and 0 <= lbl[1] <= 3)
            ]
            raise ValueError(f"Invalid labels found at indices: {invalid[:10]}")

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        """Return a single data sample (x, y)"""
        x = {
            'fractal': self.features[idx],  # Shape: (num_channels, feature_length)
            'signal': self.signals[idx]  # Shape: (num_channels, signal_length)
        }
        y = {
            "before_label": torch.tensor(self.labels[idx][0], dtype=torch.long),
            "after_label": torch.tensor(self.labels[idx][1], dtype=torch.long),
        }
        return x, y

# Example Usage
dataset = MatFileDataset("/Users/athenasaghi/VSProjects/CognitiveFatigueDetection/Prediction/window1000LWT/")
print(f"Loaded {len(dataset)} valid samples")

# Check shapes
x, y = dataset[0]
print(f"Fractal Features Shape: {x['fractal'].shape}")  # Expected: (num_channels, feature_length)
print(f"Signal Shape: {x['signal'].shape}")  # Expected: (num_channels, signal_length)
print(f"Labels: {y}")


Loaded 1354 valid samples
Fractal Features Shape: torch.Size([4, 41, 41])
Signal Shape: torch.Size([4, 1000])
Labels: {'before_label': tensor(0), 'after_label': tensor(0)}


In [77]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

class FractalCNN(nn.Module):
    def __init__(self, num_channels, num_classes=4):
        super(FractalCNN, self).__init__()
        self.conv1 = nn.Conv2d(num_channels, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.pool = nn.MaxPool2d(2, 2)
        self.dropout = nn.Dropout(0.5)

        self.fc1 = nn.Linear(128 * 5 * 5, 256)  # Adjusted for 41x41 input
        self.fc2 = nn.Linear(256, num_classes)

    def forward(self, x):
        x = self.pool(torch.relu(self.bn1(self.conv1(x))))
        x = self.pool(torch.relu(self.bn2(self.conv2(x))))
        x = self.pool(torch.relu(self.bn3(self.conv3(x))))
        x = x.view(x.size(0), -1)  # Flatten
        x = self.dropout(torch.relu(self.fc1(x)))
        x = self.fc2(x)
        return x

    
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_channels = 4  # Adjust based on your dataset
model = FractalCNN(num_channels=num_channels).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.00001)

num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct, total = 0, 0
    
    for batch in dataloader:
        x, y = batch
        fractal_features = x['fractal'].to(device)  # Shape: (batch_size, num_channels, 41, 41)
        labels = y["before_label"].to(device)  # Labels (before_label classification)
        
        if torch.isinf(fractal_features).any():
            fractal_features[torch.isinf(fractal_features)] = torch.max(fractal_features[~torch.isinf(fractal_features)])
        if torch.isnan(fractal_features).any():
            fractal_features[torch.isnan(fractal_features)] = torch.mean(fractal_features[~torch.isnan(fractal_features)])

        optimizer.zero_grad()
        outputs = model(fractal_features)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)
    
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(dataloader)}, Accuracy: {100 * correct/total:.2f}%")

print("Training complete!")

# -------------------------------
# 4. Evaluate the Model
# -------------------------------
model.eval()
correct, total = 0, 0
with torch.no_grad():
    for batch in dataloader:
        x, y = batch
        fractal_features = x['fractal'].to(device)
        labels = y["before_label"].to(device)
        
        outputs = model(fractal_features)
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

print(f"Test Accuracy: {100 * correct/total:.2f}%")

Epoch 1/100, Loss: 1.1392625708912694, Accuracy: 59.16%
Epoch 2/100, Loss: 1.0302386450213055, Accuracy: 61.23%
Epoch 3/100, Loss: 1.0124052901600682, Accuracy: 60.93%
Epoch 4/100, Loss: 1.0098251298416492, Accuracy: 61.37%
Epoch 5/100, Loss: 1.0064689821975177, Accuracy: 60.86%
Epoch 6/100, Loss: 0.9897971693859544, Accuracy: 61.45%
Epoch 7/100, Loss: 0.9713855413503425, Accuracy: 61.74%
Epoch 8/100, Loss: 0.983438792616822, Accuracy: 61.08%
Epoch 9/100, Loss: 0.9684184872826864, Accuracy: 61.08%
Epoch 10/100, Loss: 0.9591253053310306, Accuracy: 62.04%
Epoch 11/100, Loss: 0.9642432207284972, Accuracy: 61.30%
Epoch 12/100, Loss: 0.9583164384198744, Accuracy: 61.60%
Epoch 13/100, Loss: 0.9435403042061384, Accuracy: 61.67%
Epoch 14/100, Loss: 0.9406047449555508, Accuracy: 62.11%
Epoch 15/100, Loss: 0.9273331442544627, Accuracy: 62.26%
Epoch 16/100, Loss: 0.9298028280568678, Accuracy: 61.45%
Epoch 17/100, Loss: 0.9206396604693213, Accuracy: 61.52%
Epoch 18/100, Loss: 0.9246319615563681, A

KeyboardInterrupt: 

In [113]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split

# -------------------------------
# Transformer Model (ViT)
# -------------------------------
class PatchEmbedding(nn.Module):
    """Splits input into patches and embeds them for the Transformer"""
    def __init__(self, img_size=41, patch_size=5, in_channels=4, embed_dim=256):
        super().__init__()
        self.num_patches = (img_size // patch_size) ** 2
        self.proj = nn.Conv2d(in_channels, embed_dim, kernel_size=patch_size, stride=patch_size)
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))
        self.pos_embedding = nn.Parameter(torch.randn(1, self.num_patches + 1, embed_dim))

    def forward(self, x):
        x = self.proj(x).flatten(2).transpose(1, 2)  # (B, embed_dim, num_patches) -> (B, num_patches, embed_dim)
        cls_tokens = self.cls_token.expand(x.shape[0], -1, -1)  # Add classification token
        x = torch.cat([cls_tokens, x], dim=1)  # (B, num_patches+1, embed_dim)
        x += self.pos_embedding
        return x

class ViTFractal(nn.Module):
    def __init__(self, img_size=41, patch_size=5, in_channels=4, num_classes=4, embed_dim=768, num_heads=12, num_layers=12):
        super().__init__()
        self.patch_embedding = PatchEmbedding(img_size, patch_size, in_channels, embed_dim)
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads, dim_feedforward=1024)
        self.transformer = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
        self.mlp_head = nn.Sequential(
            nn.LayerNorm(embed_dim),
            nn.Linear(embed_dim, num_classes)
        )

    def forward(self, x):
        x = self.patch_embedding(x)
        x = self.transformer(x)[:, 0]
        x = self.mlp_head(x)
        return x


# -------------------------------
# Load Dataset and Split
# -------------------------------
batch_size = 32
dataset = MatFileDataset("/Users/athenasaghi/VSProjects/CognitiveFatigueDetection/Prediction/window1000LWT/")

train_size = int(0.7 * len(dataset))
val_size = int(0.15 * len(dataset))
test_size = len(dataset) - train_size - val_size

train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# -------------------------------
# Train the Model
# -------------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_channels = 4
model = ViTFractal(in_channels=num_channels).to(device)

criterion = nn.CrossEntropyLoss()
# optimizer = optim.AdamW(model.parameters(), lr=0.0003)
optimizer  = optim.SGD(model.parameters(), lr=0.0003, momentum=0.9)

num_epochs = 10
best_val_accuracy = 0.0

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct, total = 0, 0
    
    for batch in train_loader:
        x, y = batch
        fractal_features = x['fractal'].to(device)  # Shape: (batch_size, num_channels, 41, 41)
        labels = y["before_label"].to(device)  # Labels (before_label classification)
        
        if torch.isinf(fractal_features).any():
            fractal_features[torch.isinf(fractal_features)] = torch.max(fractal_features[~torch.isinf(fractal_features)])
        if torch.isnan(fractal_features).any():
            fractal_features[torch.isnan(fractal_features)] = torch.mean(fractal_features[~torch.isnan(fractal_features)])
        
        optimizer.zero_grad()
        outputs = model(fractal_features)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)
    
    train_accuracy = 100 * correct / total
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}, Train Accuracy: {train_accuracy:.2f}%")

    # -------------------------------
    # Validation
    # -------------------------------
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for batch in val_loader:
            x, y = batch
            fractal_features = x['fractal'].to(device)
            labels = y["before_label"].to(device)
            
            outputs = model(fractal_features)
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    val_accuracy = 100 * correct / total
    print(f"Validation Accuracy: {val_accuracy:.2f}%")

    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        torch.save(model.state_dict(), "best_model.pth")

print("Training complete!")

# -------------------------------
# Test the Model
# -------------------------------
model.load_state_dict(torch.load("best_model.pth"))
model.eval()
correct, total = 0, 0
with torch.no_grad():
    for batch in test_loader:
        x, y = batch
        fractal_features = x['fractal'].to(device)
        labels = y["before_label"].to(device)
        
        outputs = model(fractal_features)
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

print(f"Test Accuracy: {100 * correct/total:.2f}%")


  incorrect execution, including forward and backward


Epoch 1/10, Loss: 1.1293194472789765, Train Accuracy: 56.49%
Validation Accuracy: 59.61%


KeyboardInterrupt: 

In [None]:
import numpy as np
import torch
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score

# Load dataset
dataset = MatFileDataset("/Users/athenasaghi/VSProjects/CognitiveFatigueDetection/Prediction/window1000LWT/")

# Convert dataset to numpy arrays
features = []
labels = []

for i in range(len(dataset)):
    x, y = dataset[i]
    fractal_features = x['fractal'].numpy().flatten()  # Flatten (4, 41, 41) → (4 * 41 * 41,)
    features.append(fractal_features)
    labels.append(y["before_label"].item())

features = np.array(features)  # Shape: (num_samples, feature_dim)
labels = np.array(labels)  # Shape: (num_samples,)

# Split data
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2)
# X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)

if np.isnan(X_train).any():
    X_train[np.isnan(X_train)] = np.nanmax(X_train[~np.isnan(X_train)])
if np.isinf(X_train).any():
    X_train[np.isinf(X_train)] = np.nanmax(X_train[~np.isinf(X_train)])

# Initialize and train KNN
knn = KNeighborsClassifier(n_neighbors=12)
knn.fit(X_train, y_train)

# # Validation Accuracy
# if np.isnan(X_val).any():
#     X_val[np.isnan(X_val)] = np.nanmax(X_val[~np.isnan(X_val)])
# if np.isinf(X_val).any():
#     X_val[np.isinf(X_val)] = np.nanmax(X_val[~np.isinf(X_val)])
# y_val_pred = knn.predict(X_val)
# val_accuracy = accuracy_score(y_val, y_val_pred)
# print(f"Validation Accuracy: {val_accuracy:.2f}")

# Test Accuracy
if np.isnan(X_test).any():
    X_test[np.isnan(X_test)] = np.nanmax(X_test[~np.isnan(X_test)])
if np.isinf(X_test).any():
    X_test[np.isinf(X_test)] = np.nanmax(X_test[~np.isinf(X_test)])
y_test_pred = knn.predict(X_test)
test_accuracy = accuracy_score(y_test, y_test_pred)
print(f"Test Accuracy: {test_accuracy:.2f}")


Test Accuracy: 0.58


In [114]:
dataset = MatFileDataset("/Users/athenasaghi/VSProjects/CognitiveFatigueDetection/Prediction/window50/")


In [116]:
import numpy as np
import torch
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Load dataset

# Convert dataset to numpy arrays
features = []
labels = []

for i in range(len(dataset)):
    x, y = dataset[i]
    fractal_features = x['fractal'].numpy().flatten()  # Flatten (4, 41, 41) → (4 * 41 * 41,)
    features.append(fractal_features)
    labels.append(y["before_label"].item())

features = np.array(features)  # Shape: (num_samples, feature_dim)
labels = np.array(labels)  # Shape: (num_samples,)

# Split data
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=42)

if np.isnan(X_train).any():
    X_train[np.isnan(X_train)] = np.nanmax(X_train[~np.isnan(X_train)])
if np.isinf(X_train).any():
    X_train[np.isinf(X_train)] = np.nanmax(X_train[~np.isinf(X_train)])

# Initialize and train Random Forest
rf = RandomForestClassifier(n_estimators=20, max_depth=20, random_state=42)
rf.fit(X_train, y_train)

if np.isnan(X_test).any():
    X_test[np.isnan(X_test)] = np.nanmax(X_test[~np.isnan(X_test)])
if np.isinf(X_test).any():
    X_test[np.isinf(X_test)] = np.nanmax(X_test[~np.isinf(X_test)])

# Test Accuracy
y_test_pred = rf.predict(X_test)
test_accuracy = accuracy_score(y_test, y_test_pred)
print(f"Test Accuracy: {test_accuracy:.2f}")


Test Accuracy: 0.61


In [None]:
# import os
# import scipy.io as sio
# import torch
# from torch.utils.data import Dataset

# class MatFileDataset(Dataset):
#     def __init__(self, directory):
#         self.features = []  # Will store multi-channel fractal features (num_channels, feature_length)
#         self.signals = []   # Will store multi-channel EEG signals (num_channels, signal_length)
#         self.labels = []    # Labels for each window (before_label, after_label)
#         self.num_channels = 4  # Number of channels (4 separate .mat files)
#         self.fractal_feature_length = None  # To determine feature length from first valid data

#         self._load_data(directory)
#         self._validate_labels()

#     def _load_data(self, directory):
#         """Load all four channels separately and align participant data correctly."""
#         # Step 1: Find all .mat files (assuming four files exist, one per channel)
#         mat_files = sorted([os.path.join(directory, f) for f in os.listdir(directory) if f.endswith(".mat")])
#         if len(mat_files) != self.num_channels:
#             raise ValueError(f"Expected {self.num_channels} .mat files, but found {len(mat_files)}")

#         # Step 2: Load data from all channels
#         all_channels_data = [sio.loadmat(f, struct_as_record=False, squeeze_me=True) for f in mat_files]

#         # Step 3: Check if all files contain 'all_window_features'
#         for i, mat_data in enumerate(all_channels_data):
#             if "all_window_features" not in mat_data:
#                 raise ValueError(f"Missing 'all_window_features' in file {mat_files[i]}")

#         # Step 4: Process participant data by aligning across channels
#         num_participants = len(all_channels_data[0]["all_window_features"])
        
#         for participant_idx in range(num_participants):
#             # Extract participant data from each channel
#             participant_windows = [
#                 mat_data["all_window_features"][participant_idx] for mat_data in all_channels_data
#             ]

#             # Skip if any channel is missing participant data
#             if any(p is None for p in participant_windows):
#                 continue

#             # Step 5: Ensure that windows are aligned across channels
#             num_windows = len(participant_windows[0])  # Get window count from first channel
#             for win_idx in range(num_windows):
#                 # Extract corresponding window from each channel
#                 window_data = [participant_windows[ch][win_idx] for ch in range(self.num_channels)]

#                 # Extract labels from the first channel only (assuming labels are the same across channels)
#                 before_label = getattr(window_data[0], "before_label", None)
#                 after_label = getattr(window_data[0], "after_label", None)

#                 # Validate labels
#                 if (before_label is None or after_label is None or 
#                     not (0 <= before_label <= 3) or not (0 <= after_label <= 3)):
#                     continue  # Skip invalid windows
                
#                 # **Map labels:**
#                 before_label = 0 if before_label == 0 else 1  # 0 -> 0, (1,2,3) -> 1
#                 after_label = 0 if after_label == 0 else 1  # 0 -> 0, (1,2,3) -> 1

#                 # Step 6: Extract multichannel fractal features and signals
#                 fractal_features = []
#                 signals = []
#                 for window in window_data:
#                     # Extract raw EEG signals
#                     if hasattr(window, "raw_window_signal") and window.raw_window_signal is not None:
#                         signals.append(torch.tensor(window.raw_window_signal.flatten(), dtype=torch.float32))
#                     else:
#                         signals.append(torch.zeros(1))  # Placeholder if missing

#                     # Extract fractal features
#                     channel_features = []
#                     # if hasattr(window, "hq") and window.Dq is not None:
#                     #     channel_features.extend(window.Dq.flatten())
#                     if hasattr(window, "Dq") and window.hq is not None:
#                         channel_features.extend(window.hq.flatten())

#                     fractal_features.append(torch.tensor(channel_features, dtype=torch.float32))

#                 # Ensure all feature lengths are the same
#                 if self.fractal_feature_length is None:
#                     self.fractal_feature_length = len(fractal_features[0])

#                 # Convert to torch tensors and store
#                 self.features.append(torch.stack(fractal_features))  # Shape: (num_channels, feature_length)
#                 self.signals.append(torch.stack(signals))  # Shape: (num_channels, signal_length)
#                 self.labels.append((int(before_label), int(after_label)))

#     def _validate_labels(self):
#         """Ensure all labels are valid integers 0-1 after mapping."""
#         valid_before = all(lbl[0] in {0, 1} for lbl in self.labels)
#         valid_after = all(lbl[1] in {0, 1} for lbl in self.labels)
#         if not (valid_before and valid_after):
#             invalid = [
#                 (i, lbl) for i, lbl in enumerate(self.labels)
#                 if lbl[0] not in {0, 1} or lbl[1] not in {0, 1}
#             ]
#             raise ValueError(f"Invalid labels found at indices: {invalid[:10]}")

#     def __len__(self):
#         return len(self.features)

#     def __getitem__(self, idx):
#         """Return a single data sample (x, y)"""
#         x = {
#             'fractal': self.features[idx],  # Shape: (num_channels, feature_length)
#             'signal': self.signals[idx]  # Shape: (num_channels, signal_length)
#         }
#         y = {
#             "before_label": torch.tensor(self.labels[idx][0], dtype=torch.long),
#             "after_label": torch.tensor(self.labels[idx][1], dtype=torch.long),
#         }
#         return x, y

# # Example Usage
# dataset = MatFileDataset("/Users/athenasaghi/VSProjects/CognitiveFatigueDetection/Prediction/window200/")
# print(f"Loaded {len(dataset)} valid samples")

# # Check shapes
# x, y = dataset[0]
# print(f"Fractal Features Shape: {x['fractal'].shape}")  # Expected: (num_channels, feature_length)
# print(f"Signal Shape: {x['signal'].shape}")  # Expected: (num_channels, signal_length)
# print(f"Labels: {y}")  # Expected: {0,1}


Loaded 6795 valid samples
Fractal Features Shape: torch.Size([4, 41])
Signal Shape: torch.Size([4, 200])
Labels: {'before_label': tensor(0), 'after_label': tensor(0)}


In [35]:
import torch
import numpy as np
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score

def preprocess_data(fractal, dataset_name="Dataset"):
    fractal = fractal.numpy().reshape(fractal.shape[0], -1)  # Convert to NumPy and flatten
    
    # Count NaNs and Infs
    num_nans = np.isnan(fractal).sum()
    num_infs = np.isinf(fractal).sum()
    print(f"{dataset_name}: NaNs={num_nans}, Infs={num_infs}")

    # Remove rows with NaNs or Infs
    clean_mask = ~np.isnan(fractal).any(axis=1) & ~np.isinf(fractal).any(axis=1)
    fractal_cleaned = fractal[clean_mask]

    return fractal_cleaned, clean_mask  # Return mask to apply on labels

# Load and preprocess training data

train_loader = DataLoader(dataset, batch_size=1, shuffle=True)
test_loader = DataLoader(dataset, batch_size=1, shuffle=False)

x_train, y_train = zip(*[(batch[0]['fractal'], batch[1]['before_label']) for batch in train_loader])
fractal_train, mask_train = preprocess_data(torch.cat(x_train, dim=0), "Training Data")
labels_train = torch.cat(y_train, dim=0).numpy().reshape(-1)[mask_train]  # Apply mask

# Train KNN classifier
knn = KNeighborsClassifier(n_neighbors=5)
knn.fit(fractal_train, labels_train)

# Evaluate on training data
train_accuracy = accuracy_score(labels_train, knn.predict(fractal_train))
print(f"Training Accuracy: {train_accuracy:.2%}")

# Load and preprocess test data
x_test, y_test = zip(*[(batch[0]['fractal'], batch[1]['before_label']) for batch in test_loader])
fractal_test, mask_test = preprocess_data(torch.cat(x_test, dim=0), "Test Data")
labels_test = torch.cat(y_test, dim=0).numpy().reshape(-1)[mask_test]  # Apply mask

# Evaluate on test data
test_accuracy = accuracy_score(labels_test, knn.predict(fractal_test))
print(f"Test Accuracy: {test_accuracy:.2%}")


Training Data: NaNs=57, Infs=9972
Training Accuracy: 58.70%
Test Data: NaNs=57, Infs=9972
Test Accuracy: 58.70%


In [70]:
import os
import scipy.io as sio
import torch
import numpy as np
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import RobustScaler
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, StackingClassifier
from xgboost import XGBClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.decomposition import PCA
from imblearn.over_sampling import SMOTE

class MatFileDataset(Dataset):
    def __init__(self, directory):
        self.features = []
        self.labels = []
        self.num_channels = 4  

        self._load_data(directory)
        self._validate_labels()

    def _load_data(self, directory):
        mat_files = sorted([os.path.join(directory, f) for f in os.listdir(directory) if f.endswith(".mat")])
        if len(mat_files) != self.num_channels:
            raise ValueError(f"Expected {self.num_channels} .mat files, but found {len(mat_files)}")

        all_channels_data = [sio.loadmat(f, struct_as_record=False, squeeze_me=True) for f in mat_files]
        num_participants = len(all_channels_data[0]["all_window_features"])

        for participant_idx in range(num_participants):
            participant_windows = [mat_data["all_window_features"][participant_idx] for mat_data in all_channels_data]
            if any(p is None for p in participant_windows):
                continue

            num_windows = len(participant_windows[0])
            for win_idx in range(num_windows):
                window_data = [participant_windows[ch][win_idx] for ch in range(self.num_channels)]

                before_label = getattr(window_data[0], "before_label", None)
                after_label = getattr(window_data[0], "after_label", None)
                if (before_label is None or after_label is None or not (0 <= before_label <= 3) or not (0 <= after_label <= 3)):
                    continue  

                fractal_features = []
                for ch, window in enumerate(window_data):
                    channel_features = []
                    if hasattr(window, "Dq") and window.hq is not None:
                        channel_features.extend(window.hq.flatten())

                    channel_features = np.array(channel_features, dtype=np.float32)

                    # **Fix NaN & Inf by replacing with column mean**
                    nan_mask = np.isnan(channel_features) | np.isinf(channel_features)
                    mean_value = np.nanmean(channel_features) if np.any(~nan_mask) else 0
                    channel_features[nan_mask] = mean_value

                    fractal_features.append(channel_features)

                # Concatenate features from all channels
                combined_features = np.concatenate(fractal_features)
                
                # **Final NaN/Inf Check**
                if np.isnan(combined_features).any() or np.isinf(combined_features).any():
                    print(f"⚠️ Warning: NaN/Inf detected at participant {participant_idx}, window {win_idx}, replacing with zero")
                    combined_features[np.isnan(combined_features)] = 0
                    combined_features[np.isinf(combined_features)] = 0
                
                self.features.append(combined_features)
                self.labels.append(int(after_label))

    def _validate_labels(self):
        valid_labels = all(0 <= lbl <= 3 for lbl in self.labels)
        if not valid_labels:
            raise ValueError("Invalid labels detected.")

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]

# Load dataset
dataset = MatFileDataset("/Users/athenasaghi/VSProjects/CognitiveFatigueDetection/Prediction/window150/")
print(f"Loaded {len(dataset)} valid samples")

# Extract features and labels
features = np.array(dataset.features)
labels = np.array(dataset.labels)

# **Final Debugging: Check for NaN/Inf Before Scaling**
if np.isnan(features).any() or np.isinf(features).any():
    print("⚠️ NaN or Inf detected in features after extraction. Replacing with column mean.")
    col_means = np.nanmean(features, axis=0)
    nan_indices = np.where(np.isnan(features))
    features[nan_indices] = np.take(col_means, nan_indices[1])
    features[np.isinf(features)] = np.nanmax(features[np.isfinite(features)])

# **Split dataset**
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, stratify=labels, random_state=42)

# **Scale features using RobustScaler**
# scaler = RobustScaler()
# X_train = scaler.fit_transform(X_train)
# X_test = scaler.transform(X_test)

# **Apply SMOTE to fix class imbalance**
smote = SMOTE(random_state=42)
X_train, y_train = smote.fit_resample(X_train, y_train)

# **Feature Selection using RandomForest**
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X_train, y_train)

importances = rf.feature_importances_
sorted_idx = np.argsort(importances)[::-1]
num_top_features = int(0.9 * len(importances))  # Keep 90% of important features
selected_features = sorted_idx[:num_top_features]

X_train_selected = X_train[:, selected_features]
X_test_selected = X_test[:, selected_features]

# **Dimensionality Reduction using PCA**
pca = PCA(n_components=0.99)  # Keep 99% variance
X_train_pca = pca.fit_transform(X_train_selected)
X_test_pca = pca.transform(X_test_selected)

# **Train Optimized Classifier (Stacking XGBoost + GradientBoosting)**
base_learners = [
    ('xgb', XGBClassifier(n_estimators=300, learning_rate=0.05, max_depth=8, random_state=42)),
    ('gb', GradientBoostingClassifier(n_estimators=300, learning_rate=0.05, max_depth=8, random_state=42))
]
stacked_model = StackingClassifier(estimators=base_learners, final_estimator=LogisticRegression())
stacked_model.fit(X_train_pca, y_train)

# **Predict**
y_pred = stacked_model.predict(X_test_pca)

# **Evaluate performance**
accuracy = accuracy_score(y_test, y_pred)
print(f"\nStacked Model Classification Accuracy: {accuracy:.4f}")

# **Compute per-class accuracy**
conf_matrix = confusion_matrix(y_test, y_pred)
class_accuracies = conf_matrix.diagonal() / conf_matrix.sum(axis=1)

print("\nPer-Class Accuracy:")
for label, acc in enumerate(class_accuracies):
    print(f"Label {label}: {acc:.4f}")

# **Print detailed classification report**
print("\nClassification Report:")
print(classification_report(y_test, y_pred))


Loaded 9812 valid samples

Stacked Model Classification Accuracy: 0.3785

Per-Class Accuracy:
Label 0: 0.4637
Label 1: 0.2435
Label 2: 0.2625
Label 3: 0.3028

Classification Report:
              precision    recall  f1-score   support

           0       0.68      0.46      0.55      1117
           1       0.17      0.24      0.20       308
           2       0.20      0.26      0.23       320
           3       0.19      0.30      0.23       218

    accuracy                           0.38      1963
   macro avg       0.31      0.32      0.30      1963
weighted avg       0.47      0.38      0.41      1963

