1. Import libraries

In [1]:
import os
import random
import numpy as np
import torch
import math
import tensorflow as tf
from tensorflow.keras import layers, models
from typing import List, Tuple, Callable, Optional

2025-11-14 08:44:23.463872: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1763109863.618376      39 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1763109863.667372      39 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


2. Audio Augmentation

* Time masking
* Frequency masking
* Additive noise

In [2]:
class AudioAugmentation:
    """Randomly applies time masking, frequency masking and additive Gaussian noise.

    Each augmentation is applied independently with its own probability when
    the instance is called on a 2-D spectrogram array. Axis 0 is treated as
    the frequency axis and axis 1 as the time axis.

    Args:
        time_mask_prob: Probability of zeroing a band of columns (axis 1).
        freq_mask_prob: Probability of zeroing a band of rows (axis 0).
        time_mask_width: Number of consecutive columns zeroed by a time mask.
        freq_mask_width: Number of consecutive rows zeroed by a frequency mask.
        noise_prob: Probability of adding Gaussian noise.
        noise_level: Standard deviation of the zero-mean Gaussian noise.
    """

    def __init__(self,
                 time_mask_prob=0.5, freq_mask_prob=0.5,
                 time_mask_width=20, freq_mask_width=10,
                 noise_prob=0.3, noise_level=0.05):
        # Per-augmentation probabilities.
        self.time_mask_prob = time_mask_prob
        self.freq_mask_prob = freq_mask_prob
        self.noise_prob = noise_prob
        # Augmentation strengths.
        self.time_mask_width = time_mask_width
        self.freq_mask_width = freq_mask_width
        self.noise_level = noise_level

    def add_time_mask(self, spec):
        """Return a copy of ``spec`` with one band of columns set to zero.

        The input itself is returned untouched when it has no columns.
        """
        n_frames = spec.shape[1]
        if n_frames <= 0:
            return spec
        # Latest start that still fits the full mask; 0 if the mask is wider
        # than the spectrogram.
        latest_start = max(0, n_frames - self.time_mask_width)
        first = random.randint(0, latest_start)
        last = min(first + self.time_mask_width, n_frames)
        masked = spec.copy()
        masked[:, first:last] = 0
        return masked

    def add_freq_mask(self, spec):
        """Return a copy of ``spec`` with one band of rows set to zero.

        The input itself is returned untouched when it has no rows.
        """
        n_bins = spec.shape[0]
        if n_bins <= 0:
            return spec
        latest_start = max(0, n_bins - self.freq_mask_width)
        first = random.randint(0, latest_start)
        last = min(first + self.freq_mask_width, n_bins)
        masked = spec.copy()
        masked[first:last, :] = 0
        return masked

    def add_gaussian_noise(self, spec):
        """Return ``spec`` plus zero-mean Gaussian noise of std ``noise_level``."""
        return spec + np.random.normal(0, self.noise_level, spec.shape)

    def __call__(self, spec):
        """Apply each augmentation in turn, each gated by its own probability."""
        stages = (
            (self.time_mask_prob, self.add_time_mask),
            (self.freq_mask_prob, self.add_freq_mask),
            (self.noise_prob, self.add_gaussian_noise),
        )
        for probability, augment in stages:
            if random.random() < probability:
                spec = augment(spec)
        return spec

3. Lazy loading

This dataset class enables efficient lazy loading of apnea spectrograms by reading individual samples on demand using memory mapping. It reduces memory usage and supports optional data augmentation during training.

In [3]:
import os, math, random
import numpy as np
import tensorflow as tf

class LazyApneaTFDataset:
    """Lazily streams spectrograms from per-patient ``X.npy`` files.

    Samples are read on demand from ``<root_dir>/block/<patient_id>/X.npy``
    through NumPy memory maps, so the full arrays are never loaded eagerly.

    Args:
        root_dir: Directory that contains the ``block`` folder.
        sample_list: List of ``((patient_id, idx), label)`` entries.
        transform: Optional callable applied to each spectrogram before it is
            transposed and cast to float32.
    """

    # Upper bound on memory maps kept open at once during one pass over the
    # data; bounds the number of simultaneously open files.
    _MAX_OPEN_MMAPS = 64

    def __init__(self, root_dir, sample_list, transform=None):
        self.root_dir = root_dir
        self.sample_list = sample_list  # list of ((patient_id, idx), label)
        self.transform = transform

    def _generator(self):
        """Yield ``(spectrogram, label)`` pairs in ``sample_list`` order.

        Each spectrogram is transposed and returned as float32; the label is
        a Python ``int``. Memory maps are cached per patient for the duration
        of this generator so a file is not re-opened for every sample.
        """
        # pid -> memmap. Dict insertion order doubles as recency order: a hit
        # is popped and re-inserted, so the first key is the least recent.
        open_maps = {}
        for (pid, i), label in self.sample_list:
            patient_specs = open_maps.pop(pid, None)
            if patient_specs is None:
                if len(open_maps) >= self._MAX_OPEN_MMAPS:
                    # Evict the least recently used map to cap open files.
                    open_maps.pop(next(iter(open_maps)))
                x_path = os.path.join(self.root_dir, "block", pid, "X.npy")
                patient_specs = np.load(x_path, mmap_mode='r')
            open_maps[pid] = patient_specs

            spec = patient_specs[i]   # expected shape (64, 311) for this dataset
            if self.transform:
                spec = self.transform(spec)
            spec = spec.T.astype(np.float32)  # -> (311, 64)
            yield spec, int(label)

    def get_tf_dataset(self, batch_size=32, shuffle=True, buffer_size=1000,
                       input_shape=(311, 64)):
        """Wrap the generator in a batched, prefetched ``tf.data.Dataset``.

        Args:
            batch_size: Number of samples per batch (last batch may be smaller).
            shuffle: Whether to shuffle through a buffer (fixed seed 42).
            buffer_size: Size of the shuffle buffer. Samples are only mixed
                within this window, so an ordered ``sample_list`` stays
                locally ordered when the buffer is small.
            input_shape: Shape of one yielded (already transposed)
                spectrogram; must match what ``_generator`` produces.

        Returns:
            A dataset of ``(float32 spectrogram batch, int64 label batch)``.
        """
        output_signature = (
            tf.TensorSpec(shape=tuple(input_shape), dtype=tf.float32),
            tf.TensorSpec(shape=(), dtype=tf.int64),
        )
        ds = tf.data.Dataset.from_generator(self._generator, output_signature=output_signature)
        if shuffle:
            ds = ds.shuffle(buffer_size=buffer_size, seed=42)
        ds = ds.batch(batch_size, drop_remainder=False).prefetch(tf.data.AUTOTUNE)
        return ds


4. Build dataset

The dataset preparation pipeline is built around the ApneaDatasetBuilder class, which manages loading patient-specific data for sleep apnea detection. It provides three types of splitting strategies:

Random split: randomly divides all samples into training, validation, and test sets.

Dependent-subject split: splits each patient's data into train/val/test sets, then combines them.

Independent-subject split: assigns different patients entirely to train, validation, or test sets.

The class supports optional class balancing to handle data imbalance and integrates lazy loading and data augmentation for training.

To generate datasets from a chosen split, the create_tf_datasets_from_builder function converts the dataset builder's output into batched tf.data.Dataset objects. This setup enables efficient batching and prefetching during model training, with a configurable batch size and random seed.

In [4]:
class ApneaDatasetBuilder:
    """Indexes per-patient label files and builds train/val/test splits.

    On construction, every sub-directory of ``<root_dir>/block`` that contains
    a ``y.npy`` file is registered as a patient; its labels are loaded and one
    ``((patient_id, index), label)`` entry is recorded per label.

    Args:
        root_dir: Directory that contains the ``block`` folder.
        balance: If true, the training split is oversampled so every class
            has as many samples as the largest class.
        augmentation: Optional transform passed to the training dataset only.

    Raises:
        FileNotFoundError: If ``<root_dir>/block`` is not a directory.
    """

    def __init__(self, root_dir, balance=False, augmentation=None):
        self.root_dir = root_dir
        self.balance = balance
        self.augmentation = augmentation
        self.patient_ids = []
        self.label_per_patient = {}
        self.all_samples = []

        block_dir = os.path.join(root_dir, "block")
        if not os.path.isdir(block_dir):
            raise FileNotFoundError(f"Directory not found: {block_dir}")

        # Sorted listing keeps patient order (and seeded splits) reproducible.
        for patient_id in sorted(os.listdir(block_dir)):
            patient_path = os.path.join(block_dir, patient_id)
            if not os.path.isdir(patient_path):
                continue
            y_path = os.path.join(patient_path, "y.npy")
            if not os.path.exists(y_path):
                continue
            self.patient_ids.append(patient_id)
            self.label_per_patient[patient_id] = np.load(y_path)
            self.all_samples.extend(self._patient_samples(patient_id))

    def __len__(self):
        """Return the total number of indexed samples across all patients."""
        return len(self.all_samples)

    def _patient_samples(self, patient_id, indices=None):
        """Build ``((patient_id, i), label)`` entries for the given indices (all if None)."""
        labels = self.label_per_patient[patient_id]
        if indices is None:
            indices = range(len(labels))
        return [((patient_id, i), int(labels[i])) for i in indices]

    def balance_samples(self, samples):
        """Oversample minority classes up to the size of the largest class.

        Extra entries are drawn uniformly, with replacement, from the original
        members of each class, so earlier duplicates never increase the chance
        of further duplication. The result is shuffled with the global
        ``random`` state.

        Args:
            samples: List of ``(sample_key, label)`` pairs.

        Returns:
            A new shuffled list, or ``samples`` itself when it is empty.
        """
        by_label = {}
        for sample, lbl in samples:
            by_label.setdefault(lbl, []).append(sample)
        if not by_label:
            return samples

        target = max(len(group) for group in by_label.values())
        balanced = []
        for lbl, group in by_label.items():
            balanced.extend((sample, lbl) for sample in group)
            shortfall = target - len(group)
            # k == 0 yields an empty list, so the largest class is untouched.
            balanced.extend((sample, lbl) for sample in random.choices(group, k=shortfall))
        random.shuffle(balanced)
        return balanced

    def _create_dataset(self, samples, transform=None):
        """Wrap a sample list in a lazily loading dataset object."""
        return LazyApneaTFDataset(self.root_dir, samples, transform=transform)

    def _finalize_split(self, split_name, train_samples, val_samples, test_samples):
        """Balance the train split if requested, build datasets and report sizes."""
        if self.balance:
            train_samples = self.balance_samples(train_samples)

        # Augmentation is applied to the training data only.
        train_dataset = self._create_dataset(train_samples, transform=self.augmentation)
        val_dataset = self._create_dataset(val_samples)
        test_dataset = self._create_dataset(test_samples)

        print(f"{split_name}: {len(train_samples)} train, {len(val_samples)} val, {len(test_samples)} test")
        return train_dataset, val_dataset, test_dataset

    def split_random(self, train_ratio=0.7, val_ratio=0.15, seed=42):
        """Shuffle all samples together and cut them into train/val/test.

        Seeds the global ``random`` module with ``seed``. The test split
        receives whatever remains after the train and validation cuts.

        Returns:
            Tuple ``(train_dataset, val_dataset, test_dataset)``.
        """
        total_samples = len(self.all_samples)
        train_size = int(train_ratio * total_samples)
        val_size = int(val_ratio * total_samples)

        random.seed(seed)
        indices = list(range(total_samples))
        random.shuffle(indices)
        val_stop = train_size + val_size

        train_samples = [self.all_samples[i] for i in indices[:train_size]]
        val_samples = [self.all_samples[i] for i in indices[train_size:val_stop]]
        test_samples = [self.all_samples[i] for i in indices[val_stop:]]

        return self._finalize_split("Random Split", train_samples, val_samples, test_samples)

    def split_dependent_subject(self, train_ratio=0.7, val_ratio=0.15, seed=42):
        """Split every patient's own samples into train/val/test and merge them.

        Seeds the global ``random`` module with ``seed``. Every patient can
        therefore contribute samples to all three splits.

        Returns:
            Tuple ``(train_dataset, val_dataset, test_dataset)``.
        """
        random.seed(seed)
        train_samples, val_samples, test_samples = [], [], []
        for pid in self.patient_ids:
            num_samples = len(self.label_per_patient[pid])
            indices = list(range(num_samples))
            random.shuffle(indices)
            train_end = int(train_ratio * num_samples)
            val_end = train_end + int(val_ratio * num_samples)
            train_samples.extend(self._patient_samples(pid, indices[:train_end]))
            val_samples.extend(self._patient_samples(pid, indices[train_end:val_end]))
            test_samples.extend(self._patient_samples(pid, indices[val_end:]))

        return self._finalize_split("Dependent Subject Split", train_samples, val_samples, test_samples)

    def split_independent_subject(self, train_ratio=0.7, val_ratio=0.15, seed=42):
        """Assign whole patients to exactly one of train/val/test.

        Seeds the global ``random`` module with ``seed``. Train and validation
        are each allotted at least one patient; with fewer than three patients
        the later splits may therefore be empty.

        Returns:
            Tuple ``(train_dataset, val_dataset, test_dataset)``.
        """
        random.seed(seed)
        patient_count = len(self.patient_ids)
        train_count = max(int(train_ratio * patient_count), 1)
        val_count = max(int(val_ratio * patient_count), 1)

        order = list(range(patient_count))
        random.shuffle(order)
        shuffled_ids = [self.patient_ids[i] for i in order]
        val_stop = train_count + val_count

        train_ids = shuffled_ids[:train_count]
        val_ids = shuffled_ids[train_count:val_stop]
        test_ids = shuffled_ids[val_stop:]

        train_samples = [s for pid in train_ids for s in self._patient_samples(pid)]
        val_samples = [s for pid in val_ids for s in self._patient_samples(pid)]
        test_samples = [s for pid in test_ids for s in self._patient_samples(pid)]

        return self._finalize_split("Independent Subject Split", train_samples, val_samples, test_samples)

In [5]:
def create_tf_datasets_from_builder(builder: ApneaDatasetBuilder,
                                    split_type: str = "dependent",
                                    batch_size: int = 32,
                                    seed: int = 42):
    """Split the builder's samples and return batched train/val/test datasets.

    Args:
        builder: Dataset builder providing the ``split_*`` methods.
        split_type: One of ``"random"``, ``"dependent"`` or ``"independent"``.
        batch_size: Batch size used for all three datasets.
        seed: Seed forwarded to the chosen split method.

    Returns:
        Tuple ``(train_ds, val_ds, test_ds)``; only the training dataset is
        shuffled.

    Raises:
        ValueError: If ``split_type`` is not one of the supported names.
    """
    split_methods = (
        ("random", "split_random"),
        ("dependent", "split_dependent_subject"),
        ("independent", "split_independent_subject"),
    )
    for key, method_name in split_methods:
        if split_type == key:
            splits = getattr(builder, method_name)(seed=seed)
            break
    else:
        raise ValueError(f"Unsupported split type: {split_type}")

    train_dataset, val_dataset, test_dataset = splits
    return (
        train_dataset.get_tf_dataset(batch_size=batch_size, shuffle=True),
        val_dataset.get_tf_dataset(batch_size=batch_size, shuffle=False),
        test_dataset.get_tf_dataset(batch_size=batch_size, shuffle=False),
    )


5. CNN Model

In [35]:
def create_cnn_model(input_shape=(64, 311, 1), num_classes=5):
    """Build a small three-stage 2-D CNN classifier.

    Three Conv2D(3x3, ReLU, same padding) + MaxPooling2D(2x2) stages with
    16, 32 and 64 filters, followed by Flatten, Dense(64, ReLU) and a softmax
    output layer.

    Args:
        input_shape: Shape of one input sample, including the channel axis.
            NOTE(review): the default (64, 311, 1) differs from the (311, 64)
            samples produced by LazyApneaTFDataset — confirm before training
            this model on those datasets.
        num_classes: Number of output classes.

    Returns:
        An uncompiled Keras Sequential model.
    """
    stack = [layers.Input(shape=input_shape)]

    # Convolutional feature extractor: filter count doubles at each stage.
    for n_filters in (16, 32, 64):
        stack.append(layers.Conv2D(n_filters, (3, 3), activation='relu', padding='same'))
        stack.append(layers.MaxPooling2D((2, 2)))

    # Classification head.
    stack.append(layers.Flatten())
    stack.append(layers.Dense(64, activation='relu'))
    stack.append(layers.Dense(num_classes, activation='softmax'))

    return models.Sequential(stack)

# Instantiate the CNN with its default input shape and class count.
model = create_cnn_model()

# Sparse categorical cross-entropy: labels are plain integer class ids.
model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

model.summary()

6. GRU model

In [44]:
from tensorflow.keras import layers, models

def build_rnn_model(input_shape=(311,64), num_classes=5, rnn_units=128):
    """Build a two-layer bidirectional GRU classifier.

    Args:
        input_shape: Shape of one input sequence as (time, features).
        num_classes: Number of output classes.
        rnn_units: Units of the first GRU; the second GRU uses half as many.

    Returns:
        An uncompiled Keras functional model with a softmax output.
    """
    sequence_in = layers.Input(shape=input_shape)   # (time, feat)

    # First recurrent layer keeps the full sequence for the next GRU.
    first_gru = layers.GRU(rnn_units, return_sequences=True)
    features = layers.Bidirectional(first_gru)(sequence_in)
    features = layers.Dropout(0.3)(features)

    # Second recurrent layer returns only its final output.
    second_gru = layers.GRU(rnn_units // 2)
    features = layers.Bidirectional(second_gru)(features)
    features = layers.Dropout(0.3)(features)

    # Dense classification head.
    features = layers.Dense(128, activation='relu')(features)
    class_probs = layers.Dense(num_classes, activation='softmax')(features)
    return models.Model(sequence_in, class_probs)



In [45]:
num_classes = 5
model = build_rnn_model(
    input_shape=(311,64),
    num_classes=num_classes,
    rnn_units=128,
)
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

# Callbacks: keep the best checkpoint by validation accuracy, halve the
# learning rate when validation loss plateaus, and stop early after 8 stale
# epochs. They only take effect when passed to model.fit(callbacks=...).
callbacks = [
    tf.keras.callbacks.ModelCheckpoint(
        "best_rnn.h5",
        monitor="val_accuracy",
        save_best_only=True,
        mode="max",
    ),
    tf.keras.callbacks.ReduceLROnPlateau(
        monitor="val_loss",
        factor=0.5,
        patience=3,
        verbose=1,
    ),
    tf.keras.callbacks.EarlyStopping(
        monitor="val_loss",
        patience=8,
        restore_best_weights=True,
    ),
]
model.summary()


7. LSTM Model

In [7]:
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers

def build_lstm_model(input_shape=(311, 64),
                     num_classes=5,
                     lstm_units=128,
                     dropout_rate=0.3,
                     bidirectional=True):
    """Build a two-layer (optionally bidirectional) LSTM classifier.

    Each recurrent layer is followed by batch normalization and dropout; a
    Dense(128, ReLU) + Dropout(0.4) head feeds the softmax output.

    Args:
        input_shape: Shape of one input sequence as (time, features).
        num_classes: Number of output classes.
        lstm_units: Units of the first LSTM; the second uses half as many.
        dropout_rate: Dropout rate applied after each recurrent block.
        bidirectional: Wrap each LSTM in a Bidirectional layer when true.

    Returns:
        An uncompiled Keras functional model named ``"LSTM_apnea"``.
    """
    def recurrent_block(tensor, units, return_sequences):
        # LSTM (optionally bidirectional) -> BatchNorm -> Dropout.
        recurrent = layers.LSTM(units, return_sequences=return_sequences)
        if bidirectional:
            recurrent = layers.Bidirectional(recurrent)
        tensor = recurrent(tensor)
        tensor = layers.BatchNormalization()(tensor)
        return layers.Dropout(dropout_rate)(tensor)

    spec_in = layers.Input(shape=input_shape, name="input_spec")  # (311,64)

    # Block 1 keeps the whole sequence; block 2 returns only the last output.
    hidden = recurrent_block(spec_in, lstm_units, return_sequences=True)
    hidden = recurrent_block(hidden, lstm_units // 2, return_sequences=False)

    # Fully connected head.
    hidden = layers.Dense(128, activation="relu")(hidden)
    hidden = layers.Dropout(0.4)(hidden)
    class_probs = layers.Dense(num_classes, activation="softmax")(hidden)

    return models.Model(inputs=spec_in, outputs=class_probs, name="LSTM_apnea")

# Example: build the bidirectional LSTM and compile it for integer labels.
num_classes = 5
model = build_lstm_model(
    input_shape=(311,64),
    num_classes=num_classes,
    lstm_units=128,
    dropout_rate=0.3,
)
model.compile(
    loss="sparse_categorical_crossentropy",
    optimizer=optimizers.Adam(learning_rate=1e-3),
    metrics=["accuracy"],
)

model.summary()


8. Train

In [6]:
# Index the dataset; balance=True oversamples minority classes in the train split.
builder = ApneaDatasetBuilder(
    root_dir="/kaggle/input/dataset",
    balance=True,
)

# Per-patient ("dependent") split, batched with the default batch size and seed.
train, val, test = create_tf_datasets_from_builder(builder, split_type='dependent')

Dependent Subject Split: 135375 train, 15291 val, 15550 test


I0000 00:00:1763109877.491867      39 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 15513 MB memory:  -> device: 0, name: Tesla P100-PCIE-16GB, pci bus id: 0000:00:04.0, compute capability: 6.0


In [51]:
# Train whichever model the most recently executed build cell bound to `model`.
history = model.fit(
    train,
    epochs=20,
    validation_data=val,
)
# Evaluate on the held-out test split.
test_loss, test_acc = model.evaluate(test)
# The f-prefix is required; without it the placeholder text is printed literally.
print(f"Test accuracy: {test_acc:.4f}")


Epoch 1/20
   2240/Unknown [1m260s[0m 115ms/step - accuracy: 0.4017 - loss: 1.2912



[1m2240/2240[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m291s[0m 129ms/step - accuracy: 0.4017 - loss: 1.2912 - val_accuracy: 0.3724 - val_loss: 1.3757
Epoch 2/20
[1m2240/2240[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m180s[0m 80ms/step - accuracy: 0.4017 - loss: 1.2902 - val_accuracy: 0.3724 - val_loss: 1.3576
Epoch 3/20
[1m2240/2240[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m178s[0m 78ms/step - accuracy: 0.3945 - loss: 1.2924 - val_accuracy: 0.3724 - val_loss: 1.3343
Epoch 4/20
[1m2240/2240[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m186s[0m 82ms/step - accuracy: 0.3935 - loss: 1.2926 - val_accuracy: 0.3724 - val_loss: 1.3341
Epoch 5/20
[1m2240/2240[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m191s[0m 84ms/step - accuracy: 0.3971 - loss: 1.2950 - val_accuracy: 0.3724 - val_loss: 1.3542
Epoch 6/20
[1m2240/2240[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m197s[0m 87ms/step - accuracy: 0.3949 - loss: 1.2922 - val_accuracy: 0.3724 - val_loss: 1.3334
Epoch 7/

KeyboardInterrupt: 

In [None]:
# Checkpoint the best model by validation accuracy, halve the learning rate
# when validation loss plateaus, and stop early after 8 stale epochs.
callbacks = [
    tf.keras.callbacks.ModelCheckpoint(
        "best_lstm.h5",
        monitor="val_accuracy",
        save_best_only=True,
    ),
    tf.keras.callbacks.ReduceLROnPlateau(
        monitor="val_loss",
        factor=0.5,
        patience=3,
    ),
    tf.keras.callbacks.EarlyStopping(
        monitor="val_loss",
        patience=8,
        restore_best_weights=True,
    ),
]

history = model.fit(
    train,
    validation_data=val,
    epochs=20,
    callbacks=callbacks,
)

# Evaluate on the held-out test split.
test_loss, test_acc = model.evaluate(test)
print("Test accuracy:", test_acc)

Epoch 1/20
   4231/Unknown [1m695s[0m 163ms/step - accuracy: 0.2141 - loss: 1.6710



[1m4231/4231[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m757s[0m 178ms/step - accuracy: 0.2141 - loss: 1.6710 - val_accuracy: 0.3693 - val_loss: 1.5203 - learning_rate: 0.0010
Epoch 2/20
[1m4231/4231[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m604s[0m 142ms/step - accuracy: 0.2007 - loss: 1.6105 - val_accuracy: 0.3677 - val_loss: 1.4746 - learning_rate: 0.0010
Epoch 3/20
[1m 659/4231[0m [32m━━━[0m[37m━━━━━━━━━━━━━━━━━[0m [1m8:21[0m 140ms/step - accuracy: 0.2006 - loss: 1.6093