In [1]:
!pip install pyedflib
!pip install torchvision
import pyedflib
import numpy as np
import cv2
import dlib
import os
import pickle
import torch
import torch.nn as nn
import torchvision.models as models
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm


Collecting pyedflib
  Downloading pyedflib-0.1.42-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.2 kB)
Downloading pyedflib-0.1.42-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.8 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/2.8 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━[0m [32m2.5/2.8 MB[0m [31m64.3 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.8/2.8 MB[0m [31m41.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pyedflib
Successfully installed pyedflib-0.1.42


In [None]:
def load_psg(edf_path, channels):
    # Open the EDF file
    edf_reader = pyedflib.EdfReader(edf_path)
    # Get the number of signals in the file
    num_signals = edf_reader.signals_in_file
    # Retrieve signal labels and data
    signal_labels = edf_reader.getSignalLabels()
    signal_data = {}
    for signal, label in enumerate(signal_labels):
        data = edf_reader.readSignal(signal)
        signal_data[label] = data
    edf_reader.close()
    psg = []
    for c in channels:
        psg.append(signal_data[c])
    psg = np.stack(psg, axis=1)
    return psg

In [None]:
def extract_face_frames(video_path, downsample_fps):
    detector = dlib.get_frontal_face_detector()
    vid = cv2.VideoCapture(video_path)
    fps = vid.get(cv2.CAP_PROP_FPS)
    step = fps / downsample_fps
    success = True
    face_frames = []
    frame_times = []
    frame_idx = 0
    while success:
        success, image = vid.read() # Read frame
        if success:
            if frame_idx % step != 0:
                frame_idx += 1
                continue
            t = frame_idx / fps
            frame_idx += 1
            image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            faces = detector(image, 1)
            if len(faces) == 0:
            # no face detected in this frame -> just skip it
                continue
            face = faces[0]
            x1 = face.left()
            y1 = face.top()
            x2 = face.right()
            y2 = face.bottom()

            # clip to bounds
            h, w = image.shape
            x1 = max(0, x1)
            y1 = max(0, y1)
            x2 = min(w, x2)
            y2 = min(h, y2)

            face_crop = image[y1:y2, x1:x2]

            # resize to 75x75
            face_resized = cv2.resize(face_crop, (75, 75))

            # normalize to [0,1]
            face_norm = face_resized.astype(np.float32) / 255.0

            face_frames.append(face_norm)  # (75,75)
            frame_times.append(t)

    vid.release()
    return face_frames, frame_times

In [None]:
def build_samples(psg, face_frames, frame_times, label):
    samples = []
    window_length = 1
    psg_fr = 512
    frame_times = np.array(frame_times)
    for index, time in enumerate(frame_times):
        if time >= 1:
            end_index = int(np.round(time * psg_fr))
            start_index = int(np.round((time - window_length) * psg_fr))
            psg_window = psg[start_index:end_index,:]
            start_time = time - window_length
            idxs = np.where((frame_times >= start_time) & (frame_times <= time))[0]
            avg_face = np.stack([face_frames[idx] for idx in idxs], axis=0).mean(axis=0)
            samples.append([avg_face, psg_window, label])
    return samples


In [None]:
def KSS_class(KSS):
    return 1 if KSS >= 6 else 0


In [None]:
psg_dir = r"C:\Users\gojas\Downloads\DROZY\DROZY\psg"
video_dir = r"C:\Users\gojas\Downloads\DROZY\DROZY\videos_i8"
channels = ['Fz','Cz','C3','C4','Pz','EOG-V','EOG-H','EMG','ECG']
KSS_values = [3, 6, 7,
              3, 7, 6,
              2, 3, 4,
              4, 8, 9,
              3, 7, 8,
              2, 3, 7,
              0, 4, 9,
              2, 6, 8,
              2, 6, 8,
              3, 6, 7,
              4, 7, 7,
              2, 5, 6,
              6, 3, 7,
              5, 7, 8]

subjects = np.arange(1,15)
subject_samples = {subj: [] for subj in subjects}
tests = [1, 2, 3]

sessions = []
for subject in subjects:
    for test in tests:
        sessions.append((subject, test, f"{subject}-{test}"))

session_labels = {}
for session, KSS in zip(sessions, KSS_values):
    session_labels[session[2]] = KSS_class(KSS)
for subject,test,session in sessions:
    edf_path = os.path.join(psg_dir,  session + ".edf")
    vid_path = os.path.join(video_dir, session + ".mp4")

    if not os.path.exists(edf_path) or not os.path.exists(vid_path):
        print(f"Skipping {session}: missing EDF or video")
        continue
    if session not in session_labels:
        print(f"Skipping {session}: no KSS label")
        continue

    label = session_labels[session]
    print(f"Processing session {session} (subject {subject}, test {test}) ...")

    psg = load_psg(edf_path, channels)
    face_frames, frame_times = extract_face_frames(vid_path, 1)
    if len(face_frames) == 0:
        print(f"  No faces in {session}, skipping")
        continue
    session_samples = build_samples(psg, face_frames, frame_times, label)

    print(f"{session}: {len(session_samples)} samples")
    subject_samples[subject].extend(session_samples)

for subj in subjects:
    print(f"Subject {subj}: total {len(subject_samples[subj])} samples")



Skipping 1-1: missing EDF or video
Skipping 1-2: missing EDF or video
Skipping 1-3: missing EDF or video
Skipping 2-1: missing EDF or video
Skipping 2-2: missing EDF or video
Skipping 2-3: missing EDF or video
Skipping 3-1: missing EDF or video
Skipping 3-2: missing EDF or video
Skipping 3-3: missing EDF or video
Skipping 4-1: missing EDF or video
Skipping 4-2: missing EDF or video
Skipping 4-3: missing EDF or video
Skipping 5-1: missing EDF or video
Skipping 5-2: missing EDF or video
Skipping 5-3: missing EDF or video
Skipping 6-1: missing EDF or video
Skipping 6-2: missing EDF or video
Skipping 6-3: missing EDF or video
Skipping 7-1: missing EDF or video
Skipping 7-2: missing EDF or video
Skipping 7-3: missing EDF or video
Skipping 8-1: missing EDF or video
Skipping 8-2: missing EDF or video
Skipping 8-3: missing EDF or video
Skipping 9-1: missing EDF or video
Skipping 9-2: missing EDF or video
Skipping 9-3: missing EDF or video
Skipping 10-1: missing EDF or video
Skipping 10-2: miss

In [None]:
expected_len = 512
num_removed = 0

for subj in list(subject_samples.keys()):
    clean_list = []
    for face, phys, label in subject_samples[subj]:
        if phys.shape[0] == expected_len:
            clean_list.append([face, phys, label])
        else:
            num_removed += 1
    subject_samples[subj] = clean_list

In [None]:
save_path = "subject_samples.pkl"  # you can put a full path if you want

with open(save_path, "wb") as f:
    pickle.dump(subject_samples, f)

print("Saved subject_samples to", save_path)

Saved subject_samples to subject_samples.pkl


In [None]:
subject_ids = list(subject_samples.keys())   # [1,2,...,14]

np.random.seed(42)

np.random.shuffle(subject_ids)

trainval_subjects = subject_ids[:13]   # first 13
test_subjects      = subject_ids[13]    # last one

train_subjects = trainval_subjects[:9]   # first 9
val_subjects   = trainval_subjects[9:]   # remaining 4



In [None]:
def gather_samples_for_subjects(subject_ids, subject_samples):
    all_samples = []
    for s in subject_ids:
        all_samples.extend(subject_samples[s])
    return all_samples

train_samples = gather_samples_for_subjects(train_subjects, subject_samples)
val_samples   = gather_samples_for_subjects(val_subjects, subject_samples)
test_samples  = subject_samples[test_subjects]


In [None]:
class DrowsinessDataset(Dataset):
    def __init__(self, samples):
        self.samples = samples

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        face, phys, label = self.samples[idx]

        face_tensor = torch.from_numpy(face).float().unsqueeze(0)

        phys_tensor = torch.from_numpy(phys).float()

        label_tensor = torch.tensor(label, dtype=torch.long)

        return face_tensor, phys_tensor, label_tensor


In [None]:
batch_size = 32

train_dataset = DrowsinessDataset(train_samples)
val_dataset   = DrowsinessDataset(val_samples)
test_dataset  = DrowsinessDataset(test_samples)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader   = DataLoader(val_dataset,   batch_size=batch_size, shuffle=False)
test_loader  = DataLoader(test_dataset,  batch_size=batch_size, shuffle=False)


ValueError: num_samples should be a positive integer value, but got num_samples=0

In [None]:

class FaceResNetEncoder(nn.Module):
    def __init__(self, feature_dim=512):
        super().__init__()
        # base ResNet18
        self.backbone = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
        in_feats = self.backbone.fc.in_features
        # remove final classifier
        self.backbone.fc = nn.Identity()
        # projection head
        self.project = nn.Linear(in_feats, feature_dim)

    def forward(self, x):
        x = x.repeat(1, 3, 1, 1)
        feats = self.backbone(x)         # (B, in_feats)
        feats = self.project(feats)      # (B, feature_dim)
        return feats


In [None]:
class PhysLSTMEncoder(nn.Module):
    def __init__(self, input_size=9, hidden_size=128, num_layers=1, out_dim=512):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True
        )
        self.project = nn.Linear(hidden_size, out_dim)

    def forward(self, x):
        out, (h_n, c_n) = self.lstm(x)
        last_hidden = h_n[-1]
        feats = self.project(last_hidden)
        return feats


In [None]:
seed = 1000
if torch.cuda.is_available() and try_cuda:
    cuda = True
    torch.cuda.manual_seed(seed)
else:
    cuda = False
    torch.manual_seed(seed)
device = torch.device("cuda" if cuda else "cpu")
face_encoder = FaceResNetEncoder(feature_dim=512)
phys_encoder = PhysLSTMEncoder(input_size=9, hidden_size=128, out_dim=512)
face_encoder.to(device)
phys_encoder.to(device)

In [None]:
class FeatureCoupledNet(nn.Module):
    def __init__(self, num_channels=9, num_classes=2,
                 face_feat_dim=512, phys_out_dim=512, dropout=0.4):
        super().__init__()
        self.face_encoder = FaceResNetEncoder(feature_dim=face_feat_dim)
        self.phys_encoder = PhysLSTMEncoder(input_size=num_channels,
                                            hidden_size=256,
                                            out_dim=phys_out_dim)

        self.fc1 = nn.Linear(face_feat_dim, 512)
        self.dropout = nn.Dropout(dropout)
        self.fc_out = nn.Linear(512, num_classes)

    def minmax_norm(self, x, eps=1e-6):
        x_min = x.min(dim=1, keepdim=True)[0]
        x_max = x.max(dim=1, keepdim=True)[0]
        denom = (x_max - x_min).clamp(min=eps)
        return (x - x_min) / denom

    def forward(self, face, phys):
        f_img  = self.face_encoder(face)
        f_phys = self.phys_encoder(phys)

        f_img_norm  = self.minmax_norm(f_img)
        f_phys_norm = self.minmax_norm(f_phys)

        coupled = f_img_norm * f_phys_norm

        h = F.relu(self.fc1(coupled))
        h=self.dropout(h)
        logits = self.fc_out(h)
        return logits


In [None]:
model = FeatureCoupledNet(num_channels=9, num_classes=2).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3,weight_decay=1e-4)


In [None]:
def train_one_epoch(model, loader, optimizer, criterion, device, epoch=None):
    model.train()
    total_loss = 0.0
    total_correct = 0
    total_count = 0
    loop = tqdm(loader, desc=f"Train Epoch {epoch}", leave=False)
    for faces, phys, labels in loop:
        faces = faces.to(device)
        phys  = phys.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        logits = model(faces, phys)           # (B, num_classes)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * labels.size(0)
        preds = logits.argmax(dim=1)
        total_correct += (preds == labels).sum().item()
        total_count += labels.size(0)
        loop.set_postfix(loss=loss.item())

    return total_loss / total_count, total_correct / total_count


def eval_one_epoch(model, loader, criterion, device, epoch=None, phase="Val"):
    model.eval()
    total_loss = 0.0
    total_correct = 0
    total_count = 0
    loop = tqdm(loader, desc=f"{phase} Epoch {epoch}", leave=False)
    with torch.no_grad():
        for faces, phys, labels in loop:
            faces = faces.to(device)
            phys  = phys.to(device)
            labels = labels.to(device)

            logits = model(faces, phys)
            loss = criterion(logits, labels)

            total_loss += loss.item() * labels.size(0)
            preds = logits.argmax(dim=1)
            total_correct += (preds == labels).sum().item()
            total_count += labels.size(0)
            loop.set_postfix(loss=loss.item())

    return total_loss / total_count, total_correct / total_count


In [None]:
num_epochs = 10  # or more

for epoch in range(1, num_epochs + 1):
    train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, criterion, device, epoch)
    val_loss, val_acc     = eval_one_epoch(model, val_loader,   criterion, device, epoch, phase="Val")

    print(f"Epoch {epoch:02d} | "
          f"train_loss={train_loss:.4f}, train_acc={train_acc:.3f} | "
          f"val_loss={val_loss:.4f}, val_acc={val_acc:.3f}")


In [None]:
test_loss, test_acc = eval_one_epoch(model, test_loader, criterion, device)
print(f"Test | loss={test_loss:.4f}, acc={test_acc:.3f}")
