<a href="https://colab.research.google.com/github/AdeshYadav8918/Smart-Sleep-Analytics-Automated-Apnea-Detection-using-AI/blob/master/chapter_appendix-tools-for-deep-learning/jupyter.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Smart Sleep Analytics: Automated Apnea Detection using AI
**Comprehensive Google Colab notebook** (Academic style)
**Includes:** Synthetic data generation, optional PhysioNet data instructions, preprocessing, feature extraction, classical ML (Random Forest, SVM, Logistic Regression), deep models (CNN, LSTM), evaluation, visualizations, and discussion.

**Author:** Adesh Yadav
**Date:** October 2025

---
**How to use:**

- Run the cells sequentially in Google Colab.
- Use the Synthetic Data path to run immediately.
- Or follow the PhysioNet section to download real data (requires internet access and a larger runtime).


## 1) Install required packages
The following cell installs necessary packages. In Colab some are preinstalled, but we include them for completeness.

In [None]:
# Install packages (comment these lines out if the packages are already available in Colab)
# Note: TensorFlow is usually preinstalled in Colab; adjust version if required.
!pip install -q wfdb==4.1.1  # for PhysioNet (optional)
!pip install -q tensorflow==2.19.0 scikit-learn matplotlib pandas seaborn joblib

## 2) Imports and helper functions


In [None]:
# Standard imports
import os, random, math, time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import signal, stats
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
import joblib

# TensorFlow/Keras
import tensorflow as tf
from tensorflow.keras import layers, models, callbacks

# ensure reproducibility
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)
random.seed(SEED)

print('numpy', np.__version__, 'pandas', pd.__version__, 'tf', tf.__version__)


## 3) Synthetic data generation (recommended to run immediately)
This generates respiratory and SpO2-like signals and injects apnea events. Use this to run the whole pipeline without external data.

In [None]:
def generate_synthetic(duration_sec=900, fs=100, apnea_rate_per_min=0.8):
    """Generate synthetic respiratory and SpO2 signals with labeled apnea events.
    Returns a pandas DataFrame with columns ['timestamp','resp','spo2','label']"""
    t = np.arange(0, duration_sec, 1/fs)
    # respiratory signal: base sine (breathing ~0.25 Hz) + small noise
    resp = 0.6 * np.sin(2*np.pi*0.25*t) + 0.05*np.random.randn(len(t))
    # SpO2 baseline with slow fluctuation and measurement noise
    spo2 = 98 + 0.8 * np.sin(2*np.pi*0.05*t) + 0.2*np.random.randn(len(t))
    labels = np.zeros_like(t, dtype=int)
    rng = np.random.RandomState(SEED)
    # Insert apnea events (randomly)
    num_events = int(duration_sec/60 * apnea_rate_per_min)
    for _ in range(num_events):
        start = rng.randint(0, len(t)-fs*20)
        length = rng.randint(int(fs*5), int(fs*15))  # 5-15 seconds
        resp[start:start+length] *= rng.uniform(0.05,0.2)  # suppressed breathing amplitude
        spo2[start:start+length] -= rng.uniform(2,8)        # SpO2 dip
        labels[start:start+length] = 1
    df = pd.DataFrame({'timestamp': t, 'resp': resp, 'spo2': spo2, 'label': labels})
    return df

# Generate data
df = generate_synthetic(duration_sec=900, fs=100)
df.head(), df.shape


## Quick plot of signals (first 60 seconds)

In [None]:
plt.figure(figsize=(12,4))
plt.subplot(2,1,1)
plt.plot(df['timestamp'][:6000], df['resp'][:6000])
plt.title('Respiratory signal (first 60s)')
plt.subplot(2,1,2)
plt.plot(df['timestamp'][:6000], df['spo2'][:6000])
plt.title('SpO2 signal (first 60s)')
plt.tight_layout()
plt.show()


## 4) Segment the signals into windows for ML
We segment the signals into overlapping windows and label a window as positive when the fraction of apnea-labeled samples inside it is at least `threshold`.

In [None]:
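def segment_signal(df, window_size=300, step_size=150, cols=('resp', 'spo2'), label_col='label', threshold=0.5):
    """Slice df into overlapping windows; a window is positive when its apnea fraction >= threshold."""
    cols = list(cols)  # accept any sequence of column names (avoids a mutable default argument)
    X = []
    y = []
    n = len(df)
    for start in range(0, n - window_size + 1, step_size):
        window = df.iloc[start:start+window_size]
        X.append(window[cols].values)
        # Fraction of apnea-labeled samples in this window, compared against the threshold
        lbl = int(window[label_col].mean() >= threshold)
        y.append(lbl)
    return np.array(X), np.array(y)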

# Parameters (3s windows at 100Hz -> 300 samples -> change if needed)
WINDOW_SIZE = 300
STEP_SIZE = 150
CHANNELS = ['resp','spo2']

X_segments, y = segment_signal(df, window_size=WINDOW_SIZE, step_size=STEP_SIZE, cols=CHANNELS, label_col='label', threshold=0.1)
print('Segments shape:', X_segments.shape, 'Labels distribution:', np.bincount(y))
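
The windowing rule described in this section can be checked by hand. The sketch below is a simplified, NumPy-only illustration (the helper names `count_windows` and `label_windows` and the toy label track are assumptions made for this example, not part of the pipeline): it computes how many full windows a sliding segmenter yields and how the threshold turns per-window apnea fractions into labels.

```python
import numpy as np

def count_windows(n_samples, window_size, step_size):
    """Number of full windows produced by a sliding segmenter without padding."""
    if n_samples < window_size:
        return 0
    return (n_samples - window_size) // step_size + 1

def label_windows(labels, window_size, step_size, threshold):
    """Mark a window positive when its fraction of apnea samples is >= threshold."""
    starts = range(0, len(labels) - window_size + 1, step_size)
    fractions = np.array([labels[s:s + window_size].mean() for s in starts])
    return (fractions >= threshold).astype(int), fractions

# 900 s at 100 Hz with 300-sample windows and a 150-sample step
n_windows = count_windows(900 * 100, 300, 150)
print('windows:', n_windows)

# Toy label track (illustrative only): apnea on samples 4..6
toy = np.array([0, 0, 0, 0, 1, 1, 1, 0, 0, 0])
y_toy, frac_toy = label_windows(toy, window_size=4, step_size=2, threshold=0.5)
print('fractions:', frac_toy, 'labels:', y_toy)
```

Lowering `threshold` (the notebook call uses 0.1) makes windows that only partly overlap an event count as positive, which raises the number of positive windows.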


## 5) Feature extraction (statistical + spectral)
We extract per-channel summary statistics (mean, standard deviation, min, max, median, quartiles, skewness, kurtosis) and spectral band powers estimated with Welch's method.

In [None]:
from scipy import stats, signal

def stat_features(window):
    feats = []
    for c in range(window.shape[1]):
        col = window[:, c]
        feats.extend([np.mean(col), np.std(col), np.min(col), np.max(col),
                      np.median(col), np.percentile(col,25), np.percentile(col,75),
                      stats.skew(col), stats.kurtosis(col)])
    return np.array(feats)

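def bandpower(x, fs, fmin, fmax):
    """Approximate power of x in [fmin, fmax] Hz from a Welch PSD estimate."""
    f, Pxx = signal.welch(x, fs=fs, nperseg=min(256, len(x)))
    mask = (f >= fmin) & (f <= fmax)
    # Rectangle rule (sum of PSD bins times bin width); unlike a trapezoid
    # integral, this stays non-zero when only one bin falls inside the band
    return float(np.sum(Pxx[mask]) * (f[1] - f[0])) if np.any(mask) else 0.0

def spectral_features(window, fs=100.0):
    feats = []
    for c in range(window.shape[1]):
        col = window[:, c]
        # With 3 s windows the frequency resolution is about 0.39 Hz, so the lowest band is coarse
        feats.append(bandpower(col, fs, 0.1, 0.5))  # breathing band (synthetic breathing is ~0.25 Hz)
        feats.append(bandpower(col, fs, 0.5, 2))    # 0.5-2 Hz
        feats.append(bandpower(col, fs, 2, 5))      # 2-5 Hz
    return np.array(feats)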

def extract_features_from_segments(X_segments, fs=100.0):
    rows = []
    for w in X_segments:
        s = stat_features(w)
        p = spectral_features(w, fs=fs)
        rows.append(np.concatenate([s,p]))
    cols = [f'f{i}' for i in range(len(rows[0]))]
    return pd.DataFrame(rows, columns=cols)

# Extract features
X_feat_df = extract_features_from_segments(X_segments, fs=100.0)
X_feat_df.shape, X_feat_df.head()
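
How much detail the band powers can carry depends on the frequency resolution of the Welch estimate, which is `fs / nperseg`. The sketch below is an illustration under the notebook's settings (100 Hz, 300-sample windows, `nperseg = min(256, len(x))`). It relies on the fact that the one-sided Welch frequency grid equals `np.fft.rfftfreq(nperseg, 1/fs)` when no zero-padding is used, and the helper name `welch_bins_in_band` is an assumption made for this example.

```python
import numpy as np

def welch_bins_in_band(fs, nperseg, fmin, fmax):
    """Frequencies of the one-sided spectrum bins that fall inside [fmin, fmax]."""
    freqs = np.fft.rfftfreq(nperseg, d=1.0 / fs)  # same grid Welch uses when nfft == nperseg
    return freqs[(freqs >= fmin) & (freqs <= fmax)]

fs = 100.0
window_size = 300
nperseg = min(256, window_size)
resolution = fs / nperseg
print(f'frequency resolution: {resolution:.4f} Hz')

bin_counts = []
for fmin, fmax in [(0.1, 0.5), (0.5, 2.0), (2.0, 5.0)]:
    bins = welch_bins_in_band(fs, nperseg, fmin, fmax)
    bin_counts.append(len(bins))
    print(f'{fmin}-{fmax} Hz: {len(bins)} bin(s)')
```

With these settings the 0.1-0.5 Hz band is represented by a single PSD bin, so it is a coarse feature. Longer windows (and a correspondingly larger `nperseg`) would give a finer frequency grid, at the cost of coarser time resolution for the labels.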


## 6) Train classical ML models (feature-based)
We train Random Forest, SVM, and Logistic Regression classifiers and evaluate them on a stratified 80/20 train/test split.

In [None]:
# Train-test split
X = X_feat_df.values
y_lbl = y
X_train, X_test, y_train, y_test = train_test_split(X, y_lbl, test_size=0.2, random_state=SEED, stratify=y_lbl)

# Scale features
scaler = StandardScaler().fit(X_train)
X_train_s = scaler.transform(X_train)
X_test_s = scaler.transform(X_test)

models = {
    'RandomForest': RandomForestClassifier(n_estimators=150, random_state=SEED),
    'SVM': SVC(kernel='rbf', probability=True, random_state=SEED),
    'LogisticRegression': LogisticRegression(max_iter=200, random_state=SEED)
}

results = {}
for name, m in models.items():
    print('\nTraining', name)
    m.fit(X_train_s, y_train)
    preds = m.predict(X_test_s)
    probs = m.predict_proba(X_test_s)[:,1] if hasattr(m, 'predict_proba') else None
    acc = accuracy_score(y_test, preds)
    prec = precision_score(y_test, preds, zero_division=0)
    rec = recall_score(y_test, preds, zero_division=0)
    f1 = f1_score(y_test, preds, zero_division=0)
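    # Named roc_auc so it does not clash with sklearn.metrics.auc, which is imported in section 8
    roc_auc = roc_auc_score(y_test, probs) if probs is not None else None
    results[name] = {'model': m, 'acc': acc, 'prec': prec, 'rec': rec, 'f1': f1, 'auc': roc_auc}
    auc_txt = f'{roc_auc:.3f}' if roc_auc is not None else 'N/A'
    print(f'ACC={acc:.3f}, PREC={prec:.3f}, REC={rec:.3f}, F1={f1:.3f}, AUC={auc_txt}')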

# Save scaler and best model (by F1)
best_name = max(results.keys(), key=lambda k: results[k]['f1'])
joblib.dump(scaler, 'scaler.joblib')
joblib.dump(results[best_name]['model'], f'{best_name}_model.joblib')
print('\nSaved best classical model:', best_name)
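
A single stratified split gives one estimate per model. As a possible complement, stratified k-fold cross-validation averages over several splits. The sketch below is a minimal illustration on made-up stand-in features (`X_demo` and `y_demo` are assumptions, not the notebook's data). The scaler is placed inside a pipeline so that each fold fits it on its own training portion only.

```python
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

rng = np.random.RandomState(42)
# Stand-in data: 200 windows x 6 features, roughly 20% positives (illustrative only)
y_demo = (rng.rand(200) < 0.2).astype(int)
X_demo = rng.randn(200, 6) + y_demo[:, None] * 1.5  # shift positives so the classes differ

# Scaling lives inside the pipeline, so it is refit on every training fold
clf = make_pipeline(StandardScaler(),
                    RandomForestClassifier(n_estimators=50, random_state=42))
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(clf, X_demo, y_demo, cv=cv, scoring='f1')
print('F1 per fold:', np.round(scores, 3), 'mean:', round(float(scores.mean()), 3))
```

To apply this to the notebook's features, `X_demo` and `y_demo` would be replaced by `X` and `y_lbl` from the cell above.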


## 7) Deep learning models (end-to-end on windows)
Train a small LSTM and 1D-CNN on the raw windows (shape: N, T, C).

In [None]:
# Prepare data for deep models
X_deep = X_segments.astype('float32')
y_deep = y.astype('float32')
X_train_d, X_test_d, y_train_d, y_test_d = train_test_split(X_deep, y_deep, test_size=0.2, random_state=SEED, stratify=y_deep)

timesteps = X_train_d.shape[1]; n_features = X_train_d.shape[2]
print('Deep input shape', X_train_d.shape)

# Keras building blocks (already imported in section 2; repeated here for convenience)
from tensorflow.keras import layers, models, callbacks

def build_lstm(timesteps, n_features):
    model = models.Sequential([
        layers.Input(shape=(timesteps, n_features)),
        layers.Bidirectional(layers.LSTM(64, return_sequences=True)),
        layers.Bidirectional(layers.LSTM(32)),
        layers.Dense(32, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_cnn(timesteps, n_features):
    model = models.Sequential([
        layers.Input(shape=(timesteps, n_features)),
        layers.Conv1D(64, kernel_size=5, activation='relu'),
        layers.MaxPooling1D(2),
        layers.Conv1D(32, kernel_size=3, activation='relu'),
        layers.GlobalAveragePooling1D(),
        layers.Dense(32, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

# Training LSTM (light training for demo)
lstm = build_lstm(timesteps, n_features)
es = callbacks.EarlyStopping(patience=5, restore_best_weights=True)
history_lstm = lstm.fit(X_train_d, y_train_d, epochs=20, batch_size=32, validation_split=0.1, callbacks=[es], verbose=2)

# Evaluate LSTM
probs_lstm = lstm.predict(X_test_d).ravel()
preds_lstm = (probs_lstm > 0.5).astype(int)
print('LSTM - acc:', accuracy_score(y_test_d, preds_lstm), 'f1:', f1_score(y_test_d, preds_lstm, zero_division=0))

In [None]:
# Training CNN (light training for demo)
cnn = build_cnn(timesteps, n_features)
history_cnn = cnn.fit(X_train_d, y_train_d, epochs=20, batch_size=32, validation_split=0.1, callbacks=[es], verbose=2)

# Evaluate CNN
probs_cnn = cnn.predict(X_test_d).ravel()
preds_cnn = (probs_cnn > 0.5).astype(int)
print('CNN - acc:', accuracy_score(y_test_d, preds_cnn), 'f1:', f1_score(y_test_d, preds_cnn, zero_division=0))

# Save models
lstm.save('lstm_model.h5')
cnn.save('cnn_model.h5')
print('Saved LSTM and CNN models.')



## 8) Evaluation & Visualizations
Confusion matrices and ROC curves for the best classical model, the LSTM, and the CNN.

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc, ConfusionMatrixDisplay

def plot_confusion(y_true, y_pred, title='Confusion Matrix'):
    cm = confusion_matrix(y_true, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm)
    disp.plot(cmap=plt.cm.Blues)
    plt.title(title)
    plt.show()

# Classical best model evaluation
best_clf = joblib.load(f'{best_name}_model.joblib')
X_test_s = scaler.transform(X_test)
probs_clf = best_clf.predict_proba(X_test_s)[:,1] if hasattr(best_clf, 'predict_proba') else None
preds_clf = best_clf.predict(X_test_s)
print('Best classical model:', best_name)
print('Metrics:', 'acc', accuracy_score(y_test, preds_clf), 'f1', f1_score(y_test, preds_clf, zero_division=0))
plot_confusion(y_test, preds_clf, title=f'{best_name} Confusion Matrix')

# LSTM/CNN confusion matrices
plot_confusion(y_test_d, preds_lstm, title='LSTM Confusion Matrix')
plot_confusion(y_test_d, preds_cnn, title='CNN Confusion Matrix')

# ROC curves
plt.figure(figsize=(8,6))
if probs_clf is not None:
    fpr, tpr, _ = roc_curve(y_test, probs_clf)
    plt.plot(fpr, tpr, label=f'{best_name} (AUC={auc(fpr,tpr):.3f})')
fpr, tpr, _ = roc_curve(y_test_d, probs_lstm)
plt.plot(fpr, tpr, label=f'LSTM (AUC={auc(fpr,tpr):.3f})')
fpr, tpr, _ = roc_curve(y_test_d, probs_cnn)
plt.plot(fpr, tpr, label=f'CNN (AUC={auc(fpr,tpr):.3f})')
plt.plot([0,1],[0,1],'--', color='gray')
plt.xlabel('FPR'); plt.ylabel('TPR'); plt.legend(); plt.title('ROC Curves'); plt.show()
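
For a screening task such as apnea detection, sensitivity and specificity are often read directly off the confusion matrix. The sketch below assumes the scikit-learn layout `[[TN, FP], [FN, TP]]` (rows are true classes, columns are predicted classes). The helper name `sensitivity_specificity` and the counts in `cm_demo` are made up for illustration and are not results from this notebook.

```python
import numpy as np

def sensitivity_specificity(cm):
    """Return (sensitivity, specificity) for a 2x2 matrix laid out as [[TN, FP], [FN, TP]]."""
    tn, fp, fn, tp = np.asarray(cm).ravel()
    sens = tp / (tp + fn) if (tp + fn) else 0.0  # share of apnea windows that were detected
    spec = tn / (tn + fp) if (tn + fp) else 0.0  # share of normal windows left unflagged
    return float(sens), float(spec)

# Illustrative counts only: 100 normal windows, 20 apnea windows
cm_demo = np.array([[90, 10],
                    [5, 15]])
sens, spec = sensitivity_specificity(cm_demo)
print(f'sensitivity={sens:.2f}, specificity={spec:.2f}')
```

The same function could be applied to `confusion_matrix(y_test, preds_clf)` or to the LSTM and CNN predictions.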


## 9) (Optional) Using real datasets from PhysioNet
This section explains how to download and use PhysioNet Sleep-EDF or Apnea-ECG datasets. Running this in Colab requires internet access and may take time.

Note: The data must first be converted into a time-aligned CSV containing the channels used here (resp, spo2, etc.) and a binary label column (0/1).

In [None]:
# Example: using wfdb to read a record (adapt the record name and labeling to your dataset)
# Uncomment and modify record names as required.
# Note: rdrecord reads from a local path by default; its pn_dir argument can be used
# to read a record directly from a PhysioNet database directory instead.
# import wfdb
# record = wfdb.rdrecord('apnea-database-1.0.0/records/xxxx')  # adjust path
# print(record.sig_name, record.fs, record.p_signal.shape)
# Convert signals to pandas and then follow segmentation/feature extraction steps
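
Once real recordings have been exported to a time-aligned CSV, they can be loaded and validated before segmentation. The sketch below is one possible loader, not a fixed part of the pipeline: the helper name `load_aligned_csv`, the required column list, and the inline sample rows are assumptions made for illustration.

```python
import io
import pandas as pd

REQUIRED_COLUMNS = ['resp', 'spo2', 'label']  # channel names assumed to match the synthetic data

def load_aligned_csv(source, required=REQUIRED_COLUMNS):
    """Load an aligned CSV, drop incomplete rows, and check that labels are binary."""
    data = pd.read_csv(source)
    missing = [c for c in required if c not in data.columns]
    if missing:
        raise ValueError(f'missing columns: {missing}')
    # Rows with a missing channel or label cannot be windowed reliably
    data = data.dropna(subset=required).reset_index(drop=True)
    if not data['label'].isin([0, 1]).all():
        raise ValueError('label column must contain only 0 and 1')
    data['label'] = data['label'].astype(int)
    return data

# Made-up sample rows standing in for a real export (one row has a missing resp value)
csv_text = (
    'timestamp,resp,spo2,label\n'
    '0.00,0.10,98.1,0\n'
    '0.01,0.12,98.0,0\n'
    '0.02,,97.9,0\n'
    '0.03,0.02,95.5,1\n'
)
real_df = load_aligned_csv(io.StringIO(csv_text))
print(real_df)
```

The resulting DataFrame has the same columns as the synthetic `df`, so it could be passed to `segment_signal` in its place, with `fs` and the window parameters adjusted to the real sampling rate.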


## 10) Save artifacts & Next steps
Artifacts (scaler.joblib, `<best_name>_model.joblib` for the best classical model, lstm_model.h5, cnn_model.h5) are saved in the working directory.
Next: try with real PhysioNet data, add explainability (SHAP), or deploy as a Flask app for inference.

In [None]:
# List saved files
import os
print('Files in workspace:')
for f in os.listdir('.'):
    # Match on the file extension rather than any substring of the name
    if f.endswith(('.h5', '.joblib', '.png')):
        print('-', f)
print('\nYou can download models from the Colab Files sidebar.')


Files in workspace:
- SVM_model.joblib
- lstm_model.h5
- scaler.joblib
- cnn_model.h5

You can download models from the Colab Files sidebar.
