# Vital Signs Anomaly Detection — ML Pipeline

**Author:** Damir Zhumangali  
**GitHub:** github.com/damirzhumangali  

## Overview

This project builds a machine learning pipeline for detecting anomalies in patient vital signs — extending the IoT-based Patient Health Monitoring System with intelligent data analysis.

The pipeline covers:
1. Data loading, cleaning, and exploratory analysis (Pandas, Matplotlib)
2. Classical ML classifiers with comparison (scikit-learn)
3. Unsupervised anomaly detection for unlabeled IoT streams (Isolation Forest)
4. Deep learning time-series model (PyTorch LSTM)
5. Rigorous validation with clinical metrics (ROC-AUC, precision-recall)

**Dataset:** UCI Heart Disease Dataset (303 patients, 13 features plus the diagnosis target)  
**Task:** Binary classification — predict presence of heart disease


## 0. Install Dependencies

In [None]:
# Run once to install required packages
# !pip install pandas numpy matplotlib seaborn scikit-learn torch torchvision ucimlrepo

## 1. Data Loading & Exploration

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from ucimlrepo import fetch_ucirepo

# Load UCI Heart Disease dataset
heart_disease = fetch_ucirepo(id=45)

X = heart_disease.data.features
y = heart_disease.data.targets

# Combine into single DataFrame for exploration
df = X.copy()
df['target'] = (y.values.ravel() > 0).astype(int)  # 0 = no disease, 1 = disease

print('Dataset shape:', df.shape)
print('\nClass distribution:')
print(df['target'].value_counts())
print('\nFirst 5 rows:')
df.head()

In [None]:
# Feature descriptions
feature_descriptions = {
    'age': 'Age in years',
    'sex': 'Sex (1=male, 0=female)',
    'cp': 'Chest pain type (1-4)',
    'trestbps': 'Resting blood pressure (mmHg)',
    'chol': 'Serum cholesterol (mg/dl)',
    'fbs': 'Fasting blood sugar > 120 mg/dl (1=true)',
    'restecg': 'Resting ECG results (0-2)',
    'thalach': 'Max heart rate achieved (BPM)',
    'exang': 'Exercise-induced angina (1=yes)',
    'oldpeak': 'ST depression induced by exercise',
    'slope': 'Slope of peak exercise ST segment',
    'ca': 'Number of major vessels (0-3)',
    'thal': 'Thallium stress test result (3=normal, 6=fixed defect, 7=reversible defect)',
}

print('Feature descriptions:')
for feat, desc in feature_descriptions.items():
    print(f'  {feat:12s} — {desc}')

In [None]:
# Check for missing values
print('Missing values:')
print(df.isnull().sum())
print('\nData types:')
print(df.dtypes)
print('\nStatistical summary:')
df.describe()

In [None]:
# Exploratory visualization
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
fig.suptitle('Vital Sign Distributions by Heart Disease Status', fontsize=14, fontweight='bold')

key_features = ['age', 'trestbps', 'chol', 'thalach', 'oldpeak', 'cp']
labels = ['Age', 'Resting BP (mmHg)', 'Cholesterol (mg/dl)', 
          'Max Heart Rate (BPM)', 'ST Depression', 'Chest Pain Type']

for ax, feat, label in zip(axes.flat, key_features, labels):
    for target_val, color, name in [(0, '#2196F3', 'No Disease'), (1, '#F44336', 'Heart Disease')]:
        data = df[df['target'] == target_val][feat].dropna()
        ax.hist(data, alpha=0.6, color=color, label=name, bins=20)
    ax.set_title(label)
    ax.legend()
    ax.set_xlabel(feat)
    ax.set_ylabel('Count')

plt.tight_layout()
plt.savefig('vital_signs_distribution.png', dpi=150, bbox_inches='tight')
plt.show()
print('Saved: vital_signs_distribution.png')

In [None]:
# Correlation heatmap
plt.figure(figsize=(12, 9))
corr_matrix = df.corr()
mask = np.triu(np.ones_like(corr_matrix, dtype=bool))
sns.heatmap(corr_matrix, mask=mask, annot=True, fmt='.2f', 
            cmap='RdBu_r', center=0, square=True, linewidths=0.5,
            cbar_kws={'shrink': 0.8})
plt.title('Feature Correlation Matrix', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig('correlation_heatmap.png', dpi=150, bbox_inches='tight')
plt.show()

## 2. Data Preprocessing

In [None]:
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer

y_binary = (y.values.ravel() > 0).astype(int)

# Train/test split first, stratified to preserve class balance
X_train, X_test, y_train, y_test = train_test_split(
    X, y_binary,
    test_size=0.2,
    random_state=42,
    stratify=y_binary
)

# Handle missing values: fit the imputer on the training split only,
# so test-set statistics do not leak into preprocessing
imputer = SimpleImputer(strategy='median')
X_train = pd.DataFrame(imputer.fit_transform(X_train), columns=X.columns)
X_test = pd.DataFrame(imputer.transform(X_test), columns=X.columns)

# Feature scaling
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

print(f'Training set: {X_train_scaled.shape[0]} samples')
print(f'Test set:     {X_test_scaled.shape[0]} samples')
print(f'\nClass balance (train): {np.bincount(y_train)}')
print(f'Class balance (test):  {np.bincount(y_test)}')
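
With only 303 patients, a single 80/20 split leaves roughly 61 test samples, so any metric can shift noticeably from one split to another. The sketch below shows one way the `StratifiedKFold` imported above could be combined with a `Pipeline`, so that imputation and scaling are refit inside every fold. It runs on synthetic data from `make_classification` as a stand-in for the real table; the names `X_cv_demo`, `y_cv_demo`, and `cv_pipeline` are illustrative and not part of the original notebook.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic stand-in for the heart disease table (assumption: 303 rows, 13 features)
X_cv_demo, y_cv_demo = make_classification(
    n_samples=303, n_features=13, n_informative=6, random_state=42
)

# Blank out about 2% of the values so the imputer has something to do
rng = np.random.default_rng(42)
X_cv_demo[rng.random(X_cv_demo.shape) < 0.02] = np.nan

# Preprocessing lives inside the pipeline, so each fold fits it on its own training part
cv_pipeline = Pipeline([
    ('impute', SimpleImputer(strategy='median')),
    ('scale', StandardScaler()),
    ('clf', LogisticRegression(max_iter=1000)),
])

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_auc = cross_val_score(cv_pipeline, X_cv_demo, y_cv_demo, cv=cv, scoring='roc_auc')

print(f'ROC-AUC per fold: {np.round(cv_auc, 3)}')
print(f'Mean ± std: {cv_auc.mean():.3f} ± {cv_auc.std():.3f}')
```

Reporting the fold-to-fold spread alongside the mean gives a rough sense of how much a single-split result can be trusted.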

## 3. Classical ML Classifiers — Comparison

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.metrics import (
    classification_report, confusion_matrix, roc_auc_score,
    roc_curve, precision_recall_curve, average_precision_score
)

# Define classifiers
classifiers = {
    'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
    'Random Forest':       RandomForestClassifier(n_estimators=100, random_state=42),
    'Gradient Boosting':   GradientBoostingClassifier(n_estimators=100, random_state=42),
    'SVM (RBF kernel)':    SVC(kernel='rbf', probability=True, random_state=42),
}

results = {}

for name, clf in classifiers.items():
    clf.fit(X_train_scaled, y_train)
    y_pred = clf.predict(X_test_scaled)
    y_prob = clf.predict_proba(X_test_scaled)[:, 1]
    
    results[name] = {
        'model': clf,
        'y_pred': y_pred,
        'y_prob': y_prob,
        'roc_auc': roc_auc_score(y_test, y_prob),
        'avg_precision': average_precision_score(y_test, y_prob),
    }
    
    print(f'\n── {name} ──')
    print(classification_report(y_test, y_pred, target_names=['No Disease', 'Heart Disease']))
    print(f'ROC-AUC: {results[name]["roc_auc"]:.4f}')

In [None]:
# ROC curves comparison
fig, axes = plt.subplots(1, 2, figsize=(14, 6))

# ROC curve
ax = axes[0]
ax.plot([0, 1], [0, 1], 'k--', alpha=0.5, label='Random baseline')
for name, res in results.items():
    fpr, tpr, _ = roc_curve(y_test, res['y_prob'])
    ax.plot(fpr, tpr, label=f"{name} (AUC={res['roc_auc']:.3f})")
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('ROC Curves — Classifier Comparison')
ax.legend(loc='lower right', fontsize=9)
ax.grid(alpha=0.3)

# Precision-Recall curve
ax = axes[1]
for name, res in results.items():
    prec, rec, _ = precision_recall_curve(y_test, res['y_prob'])
    ax.plot(rec, prec, label=f"{name} (AP={res['avg_precision']:.3f})")
ax.set_xlabel('Recall')
ax.set_ylabel('Precision')
ax.set_title('Precision-Recall Curves')
ax.legend(loc='lower left', fontsize=9)
ax.grid(alpha=0.3)

plt.suptitle('Clinical Validation Metrics', fontsize=13, fontweight='bold')
plt.tight_layout()
plt.savefig('roc_pr_curves.png', dpi=150, bbox_inches='tight')
plt.show()

In [None]:
# Confusion matrices
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
fig.suptitle('Confusion Matrices', fontsize=13, fontweight='bold')

for ax, (name, res) in zip(axes.flat, results.items()):
    cm = confusion_matrix(y_test, res['y_pred'])
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax,
                xticklabels=['No Disease', 'Heart Disease'],
                yticklabels=['No Disease', 'Heart Disease'])
    ax.set_title(f'{name}\nROC-AUC: {res["roc_auc"]:.3f}')
    ax.set_ylabel('Actual')
    ax.set_xlabel('Predicted')

plt.tight_layout()
plt.savefig('confusion_matrices.png', dpi=150, bbox_inches='tight')
plt.show()

In [None]:
# Feature importance (Random Forest)
rf = results['Random Forest']['model']
importances = pd.Series(rf.feature_importances_, index=X.columns).sort_values(ascending=True)

plt.figure(figsize=(10, 7))
bars = plt.barh(importances.index, importances.values, color='#1F4E8A', alpha=0.8)
plt.xlabel('Feature Importance')
plt.title('Random Forest — Feature Importance\n(Which vital signs matter most?)', fontweight='bold')
plt.tight_layout()
plt.savefig('feature_importance.png', dpi=150, bbox_inches='tight')
plt.show()
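
Impurity-based importances from `feature_importances_` are computed on the training data and can favor features with many distinct values. As a cross-check, permutation importance on held-out data is a common alternative. The sketch below uses synthetic data, not the heart disease table, and the names `X_imp`, `rf_demo`, and `perm` are illustrative assumptions.

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.inspection import permutation_importance
from sklearn.model_selection import train_test_split

# Synthetic stand-in; shuffle=False keeps the 3 informative features in columns 0-2
X_imp, y_imp = make_classification(
    n_samples=400, n_features=8, n_informative=3, n_redundant=0,
    shuffle=False, random_state=0,
)
X_tr, X_te, y_tr, y_te = train_test_split(
    X_imp, y_imp, test_size=0.25, stratify=y_imp, random_state=0
)

rf_demo = RandomForestClassifier(n_estimators=100, random_state=0).fit(X_tr, y_tr)

# Shuffle one column at a time on the held-out split and measure the drop in ROC-AUC
perm = permutation_importance(
    rf_demo, X_te, y_te, n_repeats=10, scoring='roc_auc', random_state=0
)

for idx in perm.importances_mean.argsort()[::-1]:
    print(f'feature_{idx}: {perm.importances_mean[idx]:.3f} ± {perm.importances_std[idx]:.3f}')
```

Features whose permutation barely changes the score contribute little to held-out performance, whatever their impurity-based rank.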

## 4. Unsupervised Anomaly Detection

In real IoT deployments, labeled anomalies are rare: sensors stream continuously and ground-truth labels usually do not exist. This section demonstrates unsupervised anomaly detection using Isolation Forest, which builds random trees over the data and flags points that can be separated from the rest in only a few splits.
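
To make the isolation idea concrete, here is a toy sketch with two hypothetical vital signs (heart rate and systolic blood pressure). The values and the names `normal_readings`, `toy_forest`, `typical`, and `extreme` are invented for illustration. A reading far from the bulk of the data should receive a lower `score_samples` value than a typical one.

```python
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(0)

# Hypothetical "normal" readings: heart rate ~70 BPM, systolic BP ~120 mmHg
normal_readings = rng.normal(loc=[70.0, 120.0], scale=[5.0, 8.0], size=(500, 2))

toy_forest = IsolationForest(n_estimators=100, random_state=0).fit(normal_readings)

typical = np.array([[71.0, 119.0]])   # close to the center of the training data
extreme = np.array([[150.0, 200.0]])  # far outside anything seen during fit

typical_score = toy_forest.score_samples(typical)[0]
extreme_score = toy_forest.score_samples(extreme)[0]

# Lower score = fewer random splits needed to isolate the point = more anomalous
print(f'typical reading score: {typical_score:.3f}')
print(f'extreme reading score: {extreme_score:.3f}')
print('predict (1 = normal, -1 = anomaly):', toy_forest.predict(np.vstack([typical, extreme])))
```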

In [None]:
from sklearn.ensemble import IsolationForest
from sklearn.metrics import f1_score

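# Train Isolation Forest on HEALTHY patients only.
# Labels are used to select this training subset, so strictly speaking this is
# novelty detection (semi-supervised) rather than fully unsupervised learning.
X_healthy = X_train_scaled[y_train == 0]

iso_forest = IsolationForest(
    n_estimators=100,
    contamination=0.1,  # sets the threshold so ~10% of the healthy training samples are flagged
    random_state=42
)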
iso_forest.fit(X_healthy)

# Predict on test set — (-1) = anomaly, (1) = normal
iso_pred_raw = iso_forest.predict(X_test_scaled)
iso_pred = (iso_pred_raw == -1).astype(int)  # convert to 0/1

# Anomaly scores (lower = more anomalous)
anomaly_scores = iso_forest.score_samples(X_test_scaled)

print('Isolation Forest — Anomaly Detection Results')
print('=' * 50)
print(f'Detected anomalies: {iso_pred.sum()} / {len(iso_pred)}')
print(f'\nComparison with true disease labels:')
print(classification_report(y_test, iso_pred, target_names=['Normal', 'Anomaly']))
print('\nNote: labels were used only to select healthy training samples; the model never saw')
print('disease cases, so performance above the random baseline reflects genuine novelty detection.')

In [None]:
# Visualize anomaly scores
plt.figure(figsize=(12, 5))

# Histogram of anomaly scores
plt.subplot(1, 2, 1)
for label, color, name in [(0, '#2196F3', 'No Disease'), (1, '#F44336', 'Heart Disease')]:
    scores = anomaly_scores[y_test == label]
    plt.hist(scores, alpha=0.6, bins=25, color=color, label=name)
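# offset_ is the decision threshold on the score_samples scale (threshold_ was removed from scikit-learn)
plt.axvline(x=iso_forest.offset_, color='black', linestyle='--', label='Decision threshold')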
plt.xlabel('Anomaly Score (lower = more anomalous)')
plt.ylabel('Count')
plt.title('Isolation Forest\nAnomaly Score Distribution')
plt.legend()

# 2D projection
plt.subplot(1, 2, 2)
from sklearn.decomposition import PCA
pca = PCA(n_components=2)
X_2d = pca.fit_transform(X_test_scaled)

scatter = plt.scatter(X_2d[:, 0], X_2d[:, 1], 
                      c=anomaly_scores, cmap='RdYlGn',
                      alpha=0.7, s=60)
plt.colorbar(scatter, label='Anomaly Score')
plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]*100:.1f}% variance)')
plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]*100:.1f}% variance)')
plt.title('PCA Projection\n(Red = anomalous, Green = normal)')

plt.suptitle('Unsupervised Anomaly Detection — Isolation Forest', fontweight='bold')
plt.tight_layout()
plt.savefig('anomaly_detection.png', dpi=150, bbox_inches='tight')
plt.show()

## 5. Deep Learning — LSTM for Time-Series Vital Signs (PyTorch)

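LSTM networks are well-suited for sequential patient data such as heart rate, SpO2, and temperature readings over time. Here we simulate a time-series scenario from the static dataset, then train a PyTorch LSTM classifier. Because each simulated sequence is the same patient row repeated with small Gaussian noise, it carries no real temporal dynamics; the section demonstrates the training workflow rather than a time-series advantage.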

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

print(f'PyTorch version: {torch.__version__}')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

In [None]:
# Create time-series sequences from tabular data
# Each "patient" becomes a sequence of 5 time steps with slight noise
# (simulating repeated sensor readings)

def create_sequences(X, y, seq_len=5, noise_std=0.05):
    """Convert static features to simulated time-series sequences."""
    sequences = []
    for i in range(len(X)):
        seq = []
        for t in range(seq_len):
            noise = np.random.normal(0, noise_std, X.shape[1])
            seq.append(X[i] + noise)
        sequences.append(seq)
    return np.array(sequences, dtype=np.float32), np.array(y, dtype=np.float32)

SEQ_LEN = 5
X_train_seq, y_train_seq = create_sequences(X_train_scaled, y_train, seq_len=SEQ_LEN)
X_test_seq, y_test_seq = create_sequences(X_test_scaled, y_test, seq_len=SEQ_LEN)

# Convert to PyTorch tensors
train_dataset = TensorDataset(torch.from_numpy(X_train_seq), torch.from_numpy(y_train_seq))
test_dataset = TensorDataset(torch.from_numpy(X_test_seq), torch.from_numpy(y_test_seq))

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

print(f'Sequence shape: {X_train_seq.shape}  (samples, time_steps, features)')
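
With a real sensor stream, the noise-based simulation above would typically be replaced by sliding windows over consecutive readings. The sketch below shows one way to build `(samples, time_steps, features)` arrays with NumPy. The helper `make_windows` and the three-channel `stream` (heart rate, SpO2, temperature) are hypothetical and not part of the original notebook.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def make_windows(stream, seq_len=5):
    """Slice a (time, features) stream into overlapping (seq_len, features) windows."""
    # sliding_window_view appends the window axis last: (n_windows, features, seq_len)
    windows = sliding_window_view(stream, window_shape=seq_len, axis=0)
    # Reorder to (n_windows, seq_len, features), the layout nn.LSTM expects with batch_first=True
    return np.ascontiguousarray(windows.transpose(0, 2, 1), dtype=np.float32)

# Hypothetical stream: 20 time steps of (heart rate, SpO2, temperature)
rng = np.random.default_rng(0)
stream = np.column_stack([
    rng.normal(72, 3, 20),
    rng.normal(97, 1, 20),
    rng.normal(36.8, 0.2, 20),
])

windows = make_windows(stream, seq_len=5)
print(f'Window array shape: {windows.shape}  (samples, time_steps, features)')
```

Overlapping windows from the same patient are strongly correlated, so a train/test split on such data would need to be done per patient or per time block rather than per window.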

In [None]:
# LSTM Model
class VitalSignsLSTM(nn.Module):
    def __init__(self, input_size, hidden_size=64, num_layers=2, dropout=0.3):
        super(VitalSignsLSTM, self).__init__()
        
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout
        )
        
        self.classifier = nn.Sequential(
            nn.Linear(hidden_size, 32),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(32, 1),
            nn.Sigmoid()
        )
    
    def forward(self, x):
        # x shape: (batch, seq_len, features)
        lstm_out, (hidden, _) = self.lstm(x)
        # Use last hidden state for classification
        last_hidden = hidden[-1]  # (batch, hidden_size)
        output = self.classifier(last_hidden)
        # squeeze only the last dim so a batch of size 1 keeps shape (1,) and still matches y_batch
        return output.squeeze(-1)


INPUT_SIZE = X_train_scaled.shape[1]  # number of features
model = VitalSignsLSTM(input_size=INPUT_SIZE).to(device)

# Count parameters
total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'Model architecture:')
print(model)
print(f'\nTotal trainable parameters: {total_params:,}')

In [None]:
# Training
EPOCHS = 50
LEARNING_RATE = 0.001

criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=15, gamma=0.5)

train_losses, val_losses = [], []
train_accs, val_accs = [], []

for epoch in range(EPOCHS):
    # ── Train ──
    model.train()
    train_loss, train_correct = 0.0, 0
    
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        
        train_loss += loss.item()
        train_correct += ((outputs > 0.5) == y_batch.bool()).sum().item()
    
    # ── Validate ──
    model.eval()
    val_loss, val_correct = 0.0, 0
    
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            val_loss += loss.item()
            val_correct += ((outputs > 0.5) == y_batch.bool()).sum().item()
    
    scheduler.step()
    
    train_losses.append(train_loss / len(train_loader))
    val_losses.append(val_loss / len(test_loader))
    train_accs.append(train_correct / len(y_train_seq))
    val_accs.append(val_correct / len(y_test_seq))
    
    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1:3d}/{EPOCHS}] '
              f'Train Loss: {train_losses[-1]:.4f} | '
              f'Val Loss: {val_losses[-1]:.4f} | '
              f'Train Acc: {train_accs[-1]:.3f} | '
              f'Val Acc: {val_accs[-1]:.3f}')

In [None]:
# Training curves
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(13, 5))

ax1.plot(train_losses, label='Train Loss', color='#1F4E8A')
ax1.plot(val_losses, label='Validation Loss', color='#F44336')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('BCE Loss')
ax1.set_title('Training & Validation Loss')
ax1.legend()
ax1.grid(alpha=0.3)

ax2.plot(train_accs, label='Train Accuracy', color='#1F4E8A')
ax2.plot(val_accs, label='Validation Accuracy', color='#F44336')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Accuracy')
ax2.set_title('Training & Validation Accuracy')
ax2.legend()
ax2.grid(alpha=0.3)

plt.suptitle('LSTM Training Curves — Vital Signs Classification', fontweight='bold')
plt.tight_layout()
plt.savefig('lstm_training_curves.png', dpi=150, bbox_inches='tight')
plt.show()

In [None]:
# Final LSTM evaluation
model.eval()
all_probs, all_preds = [], []

with torch.no_grad():
    for X_batch, _ in test_loader:
        X_batch = X_batch.to(device)
        probs = model(X_batch).cpu().numpy()
        all_probs.extend(probs)
        all_preds.extend((probs > 0.5).astype(int))

lstm_auc = roc_auc_score(y_test, all_probs)
print('LSTM Final Evaluation')
print('=' * 50)
print(classification_report(y_test, all_preds, target_names=['No Disease', 'Heart Disease']))
print(f'ROC-AUC: {lstm_auc:.4f}')

# Add LSTM to results for final comparison
results['LSTM (PyTorch)'] = {
    'y_pred': all_preds,
    'y_prob': all_probs,
    'roc_auc': lstm_auc,
    'avg_precision': average_precision_score(y_test, all_probs),
}

## 6. Final Comparison & Summary

In [None]:
# Summary table
summary_rows = []
for name, res in results.items():
    cm = confusion_matrix(y_test, res['y_pred'])
    tn, fp, fn, tp = cm.ravel()
    sensitivity = tp / (tp + fn)  # recall for disease class — critical in clinical settings
    specificity = tn / (tn + fp)
    summary_rows.append({
        'Model': name,
        'ROC-AUC': f"{res['roc_auc']:.4f}",
        'Avg Precision': f"{res['avg_precision']:.4f}",
        'Sensitivity': f"{sensitivity:.4f}",
        'Specificity': f"{specificity:.4f}",
    })

summary_df = pd.DataFrame(summary_rows).set_index('Model')
print('=' * 70)
print('FINAL MODEL COMPARISON')
print('=' * 70)
print(summary_df.to_string())
print('\nNote: In clinical settings, Sensitivity (true positive rate) is')
print('often prioritized — missing a disease case is more costly than a false alarm.')
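
All models above are thresholded at the default 0.5. If sensitivity is the priority, one option is to pick the decision threshold from the ROC curve instead. The sketch below is a minimal illustration under that assumption: the helper `threshold_for_sensitivity` and the demo scores are hypothetical, and in practice the threshold should be chosen on a validation split rather than on the test set.

```python
import numpy as np
from sklearn.metrics import roc_curve

def threshold_for_sensitivity(y_true, y_score, target_sensitivity=0.95):
    """Return the highest threshold that reaches the target sensitivity, plus the resulting rates."""
    fpr, tpr, thresholds = roc_curve(y_true, y_score)
    # tpr is non-decreasing along the curve, so the first hit is the highest qualifying threshold
    idx = int(np.argmax(tpr >= target_sensitivity))
    return thresholds[idx], tpr[idx], 1.0 - fpr[idx]

# Hypothetical predicted probabilities for 5 healthy (0) and 5 diseased (1) patients
y_true_demo = np.array([0, 0, 0, 0, 0, 1, 1, 1, 1, 1])
y_score_demo = np.array([0.05, 0.10, 0.30, 0.45, 0.60, 0.35, 0.55, 0.70, 0.80, 0.95])

thr, sens, spec = threshold_for_sensitivity(y_true_demo, y_score_demo, target_sensitivity=1.0)
print(f'threshold={thr:.2f}  sensitivity={sens:.2f}  specificity={spec:.2f}')
```

Lowering the threshold to catch every positive case costs specificity, which is the tradeoff the summary table makes visible.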

In [None]:
# Final ROC comparison including LSTM
plt.figure(figsize=(9, 7))
plt.plot([0, 1], [0, 1], 'k--', alpha=0.4, label='Random baseline')

colors = ['#1F4E8A', '#E53935', '#2E7D32', '#F57F17', '#6A1B9A']
for (name, res), color in zip(results.items(), colors):
    fpr, tpr, _ = roc_curve(y_test, res['y_prob'])
    plt.plot(fpr, tpr, color=color, linewidth=2,
             label=f"{name} (AUC={res['roc_auc']:.3f})")

plt.xlabel('False Positive Rate', fontsize=12)
plt.ylabel('True Positive Rate (Sensitivity)', fontsize=12)
plt.title('ROC Curves — All Models\n(Vital Signs Anomaly Detection)', fontsize=13, fontweight='bold')
plt.legend(loc='lower right')
plt.grid(alpha=0.3)
plt.tight_layout()
plt.savefig('final_roc_comparison.png', dpi=150, bbox_inches='tight')
plt.show()

print('\nAll figures saved to current directory.')

## 7. Key Findings & Next Steps

### What this project demonstrates
- Full ML pipeline from raw biomedical data to validated clinical models
- Comparison of classical (sklearn) and deep learning (PyTorch) approaches
- Unsupervised anomaly detection for real-world IoT scenarios without labeled data
- Clinical validation metrics — sensitivity, specificity, ROC-AUC, precision-recall

### Key insight
In clinical settings, model validation is not just about accuracy: **sensitivity** (not missing disease cases) often deserves more weight than specificity, because a missed diagnosis is usually costlier than a false alarm. This tradeoff is central to Dr. Ayhan's research on rigorous clinical AI validation.

### Connection to IoT pipeline
The Patient Health Monitoring System collects real-time ESP32 sensor data. This ML pipeline is the next layer — replacing threshold-based anomaly detection with learned models that adapt to patient-specific baselines.

### Next steps
1. **Real sensor data** — replace UCI dataset with actual ESP32 readings from Patient_Health_Monitoring
2. **Personalized baselines** — train per-patient models to detect deviations from individual normal ranges
3. **Real-time inference** — deploy model on Raspberry Pi 4 for edge inference without cloud dependency
4. **Federated learning** — train across multiple patient devices without centralizing sensitive data
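
As a rough illustration of the personalized-baseline idea in step 2, the sketch below flags readings that deviate strongly from a patient's own recent history using a rolling z-score. This is a simple statistical stand-in, not a trained per-patient model; the helper `rolling_zscore_flags`, the window size, the threshold, and the simulated heart-rate series are all assumptions made for the example.

```python
import numpy as np
import pandas as pd

def rolling_zscore_flags(readings, window=30, z_threshold=3.0):
    """Flag readings that deviate strongly from the patient's own recent baseline."""
    # shift(1) keeps the current reading out of its own baseline statistics
    baseline = readings.shift(1).rolling(window, min_periods=window)
    z = (readings - baseline.mean()) / baseline.std()
    # NaN z-scores (before the window fills) compare as False, so they are never flagged
    return z.abs() > z_threshold

# Hypothetical heart-rate stream for one patient with a single injected spike
rng = np.random.default_rng(0)
heart_rate = pd.Series(rng.normal(68, 2, 200))
heart_rate.iloc[150] = 110.0

flags = rolling_zscore_flags(heart_rate)
print('Flagged indices:', list(flags[flags].index))
```

A fixed threshold such as "heart rate above 100 BPM" treats every patient alike, whereas this baseline adapts to whatever range is normal for the individual.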
