In [1]:
get_ipython().system('pip install pandas numpy matplotlib torch scikit-learn')



In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Re-load raw data (to compare predictions vs true HR)
df = pd.read_csv("sample_heart_rate.csv", parse_dates=["timestamp"])
timestamps = df["timestamp"]
hr_raw = df["heart_rate"].values.astype(np.float32)

# Normalize using saved meta
hr_norm = (hr_raw - meta["mean"]) / meta["std"]

# Create sliding windows again for full-sequence predictions
window = meta["window_size"]
sequences = []
for i in range(len(hr_norm) - window):
    seq = hr_norm[i : i + window]
    sequences.append(seq)

X_full = np.array(sequences)[..., np.newaxis]  # (N, window, 1)
X_full = torch.tensor(X_full, dtype=torch.float32)

# Predict with model
model.eval()
with torch.no_grad():
    preds = model(X_full).numpy().flatten()

# Expand predictions back to original scale
preds_unscaled = preds * meta["std"] + meta["mean"]
hr_aligned = hr_raw[window:]   # true data aligned to predictions

# Compute residuals
residuals = np.abs(hr_aligned - preds_unscaled)

# Detect anomalies: threshold = mean + 2.5 * std of residuals
threshold = residuals.mean() + 2.5 * residuals.std()
anomalies = residuals > threshold

# Plot everything
plt.figure(figsize=(15, 6))
plt.plot(timestamps[window:], hr_aligned, label="Actual HR", linewidth=2)
plt.plot(timestamps[window:], preds_unscaled, label="Predicted HR", linestyle="--")
plt.scatter(
    timestamps[window:][anomalies],
    hr_aligned[anomalies],
    color="red",
    label="Anomaly Detected",
    s=80,
)
plt.xlabel("Time")
plt.ylabel("Heart Rate (bpm)")
plt.title("Heart Rate Anomaly Detection (LSTM-based)")
plt.legend()
plt.grid(True)

plt.savefig("anomaly_detection_plot.png", dpi=200)
plt.show()

print("Anomaly plot saved as anomaly_detection_plot.png")

In [5]:
import torch
import torch.optim as optim
import torch.nn as nn

# Load the data from earlier
X_train, y_train, X_test, y_test, meta = load_heart_rate_data()

# Instantiate model, loss, optimizer
model = LSTMHeartRateModel(input_dim=1, hidden_dim=32)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

EPOCHS = 40

train_losses = []
test_losses = []

for epoch in range(EPOCHS):
    model.train()
    optimizer.zero_grad()

    preds = model(X_train)
    loss = criterion(preds, y_train)
    loss.backward()
    optimizer.step()

    train_losses.append(loss.item())

    # Evaluate on test set
    model.eval()
    with torch.no_grad():
        test_pred = model(X_test)
        test_loss = criterion(test_pred, y_test)
        test_losses.append(test_loss.item())

    if epoch % 5 == 0:
        print(f"Epoch {epoch}/{EPOCHS} | Train Loss: {loss.item():.4f} | Test Loss: {test_loss.item():.4f}")

print("\nTraining complete.")

Epoch 0/40 | Train Loss: 1.5320 | Test Loss: 0.4654
Epoch 5/40 | Train Loss: 1.1125 | Test Loss: 1.1050
Epoch 10/40 | Train Loss: 0.9080 | Test Loss: 1.9379
Epoch 15/40 | Train Loss: 0.7294 | Test Loss: 1.8441
Epoch 20/40 | Train Loss: 0.5500 | Test Loss: 2.8880
Epoch 25/40 | Train Loss: 0.3838 | Test Loss: 2.9799
Epoch 30/40 | Train Loss: 0.3439 | Test Loss: 2.7150
Epoch 35/40 | Train Loss: 0.2795 | Test Loss: 2.2280

Training complete.


In [4]:
import torch
import torch.nn as nn

class LSTMHeartRateModel(nn.Module):
    def __init__(self, input_dim=1, hidden_dim=32, num_layers=1):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True
        )
        self.fc = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        out, _ = self.lstm(x)       # (batch, seq, hidden)
        out = out[:, -1, :]         # last timestep
        out = self.fc(out)          # regression output
        return out

print("Model class ready.")

Model class ready.


In [3]:
import pandas as pd
import numpy as np
import torch

def load_heart_rate_data(
    csv_path="sample_heart_rate.csv",
    resample_freq="1min",
    window_size=5,
    test_size=0.2
):
    df = pd.read_csv(csv_path, parse_dates=["timestamp"])
    df = df.sort_values("timestamp").set_index("timestamp")

    # Resample to ensure uniform intervals
    df = df.resample(resample_freq).mean().interpolate()

    # Normalize HR
    hr = df["heart_rate"].values.astype(np.float32)
    mean, std = hr.mean(), hr.std() if hr.std() > 0 else 1.0
    hr_norm = (hr - mean) / std

    # Sliding window sequences
    sequences, labels = [], []
    for i in range(len(hr_norm) - window_size):
        seq = hr_norm[i : i + window_size]
        label = hr_norm[i + window_size]
        sequences.append(seq)
        labels.append(label)

    X = np.array(sequences)[..., np.newaxis]   # (N, seq, 1)
    y = np.array(labels)[..., np.newaxis]      # (N, 1)

    # Train-test split (no shuffle because time-series)
    split_idx = int(len(X) * (1 - test_size))
    X_train, X_test = X[:split_idx], X[split_idx:]
    y_train, y_test = y[:split_idx], y[split_idx:]

    # Convert to tensors
    return (
        torch.tensor(X_train, dtype=torch.float32),
        torch.tensor(y_train, dtype=torch.float32),
        torch.tensor(X_test,  dtype=torch.float32),
        torch.tensor(y_test,  dtype=torch.float32),
        {"mean": mean, "std": std, "window_size": window_size}
    )

print("Data loader ready.")

Data loader ready.


In [2]:
import pandas as pd

data = {
    "timestamp": [
        "2025-01-01 00:00:00","2025-01-01 00:01:00","2025-01-01 00:02:00","2025-01-01 00:03:00",
        "2025-01-01 00:04:00","2025-01-01 00:05:00","2025-01-01 00:06:00","2025-01-01 00:07:00",
        "2025-01-01 00:08:00","2025-01-01 00:09:00","2025-01-01 00:10:00","2025-01-01 00:11:00",
        "2025-01-01 00:12:00","2025-01-01 00:13:00","2025-01-01 00:14:00","2025-01-01 00:15:00",
        "2025-01-01 00:16:00","2025-01-01 00:17:00","2025-01-01 00:18:00","2025-01-01 00:19:00",
        "2025-01-01 00:20:00"
    ],
    "heart_rate": [
        72,73,72,74,75,76,110,112,114,80,78,77,120,118,117,75,76,74,73,72,71
    ]
}

df = pd.DataFrame(data)
df.to_csv("sample_heart_rate.csv", index=False)

df.head(), df.tail()

(             timestamp  heart_rate
 0  2025-01-01 00:00:00          72
 1  2025-01-01 00:01:00          73
 2  2025-01-01 00:02:00          72
 3  2025-01-01 00:03:00          74
 4  2025-01-01 00:04:00          75,
               timestamp  heart_rate
 16  2025-01-01 00:16:00          76
 17  2025-01-01 00:17:00          74
 18  2025-01-01 00:18:00          73
 19  2025-01-01 00:19:00          72
 20  2025-01-01 00:20:00          71)