In [9]:
import pandas as pd, numpy as np, torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader


class BlinkSeqDataset(Dataset):
    """
    Sliding-window dataset over per-frame rows read from a CSV file.

    Every item is a window of `seq_len` consecutive rows:
      eye_seq : (seq_len, 1, 24, 12)  float32, `px_*` columns scaled by 1/255
      num_seq : (seq_len, 7)          float32, z-scored numeric columns
      label   : int64 scalar, `manual_blink` of the LAST row of the window

    All tensors are built once in `__init__` as ordinary CPU tensors (they are
    not pinned here); `__getitem__` only slices them.

    Parameters
    ----------
    csv_path : path passed to `pandas.read_csv`; rows containing NaN are dropped.
    seq_len : number of consecutive rows per window (must be >= 1).
    train : True selects the first `split_ratio` fraction of rows, False the rest.
    split_ratio : fraction of rows assigned to the training split.
    numeric_stats : optional `(mean, std)` pair used for z-scoring. When None,
        the statistics are computed from the rows of the selected split.
        The pair actually used is exposed as `self.numeric_stats`, so a
        test split can reuse the training statistics.
    """
    def __init__(self, csv_path, seq_len=30, train=True,
                 split_ratio=0.8, numeric_stats=None):
        if seq_len < 1:
            raise ValueError(f"seq_len must be >= 1, got {seq_len}")

        df = pd.read_csv(csv_path).dropna().reset_index(drop=True)

        # ---- contiguous train/test split -----------------------------------
        split = int(len(df) * split_ratio)
        df = df.iloc[:split] if train else df.iloc[split:]

        px_cols  = [c for c in df.columns if c.startswith('px_')]
        num_cols = ['ratio_left','ratio_right','ratio_avg',
                    'v_left','v_right','h_left','h_right']

        # ---- numeric features : z-score ------------------------------------
        X_num = df[num_cols].values.astype(np.float32)
        if numeric_stats is None:
            # epsilon keeps constant columns from dividing by zero
            mean, std = X_num.mean(0), X_num.std(0) + 1e-6
        else:
            # Cast so float64 stats from a caller do not upcast the features.
            mean, std = (np.asarray(s, dtype=np.float32) for s in numeric_stats)
        X_num = (X_num - mean) / std

        # ---- pixel patch : scale 0-1 ---------------------------------------
        X_px = (df[px_cols].values.astype(np.float32) / 255.0) \
               .reshape(-1, 1, 24, 12)

        # ---- convert ONCE to CPU tensors (no per-item conversion later) -----
        self.X_num = torch.from_numpy(X_num)
        self.X_px  = torch.from_numpy(X_px)
        self.labels = torch.from_numpy(df['manual_blink'].values.astype(np.int64))
        self.seq_len = seq_len
        self.numeric_stats = (mean, std)

    def __len__(self):
        # Number of full windows; clamp at 0 because a negative __len__ makes
        # len() raise when the split holds fewer rows than seq_len.
        return max(0, len(self.labels) - self.seq_len + 1)

    def __getitem__(self, idx):
        n_windows = len(self)
        if idx < 0:
            # Support Python-style negative indexing instead of silently
            # returning an empty window with a mismatched label.
            idx += n_windows
        if not 0 <= idx < n_windows:
            raise IndexError(
                f"window index out of range for dataset with {n_windows} windows")

        sl = slice(idx, idx + self.seq_len)
        # NO conversion inside __getitem__
        return (self.X_px [sl],           # (seq,1,24,12)
                self.X_num[sl],           # (seq,7)
                self.labels[idx+self.seq_len-1])


In [10]:
# Training batches are reshuffled every epoch; evaluation keeps file order.
# num_workers=0 keeps loading in the main process (no worker subprocesses),
# and pin_memory=True asks the loader to return page-locked batches so the
# later `.to(device, non_blocking=True)` copies can overlap with compute.
train_dl = DataLoader(
    train_ds,
    batch_size=256,
    shuffle=True,
    num_workers=0,
    pin_memory=True,
)
test_dl = DataLoader(
    test_ds,
    batch_size=256,
    shuffle=False,
    num_workers=0,
    pin_memory=True,
)


In [11]:
import torch
import torch.nn as nn

class BlinkDetector(nn.Module):
    """
    CNN + LSTM sequence classifier producing one logit per input sequence.

    Per frame, a two-layer CNN embeds the single-channel eye patch to 64
    dimensions and a linear layer embeds the numeric features to 16 dimensions.
    The concatenated 80-dim frame vectors are fed to an LSTM, and the final
    hidden state(s) are mapped to a single logit.

    Parameters
    ----------
    num_features : number of numeric features per frame.
    img_height, img_width : spatial size of the eye patch; each is reduced by
        a factor of 4 (two 2x2 max-pools) before flattening.
    conv_channels : output channels of the two convolution layers.
    lstm_hidden : LSTM hidden size (per direction).
    lstm_layers : number of stacked LSTM layers.
    bidirectional : whether the LSTM runs in both directions.
    """
    def __init__(self, num_features=7, img_height=24, img_width=12,
                 conv_channels=(16, 32), lstm_hidden=64, lstm_layers=1, bidirectional=True):
        super().__init__()
        # CNN feature extractor for eye patch
        self.conv1 = nn.Conv2d(1, conv_channels[0], kernel_size=3, padding=1)  # conv layer 1
        self.conv2 = nn.Conv2d(conv_channels[0], conv_channels[1], kernel_size=3, padding=1)  # conv layer 2
        self.relu  = nn.ReLU(inplace=True)
        self.pool  = nn.MaxPool2d(2, 2)  # 2x2 pooling reduces HxW by factor of 2

        # Compute flattened size after two conv+pool layers
        conv_out_h = img_height // 4   # 24 -> 6 after two pools
        conv_out_w = img_width  // 4   # 12 -> 3 after two pools
        conv_flat_dim = conv_out_h * conv_out_w * conv_channels[1]
        # Fully-connected layer to embed CNN output to a fixed size (64)
        self.img_fc = nn.Linear(conv_flat_dim, 64)

        # Fully-connected layer to embed numeric features to 16 dims
        self.num_fc = nn.Linear(num_features, 16)

        # LSTM for sequence modeling (input size = 64+16, hidden size = lstm_hidden)
        lstm_input_dim = 64 + 16
        self.lstm = nn.LSTM(lstm_input_dim, lstm_hidden, num_layers=lstm_layers,
                             batch_first=True, bidirectional=bidirectional)
        # Final classifier layer
        self.bidirectional = bidirectional
        output_dim = lstm_hidden * (2 if bidirectional else 1)
        self.fc = nn.Linear(output_dim, 1)  # single output logit

    def forward(self, eye_patches, features):
        """
        eye_patches: Tensor of shape (batch, seq_len, 1, H, W)
        features:    Tensor of shape (batch, seq_len, num_features)

        Returns a tensor of shape (batch, 1) holding one raw logit per sequence.
        """
        batch_size, seq_len, _, H, W = eye_patches.shape

        # CNN forward pass for all patches in the sequence.
        # reshape (not view) so non-contiguous inputs, e.g. transposed or
        # sliced batches, are accepted instead of raising a RuntimeError.
        x = eye_patches.reshape(batch_size * seq_len, 1, H, W)  # merge batch and sequence for CNN
        x = self.relu(self.conv1(x))
        x = self.pool(x)
        x = self.relu(self.conv2(x))
        x = self.pool(x)                                        # shape: [batch*seq, conv_channels[1], H/4, W/4]
        x = x.reshape(x.size(0), -1)                            # flatten spatial dims
        x = self.relu(self.img_fc(x))                           # image patch embedded to 64-dim

        # Numeric feature embedding
        f = features.reshape(batch_size * seq_len, -1)          # flatten batch and seq
        f = self.relu(self.num_fc(f))                           # embed numeric features to 16-dim

        # Combine image and numeric features
        combined = torch.cat([x, f], dim=1)                     # shape: [batch*seq, 64+16]
        combined = combined.reshape(batch_size, seq_len, -1)    # reshape to [batch, seq_len, feature_dim]

        # LSTM over time
        lstm_out, (h_n, c_n) = self.lstm(combined)              # h_n shape: (num_layers*direction, batch, hidden)
        if self.bidirectional:
            # Concatenate the last layer's final forward and backward hidden states
            # (the last two entries of h_n for a bidirectional LSTM).
            forward_h = h_n[-2]    # shape [batch, hidden]
            backward_h = h_n[-1]   # shape [batch, hidden]
            seq_repr = torch.cat([forward_h, backward_h], dim=1)  # shape [batch, 2*hidden]
        else:
            seq_repr = h_n[-1]    # shape [batch, hidden]

        # Final output layer
        logit = self.fc(seq_repr)   # shape [batch, 1]
        return logit


In [12]:
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import torch, torch.cuda.amp as amp

torch.backends.cudnn.benchmark = True  # optimise convs

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print("device:", device)

model = BlinkDetector(num_features=7).to(device)

# ---- imbalance weighting ---------------------------------------------
# pos_weight = (#negatives / #positives), so the rarer positive class
# contributes proportionally more to the loss.
blink_frac = train_ds.labels.float().mean().item()
if not blink_frac > 0:
    # Covers both "no positive labels" (0.0) and an empty split (NaN);
    # either would otherwise surface as a bare ZeroDivisionError / NaN weight.
    raise ValueError(
        "train_ds contains no positive 'manual_blink' labels; "
        "cannot derive pos_weight")
pos_weight = torch.tensor([(1-blink_frac)/blink_frac], device=device)
criterion  = torch.nn.BCEWithLogitsLoss(pos_weight=pos_weight)

optimizer  = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)
# Halve the learning rate after 3 epochs without improvement of the
# monitored value passed to scheduler.step().
scheduler  = torch.optim.lr_scheduler.ReduceLROnPlateau(
                optimizer, factor=0.5, patience=3)

# mixed precision. `amp` is torch.cuda.amp here, whose GradScaler takes
# init_scale as its first positional argument -- passing 'cuda' positionally
# would set the initial scale to a string. Scaling is only enabled on CUDA.
scaler = amp.GradScaler(enabled=(device == 'cuda'))

def run_epoch(loader, train=True):
    """
    Run one pass over `loader`, optionally updating the model.

    Uses the module-level `model`, `criterion`, `optimizer`, `scaler` and
    `device`. Predictions are thresholded at sigmoid(logit) > 0.5.

    Parameters
    ----------
    loader : iterable yielding (eye, num, lbl) batches.
    train : True -> training mode with optimizer steps;
            False -> eval mode with autograd disabled.

    Returns
    -------
    (mean_loss, accuracy, precision, recall, f1), where mean_loss is averaged
    over the samples actually seen in this pass.

    Raises
    ------
    ValueError if the loader yields no samples.
    """
    model.train(train)
    use_amp = str(device).startswith('cuda')   # autocast only makes sense on GPU
    loss_sum, n_seen, all_pred, all_true = 0.0, 0, [], []

    for eye, num, lbl in loader:
        eye = eye.to(device, non_blocking=True).float()
        num = num.to(device, non_blocking=True).float()
        lbl = lbl.to(device, non_blocking=True).float()

        # No autograd graph is built during evaluation.
        with torch.set_grad_enabled(train):
            with amp.autocast(enabled=use_amp):
                logits = model(eye, num).squeeze(1)
                loss   = criterion(logits, lbl)

        if train:
            optimizer.zero_grad()
            scaler.scale(loss).backward()
            # Unscale before clipping so the threshold applies to true gradients.
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), 5.0)
            scaler.step(optimizer)
            scaler.update()

        batch_n = lbl.size(0)
        loss_sum += loss.item() * batch_n      # criterion returns the batch mean
        n_seen   += batch_n
        all_pred.append(torch.sigmoid(logits.detach()).cpu() > 0.5)
        all_true.append(lbl.cpu().bool())

    if n_seen == 0:
        raise ValueError("run_epoch received a loader that yielded no samples")

    y_pred = torch.cat(all_pred)
    y_true = torch.cat(all_true)
    acc  = accuracy_score(y_true, y_pred)
    prec, rec, f1, _ = precision_recall_fscore_support(
                           y_true, y_pred, average='binary', zero_division=0)
    # Divide by samples seen (not len(loader.dataset)) so the mean stays
    # correct even if the loader drops or subsamples items.
    return loss_sum / n_seen, acc, prec, rec, f1


device: cuda


AttributeError: 'BlinkSeqDataset' object has no attribute 'labels'

In [None]:
# Early-stopping training loop: keep the checkpoint with the best validation
# F1 and stop after `patience` consecutive epochs without an improvement of
# more than 1e-3.
# best_f1 starts at -inf (not 0) so the first epoch always writes
# "blink_best.pth"; otherwise a run whose validation F1 never exceeds 1e-3
# would leave no checkpoint (or a stale one from an earlier run) to load later.
best_f1, patience, epochs_no_improve = float('-inf'), 7, 0
for epoch in range(40):
    tr_loss, _, _, _, tr_f1 = run_epoch(train_dl, train=True)
    va_loss, _, _, _, va_f1 = run_epoch(test_dl,  train=False)
    scheduler.step(va_loss)   # LR schedule monitors validation loss

    print(f"[{epoch+1:02}] trainL {tr_loss:.3f} F1 {tr_f1:.3f} | "
          f"valL {va_loss:.3f} F1 {va_f1:.3f}")

    if va_f1 > best_f1 + 1e-3:
        best_f1, epochs_no_improve = va_f1, 0
        torch.save(model.state_dict(), "blink_best.pth")
    else:
        epochs_no_improve += 1
        if epochs_no_improve >= patience:
            print("Early stop – no F1 improvement")
            break


RuntimeError: DataLoader worker (pid(s) 12828, 33428) exited unexpectedly

In [None]:
# Restore the best checkpoint written by the training loop.
# map_location remaps the saved tensors onto the current device, so a
# checkpoint saved on a GPU can also be loaded on a CPU-only machine.
model.load_state_dict(torch.load("blink_best.pth", map_location=device))
model.eval()   # inference mode for layers that behave differently in training

def predict_sequence(eye_seq, num_seq):
    """
    Score a single sequence with the module-level `model`.

    eye_seq : numpy array of eye patches, e.g. (30, 1, 24, 12)
    num_seq : numpy array of per-frame numeric features

    Both arrays get a leading batch dimension of 1, are cast to float and
    moved to `device`. Returns sigmoid(logit) as a Python float; per the
    original note, a value > 0.5 is read as an intentional blink.
    """
    def _to_batch(arr):
        # numpy -> tensor, prepend batch axis, cast, move to the target device
        return torch.from_numpy(arr).unsqueeze(0).float().to(device)

    eye_batch = _to_batch(eye_seq)
    num_batch = _to_batch(num_seq)

    with torch.no_grad():
        logit = model(eye_batch, num_batch)
        return torch.sigmoid(logit).item()
