In [1]:
import time
import json
import joblib
import numpy as np
import pandas as pd
from kafka import KafkaConsumer
from utils import format_ecg_input, update_drift_window, check_drift
from retraining_module import retrain_model

def _decode_json(raw_bytes):
    """Decode a UTF-8 encoded Kafka payload and parse it as JSON."""
    return json.loads(raw_bytes.decode('utf-8'))


# Model used for scoring; the consume loop rebinds `model` after a retrain.
model = joblib.load("models/model_v1.pkl")

# Consumer for the 'ecg_stream' topic on the local broker, reading only
# messages produced after startup ('latest').
# NOTE(review): no group_id is passed; per the kafka-python docs offset
# commits are disabled when group_id is None, so enable_auto_commit may be a
# no-op here -- confirm the intent.
_consumer_settings = {
    "bootstrap_servers": 'localhost:9092',
    "value_deserializer": _decode_json,
    "auto_offset_reset": 'latest',
    "enable_auto_commit": True,
}
consumer = KafkaConsumer('ecg_stream', **_consumer_settings)

print("Listening to Kafka topic 'ecg_stream'...")

# --- Drift detection and logging state ---
# NOTE(review): WINDOW_SIZE is not referenced anywhere in the visible code;
# presumably the helpers in `utils` apply their own window size -- confirm.
WINDOW_SIZE = 500
drift_window, prediction_log = [], []  # recent samples / one dict per prediction
MAX_IDLE_TIME = 60  # seconds without messages before the loop exits
csv_output_path = "data/prediction_log.csv"
last_message_time = time.time()  # refreshed whenever a message is processed

# Main consume/score loop.
#
# Polls Kafka once per second, scores every ECG sample with the current
# model, records the result in `prediction_log`, and feeds the sample into
# the drift window. When `check_drift` reports drift the model is retrained
# on the window, reloaded, and the window is cleared. The loop exits once no
# message has arrived for more than MAX_IDLE_TIME seconds.
#
# The try/finally guarantees that the prediction log is written and the
# consumer is closed on every exit path (idle timeout, Ctrl-C / kernel
# interrupt, or an unexpected error from poll()), not only on idle timeout.
try:
    while True:
        message_pack = consumer.poll(timeout_ms=1000)

        if not message_pack:
            # Empty poll: only stop once the stream has been idle long enough.
            if time.time() - last_message_time > MAX_IDLE_TIME:
                print(f"No messages in last {MAX_IDLE_TIME} seconds — writing log and exiting.")
                break
            continue

        # poll() returns a mapping of partition -> list of messages; the
        # partition key itself is not needed here.
        for messages in message_pack.values():
            for message in messages:
                try:
                    last_message_time = time.time()

                    data = message.value
                    ecg1 = float(data["ECG1"])
                    ecg2 = float(data["ECG2"])
                    timestamp = data["timestamp"]

                    # Format input and predict; a prediction of -1 is
                    # reported as an anomaly, anything else as normal.
                    x = format_ecg_input(ecg1, ecg2)
                    prediction = model.predict(x)[0]
                    label = "Anomaly" if prediction == -1 else "Normal"

                    print(f"[{timestamp}] ECG1={ecg1:.4f}, ECG2={ecg2:.4f} → {label}")

                    # Save prediction to list
                    prediction_log.append({
                        "timestamp": timestamp,
                        "ECG1": ecg1,
                        "ECG2": ecg2,
                        "label": label
                    })

                    # Update drift window
                    drift_window = update_drift_window(drift_window, [ecg1, ecg2])

                    # Check for drift; on drift, retrain on the current
                    # window, swap in the new model and start a fresh window.
                    if check_drift(drift_window):
                        print("Drift detected! Retraining model...")
                        new_model_path = retrain_model(drift_window)
                        model = joblib.load(new_model_path)
                        drift_window = []

                except Exception as e:
                    # Deliberate best-effort: one bad message (or a failed
                    # retrain) must not stop the stream.
                    print(f"Error: {e}")
finally:
    try:
        # Save predictions to CSV
        if prediction_log:
            pd.DataFrame(prediction_log).to_csv(csv_output_path, index=False)
            print(f"Saved predictions to {csv_output_path}")
        else:
            print("No predictions to save.")
    finally:
        # Release the broker connection even if writing the CSV failed.
        consumer.close()

Listening to Kafka topic 'ecg_stream'...




[0.703125] ECG1=-0.1150, ECG2=-0.0550 → Normal
[0.7109375] ECG1=-0.1550, ECG2=-0.0450 → Normal
[0.71875] ECG1=-0.1050, ECG2=-0.0650 → Normal
[0.7265625] ECG1=-0.1250, ECG2=-0.0450 → Normal
[0.734375] ECG1=-0.1350, ECG2=-0.0350 → Normal
[0.7421875] ECG1=-0.1250, ECG2=-0.0450 → Normal




[0.75] ECG1=-0.1150, ECG2=-0.0450 → Normal
[0.7578125] ECG1=-0.1050, ECG2=-0.0350 → Normal
[0.765625] ECG1=-0.1350, ECG2=-0.0250 → Normal
[0.7734375] ECG1=-0.1350, ECG2=-0.0350 → Normal
[0.78125] ECG1=-0.1350, ECG2=-0.0550 → Normal




[0.7890625] ECG1=-0.1450, ECG2=-0.0550 → Normal
[0.796875] ECG1=-0.1250, ECG2=-0.0350 → Normal
[0.8046875] ECG1=-0.1150, ECG2=-0.0550 → Normal
[0.8125] ECG1=-0.1350, ECG2=-0.0650 → Normal
[0.8203125] ECG1=-0.1350, ECG2=-0.0650 → Normal




[0.828125] ECG1=-0.1650, ECG2=-0.0550 → Normal
[0.8359375] ECG1=-0.1550, ECG2=-0.0350 → Normal
[0.84375] ECG1=-0.1150, ECG2=-0.0450 → Normal
[0.8515625] ECG1=-0.0850, ECG2=-0.0050 → Normal




[0.859375] ECG1=-0.0550, ECG2=0.0350 → Normal
[0.8671875] ECG1=-0.0550, ECG2=0.0550 → Normal
[0.875] ECG1=-0.0450, ECG2=0.0250 → Normal
[0.8828125] ECG1=0.0650, ECG2=-0.0550 → Normal
[0.890625] ECG1=0.0050, ECG2=-0.0550 → Normal




[0.8984375] ECG1=-0.0150, ECG2=-0.0850 → Normal
[0.90625] ECG1=-0.0550, ECG2=-0.0650 → Normal
[0.9140625] ECG1=-0.1150, ECG2=-0.0450 → Normal
[0.921875] ECG1=-0.1350, ECG2=-0.0150 → Normal
[0.9296875] ECG1=-0.1350, ECG2=-0.0550 → Normal




[0.9375] ECG1=-0.1750, ECG2=-0.0750 → Normal
[0.9453125] ECG1=-0.2050, ECG2=-0.0750 → Normal
[0.953125] ECG1=-0.2250, ECG2=-0.0550 → Normal
[0.9609375] ECG1=-0.1750, ECG2=-0.0550 → Normal
[0.96875] ECG1=-0.1750, ECG2=-0.0550 → Normal




[0.9765625] ECG1=-0.1550, ECG2=-0.0550 → Normal
[0.984375] ECG1=-0.2550, ECG2=0.1050 → Normal
[0.9921875] ECG1=-0.8750, ECG2=0.1650 → Normal
[1.0] ECG1=-0.6450, ECG2=-0.1050 → Normal




[1.0078125] ECG1=0.6450, ECG2=-0.4450 → Normal
[1.015625] ECG1=2.3150, ECG2=-0.2450 → Normal
[1.0234375] ECG1=2.9250, ECG2=0.0450 → Normal
[1.03125] ECG1=2.7250, ECG2=0.4450 → Normal




[1.0390625] ECG1=1.2250, ECG2=-0.6750 → Normal
[1.046875] ECG1=-0.1850, ECG2=-0.7850 → Normal
[1.0546875] ECG1=-0.2750, ECG2=-0.2150 → Normal
[1.0625] ECG1=-0.2750, ECG2=-0.0350 → Normal
[1.0703125] ECG1=-0.2850, ECG2=-0.0250 → Normal
Drift detected! Retraining model...
Final model retrained and saved as final_model.pkl
[1.078125] ECG1=-0.2850, ECG2=-0.0450 → Normal
[1.0859375] ECG1=-0.3350, ECG2=-0.0450 → Normal
[1.09375] ECG1=-0.3050, ECG2=-0.0450 → Normal
[1.1015625] ECG1=-0.2850, ECG2=-0.0350 → Normal
[1.109375] ECG1=-0.2750, ECG2=-0.0550 → Normal
Drift detected! Retraining model...
Final model retrained and saved as final_model.pkl
[1.1171875] ECG1=-0.2750, ECG2=-0.0550 → Anomaly
[1.125] ECG1=-0.2450, ECG2=-0.0650 → Anomaly
[1.1328125] ECG1=-0.2450, ECG2=-0.0450 → Normal
[1.140625] ECG1=-0.2250, ECG2=-0.0750 → Anomaly
[1.1484375] ECG1=-0.1950, ECG2=-0.0850 → Anomaly
Drift detected! Retraining model...
Final model retrained and saved as final_model.pkl
[1.15625] ECG1=-0.2050, ECG2=