In [1]:
import os
import random
import numpy as np
import h5py
import threading
import time
import wfdb
import neurokit2 as nk
import tensorflow as tf
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
from scipy.signal import butter, filtfilt, find_peaks, convolve
from scipy.spatial.distance import squareform, pdist
from sklearn.utils import class_weight
from skimage.transform import resize
from collections import deque
from tensorflow.keras.models import load_model
from tensorflow.keras.utils import plot_model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam

In [2]:
def get_rri(qrs_raw_ind, fs):
    """Turn R-peak sample positions into successive RR intervals.

    Args:
        qrs_raw_ind: Sequence of R-peak locations expressed as sample indices.
        fs: Sampling frequency in Hz, used to convert sample counts to time.

    Returns:
        Array with the spacing between consecutive peaks in milliseconds;
        it has one element fewer than ``qrs_raw_ind``.
    """
    seconds_between_beats = np.diff(qrs_raw_ind) / fs
    return seconds_between_beats * 1000

def neurakit_rri(record, fs):
    """Derive RR intervals (in ms) from a raw ECG trace using NeuroKit2.

    Args:
        record: ECG samples handed to ``nk.ecg_process``.
        fs: Sampling rate in Hz of ``record``.

    Returns:
        The result of ``get_rri`` applied to the ``"ECG_R_Peaks"`` entry of
        the info dictionary returned by ``nk.ecg_process``.
    """
    _processed, peak_info = nk.ecg_process(record, sampling_rate=fs)
    return get_rri(peak_info["ECG_R_Peaks"], fs)

def normalize_to_gray_scale(matrix):
    """Min-max scale ``matrix`` and quantise it to 8-bit grey levels.

    Args:
        matrix: Numeric array to rescale.

    Returns:
        ``np.uint8`` array of the same shape. The minimum maps to 0; because
        a small epsilon is added to the range (to avoid dividing by zero for
        constant input) and the cast truncates, the maximum lands just below
        255.
    """
    lowest, highest = np.min(matrix), np.max(matrix)
    value_range = highest - lowest + 1e-8  # epsilon keeps constant input finite
    unit_scaled = (matrix - lowest) / value_range
    return (unit_scaled * 255).astype(np.uint8)

def rp_plot(rri, delay, embedding_dim):
    """Compute the unthresholded recurrence (distance) matrix of an RR series.

    The series is time-delay embedded: vector ``i`` is
    ``(rri[i], rri[i + delay], ..., rri[i + (embedding_dim - 1) * delay])``.
    The pairwise Euclidean distances between all embedded vectors are returned.

    Args:
        rri: 1-D sequence of RR intervals (list or array).
        delay: Embedding delay in samples; must be >= 1.
        embedding_dim: Number of coordinates per embedded vector; must be >= 1.

    Returns:
        Square symmetric ``float`` array of shape ``(Nrp, Nrp)`` with
        ``Nrp = len(rri) - (embedding_dim - 1) * delay``.

    Raises:
        ValueError: If ``delay`` or ``embedding_dim`` is smaller than 1, or if
            the series is too short to form a single embedded vector.
    """
    if delay < 1 or embedding_dim < 1:
        raise ValueError("delay and embedding_dim must be positive integers")

    series = np.asarray(rri, dtype=float)
    min_length = (embedding_dim - 1) * delay + 1
    n_vectors = len(series) - (embedding_dim - 1) * delay
    if n_vectors < 1:
        # Fail early with an actionable message instead of letting pdist
        # reject an empty array further down.
        raise ValueError(
            f"RR series too short for time-delay embedding: got {len(series)} "
            f"samples, need at least {min_length}"
        )

    # Row i of the index grid is [i, i + delay, ..., i + (embedding_dim - 1) * delay].
    offsets = delay * np.arange(embedding_dim)
    embedded_rri = series[np.arange(n_vectors)[:, None] + offsets]
    return squareform(pdist(embedded_rri, metric='euclidean'))


In [3]:
# --- Streaming configuration and shared state --------------------------------
# NOTE(review): these are absolute, user-specific paths; consider deriving them
# from a configurable base directory so the notebook runs on other machines.
model_path = "/Users/weijithwimalasiri/Desktop/WARN/NN_weights/WEIGHTS.hdf5"
record  = wfdb.rdrecord('/Users/weijithwimalasiri/Desktop/JustForFun/RPS/physionet.org/files/afpdb/1.0.0/p01')
ecg = record.p_signal[:, 0]  # first signal column of the record
total_samples = len(ecg)

# Take the sampling rate from the record itself and derive the window sizes
# from it, so the durations below stay correct if the record is not 128 Hz.
fs = record.fs
window_size = int(fs * 30)  # samples in one 30 s analysis window
chunk_size = int(fs * 15)   # samples appended per streaming step (15 s)
length_ecg = 0  # not used by the visible code; kept so existing references still resolve

# State shared between the streaming and inference threads.
buffer = deque(maxlen=window_size)  # rolling window; oldest samples drop off automatically
lock = threading.Lock()             # guards buffer and stream_index
model = load_model(model_path)
stream_index = [0]                  # one-element list so threads can mutate it in place
stop_threads = threading.Event()    # set to ask both worker loops to exit

def stream_data(ecg, total_samples):
    """Feed ``ecg`` into the shared rolling buffer, one chunk every 15 s.

    Runs until fewer than ``chunk_size`` samples remain or ``stop_threads`` is
    set. Uses the module-level ``buffer``, ``lock``, ``stream_index``,
    ``chunk_size`` and ``stop_threads``.

    Args:
        ecg: Indexable sequence of ECG samples to replay.
        total_samples: Number of samples in ``ecg`` that may be streamed.
    """
    while stream_index[0] + chunk_size <= total_samples and not stop_threads.is_set():
        try:
            with lock:
                start = stream_index[0]
                stop = start + chunk_size
                print(f"[Stream] Processing chunk from {start} to {stop}")
                buffer.extend(ecg[start:stop])
                stream_index[0] = stop
                print(f"[Stream] Added samples. Buffer size: {len(buffer)}")
        except Exception as e:
            # Label the failure with the thread it actually came from.
            print(f"[ERROR] Stream error: {e}")
        # Event.wait() paces the loop like sleep(15) but returns immediately
        # once a stop is requested, so shutdown is not delayed by up to 15 s.
        stop_threads.wait(15)

def inference_pipeline(model, img_size=(224, 224)):
    """Poll the shared buffer and classify each newly completed window.

    Every 15 s the loop checks whether the buffer holds a full window that has
    not been classified yet (tracked through ``stream_index``). If so, the
    window is converted to RR intervals, a recurrence-distance image is built,
    scaled to 8-bit grey, resized to ``img_size`` and passed to ``model``.
    Uses the module-level ``buffer``, ``lock``, ``stream_index``,
    ``window_size``, ``fs`` and ``stop_threads``.

    Args:
        model: Object exposing ``predict``; called with an array of shape
            ``(1, img_size[0], img_size[1], 1)``.
        img_size: Target ``(height, width)`` of the image fed to the model.
    """
    last_pred_index = -1
    while not stop_threads.is_set():
        try:
            # Copy the window while holding the lock, then release it before
            # the slow signal processing and prediction so the streaming
            # thread is never blocked by inference.
            window = None
            with lock:
                current_index = stream_index[0]
                if len(buffer) == window_size and current_index != last_pred_index:
                    window = np.array(buffer)

            if window is not None:
                rri = neurakit_rri(window, fs)
                distances = rp_plot(rri, delay=3, embedding_dim=3)
                rp_gray = normalize_to_gray_scale(distances)
                rp_img = resize(rp_gray, img_size, anti_aliasing=True, preserve_range=True).astype(np.uint8)
                # Add batch and channel axes.
                input_tensor = np.expand_dims(rp_img, axis=(0, -1))

                pred = model.predict(input_tensor)
                pred_class = np.argmax(pred)
                confidence = pred[0][pred_class]
                print(f"Predicted class: {pred_class} with confidence {confidence:.2f}")
                # Only mark the window as done after a successful prediction,
                # so a failed attempt is retried on the next poll.
                last_pred_index = current_index
        except Exception as e:
            print(f"[ERROR] Inference error: {e}")
        # Interruptible pause: returns at once when a stop is requested.
        stop_threads.wait(15)

# Start the producer (streaming) and consumer (inference) as daemon threads.
for worker, worker_args in (
    (stream_data, (ecg, total_samples)),
    (inference_pipeline, (model,)),
):
    threading.Thread(target=worker, args=worker_args, daemon=True).start()

# Block the main thread until the streamer has consumed every full chunk.
# NOTE(review): stop_threads is only set on Ctrl-C here; after a normal finish
# nothing signals the worker loops to exit - confirm whether that is intended.
try:
    while not (stream_index[0] + chunk_size > total_samples):
        time.sleep(1)
except KeyboardInterrupt:
    print("Stopping threads...")
    stop_threads.set()

[Stream] Processing chunk from 0 to 1920
[Stream] Added samples. Buffer size: 1920
[Stream] Processing chunk from 1920 to 3840
[Stream] Added samples. Buffer size: 3840


2025-07-21 15:33:21.811740: W tensorflow/core/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz


Predicted class: 1 with confidence 0.85
Stopping threads...


In [None]:
# --- Configuration -----------------------------------------------------------
# NOTE(review): these are absolute, user-specific paths; consider deriving them
# from a configurable base directory so the notebook runs on other machines.
model_path = "/Users/weijithwimalasiri/Desktop/WARN/NN_weights/WEIGHTS.hdf5"
record  = wfdb.rdrecord('/Users/weijithwimalasiri/Desktop/JustForFun/RPS/physionet.org/files/afpdb/1.0.0/p01')
ecg = record.p_signal[:, 0]  # first signal column of the record
total_samples = len(ecg)

# Derive the window sizes from the record's own sampling rate so the durations
# stay correct if the record is not sampled at 128 Hz.
fs = record.fs
window_size = int(fs * 30)  # samples in one 30 s analysis window
chunk_size = int(fs * 15)   # samples appended per streaming step (15 s)
length_ecg = 0  # not used by the visible code; kept so existing references still resolve

buffer = deque(maxlen=window_size)
lock = threading.Lock()

# Load the model before its first use so this cell does not depend on a
# `model` variable left behind by an earlier cell.
model = load_model(model_path)

# Stream data and inference without threads
for start_idx in range(0, total_samples - chunk_size + 1, chunk_size):
    data_chunk = ecg[start_idx:start_idx + chunk_size]
    buffer.extend(data_chunk)
    print(f"[Stream] Processing chunk from {start_idx} to {start_idx + chunk_size}")
    print(f"[Stream] Added samples. Buffer size: {len(buffer)}")

    if len(buffer) == window_size:
        window = np.array(buffer)
        rri = neurakit_rri(window, fs)
        distances = rp_plot(rri, delay=3, embedding_dim=3)
        rp_gray = normalize_to_gray_scale(distances)
        rp_img = resize(rp_gray, (224, 224), anti_aliasing=True, preserve_range=True).astype(np.uint8)
        input_tensor = np.expand_dims(rp_img, axis=(0, -1))
        pred = model.predict(input_tensor)
        pred_class = np.argmax(pred)
        confidence = pred[0][pred_class]
        print(f"Predicted class: {pred_class} with confidence {confidence:.2f}")

# Reset the shared state before the threaded run below restarts at sample 0.
# Without clearing, the buffer would still hold the end of the record and the
# first threaded window would splice it onto the first chunk.
buffer.clear()
stream_index = [0]
stop_threads = threading.Event()

def stream_data(ecg, total_samples):
    """Feed ``ecg`` into the shared rolling buffer, one chunk every 15 s.

    Runs until fewer than ``chunk_size`` samples remain or ``stop_threads`` is
    set. Uses the module-level ``buffer``, ``lock``, ``stream_index``,
    ``chunk_size`` and ``stop_threads``.

    Args:
        ecg: Indexable sequence of ECG samples to replay.
        total_samples: Number of samples in ``ecg`` that may be streamed.
    """
    while stream_index[0] + chunk_size <= total_samples and not stop_threads.is_set():
        try:
            with lock:
                start = stream_index[0]
                stop = start + chunk_size
                print(f"[Stream] Processing chunk from {start} to {stop}")
                buffer.extend(ecg[start:stop])
                stream_index[0] = stop
                print(f"[Stream] Added samples. Buffer size: {len(buffer)}")
        except Exception as e:
            # Label the failure with the thread it actually came from.
            print(f"[ERROR] Stream error: {e}")
        # Event.wait() paces the loop like sleep(15) but returns immediately
        # once a stop is requested, so shutdown is not delayed by up to 15 s.
        stop_threads.wait(15)

def inference_pipeline(model, img_size=(224, 224)):
    """Poll the shared buffer and classify each newly completed window.

    Every 15 s the loop checks whether the buffer holds a full window that has
    not been classified yet (tracked through ``stream_index``). If so, the
    window is converted to RR intervals, a recurrence-distance image is built,
    scaled to 8-bit grey, resized to ``img_size`` and passed to ``model``.
    Uses the module-level ``buffer``, ``lock``, ``stream_index``,
    ``window_size``, ``fs`` and ``stop_threads``.

    Args:
        model: Object exposing ``predict``; called with an array of shape
            ``(1, img_size[0], img_size[1], 1)``.
        img_size: Target ``(height, width)`` of the image fed to the model.
    """
    last_pred_index = -1
    while not stop_threads.is_set():
        try:
            # Copy the window while holding the lock, then release it before
            # the slow signal processing and prediction so the streaming
            # thread is never blocked by inference.
            window = None
            with lock:
                current_index = stream_index[0]
                if len(buffer) == window_size and current_index != last_pred_index:
                    window = np.array(buffer)

            if window is not None:
                rri = neurakit_rri(window, fs)
                distances = rp_plot(rri, delay=3, embedding_dim=3)
                rp_gray = normalize_to_gray_scale(distances)
                rp_img = resize(rp_gray, img_size, anti_aliasing=True, preserve_range=True).astype(np.uint8)
                # Add batch and channel axes.
                input_tensor = np.expand_dims(rp_img, axis=(0, -1))

                pred = model.predict(input_tensor)
                pred_class = np.argmax(pred)
                confidence = pred[0][pred_class]
                print(f"Predicted class: {pred_class} with confidence {confidence:.2f}")
                # Only mark the window as done after a successful prediction,
                # so a failed attempt is retried on the next poll.
                last_pred_index = current_index
        except Exception as e:
            print(f"[ERROR] Inference error: {e}")
        # Interruptible pause: returns at once when a stop is requested.
        stop_threads.wait(15)

# Start the producer (streaming) and consumer (inference) as daemon threads.
for worker, worker_args in (
    (stream_data, (ecg, total_samples)),
    (inference_pipeline, (model,)),
):
    threading.Thread(target=worker, args=worker_args, daemon=True).start()

# Block the main thread until the streamer has consumed every full chunk.
# NOTE(review): stop_threads is only set on Ctrl-C here; after a normal finish
# nothing signals the worker loops to exit - confirm whether that is intended.
try:
    while not (stream_index[0] + chunk_size > total_samples):
        time.sleep(1)
except KeyboardInterrupt:
    print("Stopping threads...")
    stop_threads.set()

In [None]:
def generator(folder, files, batch_size):
    """Endlessly yield fixed-size ``(x, y)`` batches from HDF5 files.

    Each pass visits the files in a fresh random order. For every file the
    ``'x'`` and ``'y'`` datasets are converted to tensors and sliced into
    consecutive batches; a trailing batch shorter than ``batch_size`` is
    skipped.

    Args:
        folder: Directory that contains the HDF5 files.
        files: Iterable of file names relative to ``folder``. The caller's
            object is not modified; shuffling happens on a private copy.
        batch_size: Number of samples per yielded batch.

    Yields:
        Tuple ``(x_batch, y_batch)`` of tensors with ``batch_size`` leading
        entries each.

    Raises:
        ValueError: On first iteration if ``files`` is empty, since the loop
            would otherwise spin forever without producing a batch.
    """
    # Work on a copy so shuffling does not reorder the caller's list.
    file_order = list(files)
    if not file_order:
        raise ValueError("generator() needs at least one HDF5 file to draw batches from")

    while True:
        random.shuffle(file_order)
        for current_file in file_order:
            with h5py.File(os.path.join(folder, current_file), 'r') as hf:
                x = tf.convert_to_tensor(hf['x'])
                y = tf.convert_to_tensor(hf['y'])
                for i in range(0, len(y), batch_size):
                    x_batch = x[i:i + batch_size]
                    y_batch = y[i:i + batch_size]
                    # Drop the incomplete batch at the end of a file.
                    if len(x_batch) == batch_size:
                        yield x_batch, y_batch

def get_model(input_shape = (224, 224, 1), lr=1e-5):
    """Build and compile an EfficientNetV2-S classifier with three outputs.

    The backbone is created without its classification top and without
    pre-trained weights, followed by global average pooling, dropout (0.2)
    and a 3-unit softmax layer.

    Args:
        input_shape: Shape of one input image as ``(height, width, channels)``.
        lr: Learning rate for the Adam optimizer.

    Returns:
        Compiled ``tf.keras.Model`` using categorical cross-entropy loss and
        reporting accuracy and AUC.
    """
    layers = tf.keras.layers

    backbone = tf.keras.applications.EfficientNetV2S(
        weights=None,
        include_top=False,
        input_shape=input_shape,
    )

    pooled = layers.GlobalAveragePooling2D()(backbone.output)
    regularized = layers.Dropout(0.2)(pooled)
    class_probs = layers.Dense(3, activation='softmax')(regularized)

    model = tf.keras.Model(inputs=backbone.input, outputs=class_probs)
    model.compile(
        loss='categorical_crossentropy',
        optimizer=Adam(learning_rate=lr),
        metrics=['accuracy', 'AUC'],
    )
    return model

In [None]:
# NOTE(review): absolute, user-specific path; consider making it configurable.
model_path = "/Users/weijithwimalasiri/Desktop/WARN/NN_weights/WEIGHTS.hdf5"
model = load_model(model_path)
# Model.summary() prints the table itself and returns None, so wrapping it in
# print() would append a stray "None" line to the output.
model.summary()