In [None]:
import cv2
import numpy as np
import os
import time
import joblib
import matplotlib.pyplot as plt

from scipy.signal import find_peaks, butter, filtfilt, welch
from scipy.ndimage import gaussian_filter1d

from sklearn.linear_model import SGDRegressor
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

# Config
FPS = 30
DURATION = 30
WINDOW_SIZE = 90
STEP_SIZE = 45
MIN_SAMPLES = 5
METRICS = ['heart_rate', 'hrv', 'resp_rate', 'pulse_amp']
VALIDATION_RATIO = 0.2
DRIFT_THRESHOLD = 0.15  # Concept drift threshold (MSE increase)

# Historical MSE for drift tracking
history_mse = {metric: [] for metric in METRICS}

def record_video(filename='ppg_capture.avi', duration=DURATION, fps=FPS):
    cap = cv2.VideoCapture(0)
    width, height = int(cap.get(3)), int(cap.get(4))
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(filename, fourcc, fps, (width, height))
    print(f"Recording for {duration} seconds. Cover camera with your fingertip.")
    start_time = time.time()
    while (time.time() - start_time) < duration:
        ret, frame = cap.read()
        if not ret: break
        remaining = max(0, int(duration - (time.time() - start_time)))
        cv2.putText(frame, f"Recording: {remaining}s", (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)
        out.write(frame)
        cv2.imshow('Recording (Press Q to stop)', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'): break
    cap.release()
    out.release()
    cv2.destroyAllWindows()

def process_signal(signal, fps):
    nyq = 0.5 * fps
    b, a = butter(2, [0.5/nyq, 4.0/nyq], btype='band')
    filtered = filtfilt(b, a, signal)
    return (filtered - np.mean(filtered)) / np.std(filtered)

def extract_features(signal, fps):
    processed = process_signal(signal, fps)
    features = []
    for i in range(0, len(processed) - WINDOW_SIZE, STEP_SIZE):
        window = processed[i:i+WINDOW_SIZE]
        fft = np.abs(np.fft.fft(window))
        freqs = np.fft.fftfreq(len(window), 1/fps)
        features.append([
            np.mean(window),
            np.std(window),
            freqs[np.argmax(fft)],
            np.sum(fft**2),
            len(find_peaks(window)[0]),
            np.percentile(window, 75)
        ])
    return np.array(features)

def traditional_measures(signal, fps):
    smoothed = gaussian_filter1d(signal - np.mean(signal), 2)
    peaks, _ = find_peaks(smoothed, distance=fps * 0.5)
    duration_secs = len(signal) / fps
    hr = len(peaks) / duration_secs * 60
    rr_intervals = np.diff(peaks) / fps * 1000
    hrv = np.std(rr_intervals)
    freqs, psd = welch(smoothed, fs=fps, nperseg=256)
    resp_band = (freqs > 0.1) & (freqs < 0.5)
    resp_rate = freqs[resp_band][np.argmax(psd[resp_band])] * 60
    pulse_amplitude = np.max(smoothed) - np.min(smoothed)
    return hr, hrv, resp_rate, pulse_amplitude

def load_or_init_models(metrics):
    models = {}
    for metric in metrics:
        model_file = f"{metric}_model.pkl"
        if os.path.exists(model_file):
            models[metric] = joblib.load(model_file)
        else:
            model = make_pipeline(
                StandardScaler(),
                SGDRegressor(learning_rate='adaptive', eta0=0.01, power_t=0.25, warm_start=True)
            )
            models[metric] = model
    return models

def train_models_online(models, X, y_true, val_X, val_y):
    for i, metric in enumerate(METRICS):
        model = models[metric]
        scaler = model.named_steps['standardscaler']
        regressor = model.named_steps['sgdregressor']

        if not hasattr(scaler, 'mean_'):
            scaler.fit(X)
        X_scaled = scaler.transform(X)
        val_X_scaled = scaler.transform(val_X)

        for x_scaled in X_scaled:
            regressor.partial_fit(x_scaled.reshape(1, -1), [y_true[i]])

        preds = regressor.predict(val_X_scaled)
        val_target = np.array([val_y[i]] * len(val_X_scaled))
        mse = mean_squared_error(val_target, preds)
        history_mse[metric].append(mse)

        print(f"[Validation] {metric} MSE: {mse:.4f}")

        if len(history_mse[metric]) >= 5:
            recent = history_mse[metric][-3:]
            old_avg = np.mean(history_mse[metric][:-3])
            recent_avg = np.mean(recent)
            if old_avg and abs(recent_avg - old_avg) / old_avg > DRIFT_THRESHOLD:
                print(f"[Drift Warning] Significant change in '{metric}' performance. Consider model reset or retraining.")

        joblib.dump(model, f"{metric}_model.pkl")

def predict_metrics(models, X):
    predictions = {}
    X_avg = X.mean(axis=0).reshape(1, -1)
    for metric in METRICS:
        predictions[metric] = models[metric].predict(X_avg)[0]
    return predictions

def evaluate_model(preds, truths):
    print("\n--- Model Evaluation (vs Traditional) ---")
    for i, metric in enumerate(METRICS):
        true_val = truths[i]
        pred_val = preds[metric]
        mae = mean_absolute_error([true_val], [pred_val])
        mse = mean_squared_error([true_val], [pred_val])
        rel_error = abs(true_val - pred_val) / true_val * 100 if true_val != 0 else 0

        print(f"{metric.capitalize()}:")
        print(f"  MAE         = {mae:.2f}")
        print(f"  MSE         = {mse:.2f}")
        print(f"  Relative Error (%) = {rel_error:.2f}%")

# --- Main Execution ---

record_video()

cap = cv2.VideoCapture('ppg_capture.avi')
signal_data = []
while True:
    ret, frame = cap.read()
    if not ret:
        break
    cyan_signal = np.mean(frame[:, :, [0, 1]], axis=2)  # Cyan = average of blue and green
    signal_data.append(np.mean(cyan_signal))
cap.release()
signal_data = np.array(signal_data)

X_new = extract_features(signal_data, fps=FPS)
if len(X_new) < MIN_SAMPLES:
    raise ValueError(f"Need at least {MIN_SAMPLES} windows. Got {len(X_new)}")

split_idx = int((1 - VALIDATION_RATIO) * len(X_new))
X_train, X_val = X_new[:split_idx], X_new[split_idx:]

true_vals = traditional_measures(signal_data, fps=FPS)
models = load_or_init_models(METRICS)
train_models_online(models, X_train, true_vals, X_val, true_vals)

ml_preds = predict_metrics(models, X_new)

print("\n--- PPG Analysis Results (Traditional) ---")
print(f"Heart Rate (BPM): {true_vals[0]:.2f}")
print(f"Heart Rate Variability (ms): {true_vals[1]:.2f}")
print(f"Respiratory Rate (breaths/min): {true_vals[2]:.2f}")
print(f"Pulse Amplitude: {true_vals[3]:.2f}")

print("\n--- ML Predictions ---")
print(f"Heart Rate (BPM): {ml_preds['heart_rate']:.2f}")
print(f"Heart Rate Variability (ms): {ml_preds['hrv']:.2f}")
print(f"Respiratory Rate (breaths/min): {ml_preds['resp_rate']:.2f}")
print(f"Pulse Amplitude: {ml_preds['pulse_amp']:.2f}")

evaluate_model(ml_preds, true_vals)

# Plot signal and peaks
smoothed_signal = gaussian_filter1d(signal_data - np.mean(signal_data), sigma=2)
peaks, _ = find_peaks(smoothed_signal, distance=FPS * 0.5)

plt.figure(figsize=(12, 6))
plt.plot(smoothed_signal, label='Smoothed PPG Signal (Cyan)', color='c')
plt.plot(peaks, smoothed_signal[peaks], 'rx', label='Detected Peaks')
plt.title("Traditional PPG Signal Analysis")
plt.xlabel("Frame")
plt.ylabel("Filtered Cyan Channel Intensity")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
