In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import os
from pathlib import Path

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

In [None]:
DATA_DIR = Path("apnea-ecg-database-1.0.0")

FS = 100  # Hz

# Window length (60 seconds)
WINDOW_SEC = 60
WINDOW_SAMPLES = FS * WINDOW_SEC

print("Window samples:", WINDOW_SAMPLES)

In [None]:
import wfdb
record_name = "a01"
record_path = DATA_DIR / record_name

signal, fields = wfdb.rdsamp(str(record_path))

print("Signal shape:", signal.shape)
print("Sampling frequency:", fields['fs'])

In [None]:
plt.figure(figsize=(12,3))
plt.plot(signal[:5000])
plt.title("Raw ECG (first ~50s)")
plt.xlabel("Samples")
plt.ylabel("Amplitude")
plt.show()

In [None]:
ann = wfdb.rdann(str(record_path), 'apn')

print("Annotation attributes:")
print([attr for attr in dir(ann) if not attr.startswith("_")])

In [None]:
print("Number of minutes:", len(ann.symbol))
print("First labels:", ann.symbol[:10])

In [None]:
ecg_signal = signal[:, 0]

print("ECG signal shape:", ecg_signal.shape)

In [None]:
ecg_trimmed = ecg_signal[:required_samples]
print("Trimmed ECG shape:", ecg_trimmed.shape)

In [None]:
X = ecg_trimmed.reshape(
    n_minutes,
    WINDOW_SAMPLES,
    1
)

print("X shape:", X.shape)

In [None]:
y = np.array([1 if l == 'A' else 0 for l in labels])

print("y shape:", y.shape)
print("First 20 labels:", y[:20])

In [None]:
unique, counts = np.unique(y, return_counts=True)
for u, c in zip(unique, counts):
    print(f"Class {u}: {c} samples")

In [None]:
idx = np.where(y == 1)[0][0]  # πρώτο apnea window

plt.figure(figsize=(12, 3))
plt.plot(X[idx].squeeze())
plt.title("ECG window with Apnea label")
plt.xlabel("Samples")
plt.ylabel("Amplitude")
plt.show()

In [None]:
unique, counts = np.unique(y, return_counts=True)
print(dict(zip(unique, counts)))

In [None]:
import wfdb
from collections import Counter

# Training records according to PhysioNet
train_records = (
    [f"a{i:02d}" for i in range(1, 21)] +
    [f"b{i:02d}" for i in range(1, 6)] +
    [f"c{i:02d}" for i in range(1, 11)]
)

len(train_records), train_records[:5]

In [None]:
label_stats = {}

for rec in train_records:
    ann = wfdb.rdann(str(DATA_DIR / rec), 'apn')
    
    # ann.symbol is the correct attribute
    labels = ann.symbol
    
    counts = Counter(labels)
    label_stats[rec] = counts

In [None]:
for rec, counts in label_stats.items():
    n_normal = counts.get('N', 0)
    n_apnea = counts.get('A', 0)
    total = n_normal + n_apnea
    
    print(f"{rec}: N={n_normal:4d}, A={n_apnea:4d}, total={total}")

In [None]:
SELECTED_RECORDS = [
    'a01','a02','a03','a04','a05','a06','a07','a08','a09','a10',
    'a11','a12','a13','a14','a15','a16','a17','a18','a19','a20',
    'b02'
]


In [None]:
def load_record_windows(record_name, data_dir):
    record_path = data_dir / record_name

    # Load ECG
    signal, fields = wfdb.rdsamp(str(record_path))
    ecg_signal = signal[:, 0].astype(np.float32)

    # Load apnea annotations
    ann = wfdb.rdann(str(record_path), 'apn')
    labels = np.array(ann.symbol)

    # Compute valid minutes
    max_full_minutes = ecg_signal.shape[0] // WINDOW_SAMPLES
    n_minutes = min(len(labels), max_full_minutes)

    if n_minutes == 0:
        return np.empty((0, WINDOW_SAMPLES, 1)), np.empty((0,), dtype=int), {}

    # Trim
    ecg_trimmed = ecg_signal[:n_minutes * WINDOW_SAMPLES]
    labels = labels[:n_minutes]

    # Reshape
    X = ecg_trimmed.reshape(n_minutes, WINDOW_SAMPLES, 1)
    y = np.array([1 if l == 'A' else 0 for l in labels])

    meta = {
        "record": record_name,
        "num_minutes": n_minutes
    }

    return X, y, meta


In [None]:
X_all = []
y_all = []
subjects = []

for rec in SELECTED_RECORDS:
    X_rec, y_rec, meta = load_record_windows(rec, DATA_DIR)
    print(f"{rec}: X_rec {X_rec.shape}, y_rec {y_rec.shape}")

    if len(y_rec) == 0:
        continue

    X_all.append(X_rec)
    y_all.append(y_rec)
    subjects.extend([rec] * len(y_rec))


In [None]:
X = np.concatenate(X_all, axis=0)
y = np.concatenate(y_all, axis=0)
subjects = np.array(subjects)

print("FINAL DATASET")
print("X shape:", X.shape)
print("y shape:", y.shape)
print("Unique subjects:", len(np.unique(subjects)))
print("Label distribution:", dict(zip(*np.unique(y, return_counts=True))))


In [None]:
import numpy as np

SEED = 42
rng = np.random.default_rng(SEED)

unique_subjects = np.unique(subjects)
print("All subjects:", unique_subjects)
print("Number of subjects:", len(unique_subjects))


In [None]:
shuffled_subjects = unique_subjects.copy()
rng.shuffle(shuffled_subjects)

train_subjects = shuffled_subjects[:15]
val_subjects   = shuffled_subjects[15:18]
test_subjects  = shuffled_subjects[18:]

print("Train subjects:", train_subjects)
print("Val subjects:", val_subjects)
print("Test subjects:", test_subjects)


In [None]:
def subject_mask(subject_array, subject_list):
    subject_set = set(subject_list)
    return np.array([s in subject_set for s in subject_array])

train_mask = subject_mask(subjects, train_subjects)
val_mask   = subject_mask(subjects, val_subjects)
test_mask  = subject_mask(subjects, test_subjects)

X_train, y_train = X[train_mask], y[train_mask]
X_val,   y_val   = X[val_mask],   y[val_mask]
X_test,  y_test  = X[test_mask],  y[test_mask]

print("Train:", X_train.shape, y_train.shape)
print("Val:  ", X_val.shape, y_val.shape)
print("Test: ", X_test.shape, y_test.shape)


In [None]:
from collections import Counter

def print_split_stats(name, y_split):
    c = Counter(y_split)
    total = len(y_split)
    print(f"{name}: total={total}, "
          f"Normal={c.get(0,0)}, Apnea={c.get(1,0)}, "
          f"Apnea%={100*c.get(1,0)/total:.1f}%")

print_split_stats("Train", y_train)
print_split_stats("Val",   y_val)
print_split_stats("Test",  y_test)


In [None]:
import numpy as np

train_mean = X_train.mean()
train_std  = X_train.std() + 1e-8

print("Train mean:", train_mean)
print("Train std:", train_std)

def normalize(X, mean, std):
    return (X - mean) / std

X_train_n = normalize(X_train, train_mean, train_std)
X_val_n   = normalize(X_val,   train_mean, train_std)
X_test_n  = normalize(X_test,  train_mean, train_std)

print("Normalized shapes:", X_train_n.shape, X_val_n.shape, X_test_n.shape)


In [None]:
from collections import Counter

def compute_class_weights(y):
    c = Counter(y)
    total = len(y)
    w0 = total / (2.0 * c.get(0, 1))
    w1 = total / (2.0 * c.get(1, 1))
    return {0: w0, 1: w1}, c

class_weights, train_counts = compute_class_weights(y_train)
print("Train counts:", train_counts)
print("Class weights:", class_weights)


In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

WINDOW_SAMPLES = X_train.shape[1]

def make_baseline_cnn(input_shape=(WINDOW_SAMPLES, 1)):
    inp = keras.Input(shape=input_shape)

    x = layers.Conv1D(16, kernel_size=7, padding="same")(inp)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.MaxPooling1D(pool_size=2)(x)

    x = layers.Conv1D(32, kernel_size=5, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.MaxPooling1D(pool_size=2)(x)

    x = layers.Conv1D(64, kernel_size=3, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)

    x = layers.GlobalAveragePooling1D()(x)
    x = layers.Dropout(0.3)(x)

    out = layers.Dense(1, activation="sigmoid")(x)

    model = keras.Model(inp, out)
    model.compile(
        optimizer=keras.optimizers.Adam(1e-3),
        loss="binary_crossentropy",
        metrics=[
            keras.metrics.BinaryAccuracy(name="acc"),
            keras.metrics.AUC(name="auc"),
        ]
    )
    return model

model = make_baseline_cnn()
model.summary()


In [None]:
callbacks = [
    keras.callbacks.EarlyStopping(
        monitor="val_auc", mode="max", patience=8, restore_best_weights=True
    ),
    keras.callbacks.ReduceLROnPlateau(
        monitor="val_auc", mode="max", factor=0.5, patience=3, min_lr=1e-6
    ),
]

history = model.fit(
    X_train_n, y_train,
    validation_data=(X_val_n, y_val),
    epochs=50,
    batch_size=32,
    class_weight=class_weights,
    callbacks=callbacks,
    verbose=1
)


In [None]:
import matplotlib.pyplot as plt

plt.figure()
plt.plot(history.history["loss"], label="train_loss")
plt.plot(history.history["val_loss"], label="val_loss")
plt.legend()
plt.title("Loss")
plt.show()

plt.figure()
plt.plot(history.history["auc"], label="train_auc")
plt.plot(history.history["val_auc"], label="val_auc")
plt.legend()
plt.title("AUC")
plt.show()


In [None]:
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, confusion_matrix, RocCurveDisplay

def eval_split(model, Xs, ys, name="split", threshold=0.5):
    probs = model.predict(Xs, batch_size=64).ravel()
    preds = (probs >= threshold).astype(int)

    acc = accuracy_score(ys, preds)
    f1  = f1_score(ys, preds, zero_division=0)
    auc = roc_auc_score(ys, probs) if len(np.unique(ys)) > 1 else np.nan
    cm  = confusion_matrix(ys, preds)

    print(f"\n{name.upper()} @thr={threshold:.2f}: acc={acc:.3f}, f1={f1:.3f}, auc={auc:.3f}")
    print("Confusion matrix:\n", cm)
    return probs, preds

p_val, _  = eval_split(model, X_val_n,  y_val,  "val", threshold=0.5)
p_test, _ = eval_split(model, X_test_n, y_test, "test", threshold=0.5)

RocCurveDisplay.from_predictions(y_test, p_test)
plt.title("ROC (Test)")
plt.show()


In [None]:
thresholds = np.linspace(0.05, 0.95, 19)
best_thr, best_f1 = 0.5, -1

for thr in thresholds:
    preds = (p_val >= thr).astype(int)
    f1 = f1_score(y_val, preds, zero_division=0)
    if f1 > best_f1:
        best_f1 = f1
        best_thr = thr

print("Best threshold (val):", best_thr)
print("Best F1 (val):", best_f1)

# Re-evaluate with tuned threshold
_ = eval_split(model, X_val_n,  y_val,  "val",  threshold=best_thr)
_ = eval_split(model, X_test_n, y_test, "test", threshold=best_thr)


In [None]:
import matplotlib.pyplot as plt

probs_test = model.predict(X_test_n, batch_size=64).ravel()

plt.figure(figsize=(6,4))
plt.hist(probs_test[y_test==0], bins=30, alpha=0.6, label="Normal (0)")
plt.hist(probs_test[y_test==1], bins=30, alpha=0.6, label="Apnea (1)")
plt.xlabel("Predicted probability")
plt.ylabel("Count")
plt.legend()
plt.title("Predicted probability distribution (Test)")
plt.show()


In [None]:
import numpy as np
from sklearn.metrics import roc_auc_score

rand_probs = np.random.rand(len(y_test))
rand_auc = roc_auc_score(y_test, rand_probs)

print("Model test AUC:", roc_auc_score(y_test, probs_test))
print("Random baseline AUC:", rand_auc)


In [None]:
idx_apnea  = np.where(y_test == 1)[0][0]
idx_normal = np.where(y_test == 0)[0][0]

idx_amb = np.argmin(np.abs(probs_test - 0.5))

indices = [
    ("Apnea", idx_apnea),
    ("Normal", idx_normal),
    ("Ambiguous", idx_amb),
]

plt.figure(figsize=(12,6))

for i, (name, idx) in enumerate(indices, 1):
    plt.subplot(3,1,i)
    plt.plot(X_test[idx].squeeze())
    plt.title(
        f"{name} | True label={y_test[idx]} | "
        f"Pred prob={probs_test[idx]:.2f}"
    )
    plt.ylabel("Amplitude")

plt.xlabel("Samples")
plt.tight_layout()
plt.show()


In [None]:
DS = 10

def downsample(X, factor):
    return X[:, ::factor, :]

X_train_lstm = downsample(X_train_n, DS)
X_val_lstm   = downsample(X_val_n,   DS)
X_test_lstm  = downsample(X_test_n,  DS)

print("LSTM shapes:")
print("Train:", X_train_lstm.shape)
print("Val:  ", X_val_lstm.shape)
print("Test: ", X_test_lstm.shape)


In [None]:
from tensorflow import keras
from tensorflow.keras import layers

def make_lstm_model(input_shape=(600, 1)):
    inp = keras.Input(shape=input_shape)

    x = layers.LSTM(64, return_sequences=False)(inp)
    x = layers.Dropout(0.3)(x)

    out = layers.Dense(1, activation="sigmoid")(x)

    model = keras.Model(inp, out)
    model.compile(
        optimizer=keras.optimizers.Adam(1e-3),
        loss="binary_crossentropy",
        metrics=[
            keras.metrics.BinaryAccuracy(name="acc"),
            keras.metrics.AUC(name="auc"),
        ]
    )
    return model

lstm_model = make_lstm_model(input_shape=X_train_lstm.shape[1:])
lstm_model.summary()


In [None]:
callbacks = [
    keras.callbacks.EarlyStopping(
        monitor="val_auc", mode="max", patience=8, restore_best_weights=True
    ),
    keras.callbacks.ReduceLROnPlateau(
        monitor="val_auc", mode="max", factor=0.5, patience=3, min_lr=1e-6
    ),
]

history_lstm = lstm_model.fit(
    X_train_lstm, y_train,
    validation_data=(X_val_lstm, y_val),
    epochs=50,
    batch_size=32,
    class_weight=class_weights,
    callbacks=callbacks,
    verbose=1
)


In [None]:
import matplotlib.pyplot as plt

plt.figure()
plt.plot(history_lstm.history["loss"], label="train_loss")
plt.plot(history_lstm.history["val_loss"], label="val_loss")
plt.legend()
plt.title("LSTM Loss")
plt.show()

plt.figure()
plt.plot(history_lstm.history["auc"], label="train_auc")
plt.plot(history_lstm.history["val_auc"], label="val_auc")
plt.legend()
plt.title("LSTM AUC")
plt.show()


In [None]:
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, confusion_matrix

def eval_split(model, Xs, ys, name="split", threshold=0.5):
    probs = model.predict(Xs, batch_size=64).ravel()
    preds = (probs >= threshold).astype(int)

    acc = accuracy_score(ys, preds)
    f1  = f1_score(ys, preds, zero_division=0)
    auc = roc_auc_score(ys, probs) if len(set(ys)) > 1 else float("nan")
    cm  = confusion_matrix(ys, preds)

    print(f"\n{name.upper()} @thr={threshold:.2f}: acc={acc:.3f}, f1={f1:.3f}, auc={auc:.3f}")
    print("Confusion matrix:\n", cm)

    return probs

p_val_lstm  = eval_split(lstm_model, X_val_lstm,  y_val,  "val")
p_test_lstm = eval_split(lstm_model, X_test_lstm, y_test, "test")


In [None]:
SUBSEQ_LEN = 600   # 6 seconds
N_SUBSEQ = 10

def reshape_for_cnn_lstm(X):
    return X.reshape(X.shape[0], N_SUBSEQ, SUBSEQ_LEN, 1)

Xtr = reshape_for_cnn_lstm(X_train)
Xva = reshape_for_cnn_lstm(X_val)
Xte = reshape_for_cnn_lstm(X_test)

print(Xtr.shape)  # (N, 10, 600, 1)


In [None]:
from tensorflow.keras import layers, models

def build_cnn_encoder():
    inp = layers.Input(shape=(600, 1))
    
    x = layers.Conv1D(16, 7, padding="same")(inp)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.MaxPooling1D(2)(x)

    x = layers.Conv1D(32, 5, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.MaxPooling1D(2)(x)

    x = layers.Conv1D(64, 3, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)

    x = layers.GlobalAveragePooling1D()(x)

    return models.Model(inp, x, name="cnn_encoder")


In [None]:
cnn_encoder = build_cnn_encoder()

inp = layers.Input(shape=(N_SUBSEQ, SUBSEQ_LEN, 1))

x = layers.TimeDistributed(cnn_encoder)(inp)
x = layers.LSTM(64, return_sequences=False)(x)
x = layers.Dropout(0.5)(x)

out = layers.Dense(1, activation="sigmoid")(x)

model = models.Model(inp, out)


In [None]:
model.compile(
    optimizer=tf.keras.optimizers.Adam(1e-3),
    loss="binary_crossentropy",
    metrics=[
        "accuracy",
        tf.keras.metrics.AUC(name="auc")
    ]
)

model.summary()


In [None]:
callbacks = [
    tf.keras.callbacks.EarlyStopping(
        monitor="val_auc",
        mode="max",
        patience=5,
        restore_best_weights=True
    ),
    tf.keras.callbacks.ReduceLROnPlateau(
        monitor="val_auc",
        mode="max",
        factor=0.5,
        patience=3
    )
]

history = model.fit(
    Xtr, y_train,
    validation_data=(Xva, y_val),
    epochs=50,
    batch_size=32,
    class_weight=class_weights,
    callbacks=callbacks,
    verbose=1
)


In [None]:
print("X_val shape:", X_val.shape)
print("X_test shape:", X_test.shape)


In [None]:
def to_cnn_lstm(X):
    # X: (N, 6000, 1)
    N = X.shape[0]
    return X.reshape(N, 10, 600, 1)

X_val_cnnlstm  = to_cnn_lstm(X_val)
X_test_cnnlstm = to_cnn_lstm(X_test)


In [None]:
print(X_val_cnnlstm.shape)
# (1474, 10, 600, 1)

In [None]:
y_val_prob  = model.predict(X_val_cnnlstm).ravel()
y_test_prob = model.predict(X_test_cnnlstm).ravel()


In [None]:
print(X_val.shape)


In [None]:
from sklearn.metrics import roc_auc_score

val_auc  = roc_auc_score(y_val, y_val_prob)
test_auc = roc_auc_score(y_test, y_test_prob)

print(f"VAL AUC:  {val_auc:.3f}")
print(f"TEST AUC: {test_auc:.3f}")


In [None]:
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, classification_report

thr = 0.5
y_test_pred = (y_test_prob >= thr).astype(int)

acc = accuracy_score(y_test, y_test_pred)
f1  = f1_score(y_test, y_test_pred)
cm  = confusion_matrix(y_test, y_test_pred)

print(f"TEST @thr=0.50 | acc={acc:.3f}, f1={f1:.3f}")
print("Confusion matrix:\n", cm)
print(classification_report(y_test, y_test_pred, digits=3))


In [None]:
import numpy as np
from sklearn.metrics import f1_score

thresholds = np.linspace(0.05, 0.95, 91)
f1_scores = []

for t in thresholds:
    y_val_pred = (y_val_prob >= t).astype(int)
    f1_scores.append(f1_score(y_val, y_val_pred))

best_thr = thresholds[np.argmax(f1_scores)]
best_f1  = np.max(f1_scores)

print(f"Best threshold (val): {best_thr:.2f}")
print(f"Best F1 (val): {best_f1:.3f}")


In [None]:
y_test_pred_opt = (y_test_prob >= best_thr).astype(int)

acc_opt = accuracy_score(y_test, y_test_pred_opt)
f1_opt  = f1_score(y_test, y_test_pred_opt)
cm_opt  = confusion_matrix(y_test, y_test_pred_opt)

print(f"TEST @thr={best_thr:.2f} | acc={acc_opt:.3f}, f1={f1_opt:.3f}")
print("Confusion matrix:\n", cm_opt)


In [None]:
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

fpr, tpr, _ = roc_curve(y_test, y_test_prob)

plt.figure(figsize=(5,5))
plt.plot(fpr, tpr, label=f"AUC = {test_auc:.3f}")
plt.plot([0,1],[0,1],'--',color='gray')
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC curve (CNN–LSTM, Test set)")
plt.legend()
plt.grid(True)
plt.show()
