In [19]:
import os
import csv
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import cvnn.layers as complex_layers
from tensorflow.keras.models import Sequential
from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from imblearn.under_sampling import RandomUnderSampler
import mlflow
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix, classification_report
from tensorflow import convert_to_tensor, complex64, expand_dims
import seaborn as sns
from scipy import signal


In [20]:
WINDOW_SIZE = 360
TEST_SIZE = 0.25
EPOCHS = 100
BATCH_SIZE = 128

In [21]:
path = "Dataset/mitbih_database"
filenames = next(os.walk(path))[2]
records=list()
annotations=list()
filenames.sort()
for f in filenames:
    filename, file_extension = os.path.splitext(f)
    if(file_extension=='.csv'):
        records.append(path+'/'+filename+file_extension)
    else:
        annotations.append(path+'/'+filename+file_extension)

In [22]:
def get_record_signals(index):
    signals = []
    labels = []
    with open(records[index],'r') as csvfile:
        filereader = csv.reader(csvfile,delimiter=',',quotechar='|')
        row_index = -1
        for row in filereader:
            if(row_index >= 0):
                signals.insert(row_index, int(row[1]))
            row_index += 1
        signals = np.array(signals)
    with open(annotations[index],'r') as csvfile:
        filereader = csv.reader(csvfile,delimiter=',',quotechar='|')
        row_index = -1
        for row in filereader:
            if(row_index >= 0):
                elements = list(filter(lambda x: len(x) > 0, row[0].split(" ")))
                labels.insert(row_index, [int(elements[1]), elements[2]])
            row_index += 1
        labels = np.array(labels)
    return signals, labels

In [23]:
def apply_detrend_and_butterworth(signals):
    fs = 360.0
    N = 650000
    T = N / fs
    t = np.linspace(0, T, N, endpoint=False)

    # Detrend (usuniecie DC/linearna sk≈Çadowa)
    data_detrended = signal.detrend(signals)

    # Butterworth bandpass (np. 0.5 - 40 Hz)
    lowcut = 0.5
    highcut = 40.0
    nyq = 0.5 * fs
    b, a = signal.butter(4, [lowcut/nyq, highcut/nyq], btype='band')
    data_filt = signal.filtfilt(b, a, data_detrended)  # zero-phase
    return data_filt

In [24]:
def zscore_per_beat(x, eps=1e-8):
    """
    Z-score normalization per beat
    """
    return (x - np.mean(x)) / (np.std(x) + eps)

In [25]:
def apply_welch(data):
    fs = 360.0
    nperseg = 64
    noverlap = nperseg // 2
    f_welch, Pxx = signal.welch(data, fs=fs, nperseg=nperseg, noverlap=noverlap, window='hann')
    return f_welch, Pxx

In [26]:
def apply_normalization(data):
    data = data[:len(data)//2]
    data = data / np.linalg.norm(data)
    return data

In [27]:
def process_data(window_size):
    X = []
    y = []

    half_w = window_size // 2
    valid_labels = {'A', 'L', 'N', 'R', 'V'}

    for i in range(len(records)):
        signals, labels = get_record_signals(i)
        signals = apply_detrend_and_butterworth(signals)
        sig_len = len(signals)

        mask = np.isin(labels[:, 1], list(valid_labels))
        filtered_labels = labels[mask]
        
        for j in range(3, len(filtered_labels)):
            prev_2_label_pos = int(filtered_labels[j - 2][0])
            prev_1_label_pos = int(filtered_labels[j - 1][0])
            prev_3_label_pos = int(filtered_labels[j - 3][0])
            label_pos = int(filtered_labels[j][0])
            
            RR_j_2 = prev_2_label_pos - prev_3_label_pos
            start_prev_2 = int(int(filtered_labels[j-2][0]) - 0.6 * RR_j_2)
            end_prev_2  = int(int(filtered_labels[j-2][0]) + 0.8 * RR_j_2)

            RR_j_1 = prev_1_label_pos - prev_2_label_pos
            start_prev_1 = int(int(filtered_labels[j-1][0]) - 0.6 * RR_j_1)
            end_prev_1  = int(int(filtered_labels[j-1][0]) + 0.8 * RR_j_1)
            
            RR_j = label_pos - prev_1_label_pos
            start_curr = int(int(filtered_labels[j][0]) - 0.6 * RR_j)
            end_curr  = int(int(filtered_labels[j][0]) + 0.8 * RR_j)
            

            if start_prev_2 < 0 or end_curr > sig_len:
                continue
            
            X.append([
                *np.fft.fft(zscore_per_beat(signal.resample(signals[start_prev_2:end_prev_2], 256))),
                *np.fft.fft(zscore_per_beat(signal.resample(signals[start_prev_1:end_prev_1], 256))),
                *np.fft.fft(zscore_per_beat(signal.resample(signals[start_curr:end_curr], 256)))
            ])
            y.append(filtered_labels[j][1])

    return X, y

In [28]:
X, y = process_data(window_size=WINDOW_SIZE)

In [29]:
# values, counts = np.unique(y, return_counts=True)
# plt.bar(values, counts)
# plt.title('Class Distribution')
# plt.xlabel('Class')
# plt.ylabel('Number of Samples')
# plt.show()

In [30]:
from collections import Counter

def random_undersample_indices(y, random_state=42):
    rng = np.random.default_rng(random_state)

    classes, counts = np.unique(y, return_counts=True)
    min_count = counts.min()

    indices = []
    for c in classes:
        class_idx = np.where(y == c)[0]
        sampled_idx = rng.choice(class_idx, size=min_count, replace=False)
        indices.append(sampled_idx)

    return np.concatenate(indices)

In [31]:
mlflow.end_run()

In [32]:
for i in range(5):
    X, y = process_data(window_size=WINDOW_SIZE)
    
    mlflow.start_run()
    mlflow.set_experiment("CVNN_ECG_Classification-Detrend_and_Butterworth_ComplexBatch_ZScore_RelativeWindow_v2")
    mlflow.log_param("model", "CVNN-128-ComplexDense-128-ComplexDense")
    mlflow.log_param("input_dim", (256*3,))
    mlflow.log_param("epochs", EPOCHS)
    mlflow.log_param("batch_size", BATCH_SIZE)
    mlflow.log_param("optimizer", "adam")
    mlflow.log_param("loss", "categorical_crossentropy")
    mlflow.log_param("test_size", TEST_SIZE)
    mlflow.log_param("scaler", "ComplexBatch ZScore")
    mlflow.log_param("classes", "A,L,N,R,V")
    mlflow.log_param("window_size", 256)
    
    # Train/test Split
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=TEST_SIZE, random_state=42)

    #Label Binarization
    lb = preprocessing.LabelBinarizer()
    y_train = lb.fit_transform(y_train)
    y_test = lb.transform(y_test)

    # # #Under sampling
    y_train_int = np.argmax(y_train, axis=1)
    idx = random_undersample_indices(y_train_int)
    X_train = np.array(X_train)[idx]
    y_train = np.array(y_train)[idx]
    
    model = Sequential()
    model.add(complex_layers.ComplexInput(input_shape=(int(256*3),), name='InputLayer'))
    model.add(complex_layers.ComplexBatchNormalization())

    model.add(complex_layers.ComplexDense(128, activation='cart_relu', name='HiddenLayer-1'))
    model.add(complex_layers.ComplexBatchNormalization())

    model.add(complex_layers.ComplexDense(128, activation='cart_relu', name='HiddenLayer-2'))
    model.add(complex_layers.ComplexBatchNormalization())

    model.add(complex_layers.ComplexDense(5, activation='convert_to_real_with_abs', name='OutputLayer'))
    model.compile(optimizer=tf.keras.optimizers.Adam(1e-3),
        loss='categorical_crossentropy',
        metrics=['accuracy'])
    
    history = model.fit(X_train, y_train, epochs=EPOCHS)
    
    X_test_tf = convert_to_tensor(X_test, dtype=complex64)
    y_test_tf = convert_to_tensor(y_test)
    # X_test_tf = expand_dims(X_test_tf, axis=-1)

    y_pred = np.argmax(model.predict(X_test_tf), axis=1)
    y_true = np.argmax(y_test, axis=1)

    accuracy = accuracy_score(y_true, y_pred)

    precision, recall, f1, support = precision_recall_fscore_support(
        y_true, y_pred, average=None
    )

    precision_macro, recall_macro, f1_macro, _ = precision_recall_fscore_support(
        y_true, y_pred, average="macro"
    )

    precision_weighted, recall_weighted, f1_weighted, _ = precision_recall_fscore_support(
        y_true, y_pred, average="weighted"
    )

    mlflow.log_metric("accuracy", accuracy)

    mlflow.log_metric("precision_macro", precision_macro)
    mlflow.log_metric("recall_macro", recall_macro)
    mlflow.log_metric("f1_macro", f1_macro)

    mlflow.log_metric("precision_weighted", precision_weighted)
    mlflow.log_metric("recall_weighted", recall_weighted)
    mlflow.log_metric("f1_weighted", f1_weighted)

    cm = confusion_matrix(y_true, y_pred)
    cm_norm = cm / cm.sum(axis=1, keepdims=True)

    plt.figure(figsize=(7, 6))
    sns.heatmap(
        cm_norm,
        annot=True,
        fmt=".2f",
        cmap="Blues"
    )
    plt.title("Normalized Confusion Matrix")
    plt.ylabel("True label")
    plt.xlabel("Predicted label")

    plt.tight_layout()
    plt.savefig("confusion_matrix.png")
    plt.close()

    mlflow.log_artifact("confusion_matrix.png")
    report = classification_report(y_true, y_pred)
    with open("classification_report.txt", "w") as f:
        f.write(report)
    mlflow.log_artifact("classification_report.txt")
    mlflow.end_run()    

2026/01/05 17:38:02 INFO mlflow.tracking.fluent: Experiment with name 'CVNN_ECG_Classification-Detrend_and_Butterworth_ComplexBatch_ZScore_RelativeWindow_v2' does not exist. Creating a new experiment.


Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

In [36]:
experiment_name = "CVNN_ECG_Classification-Detrend_and_Butterworth_ComplexBatch_ZScore_RelativeWindow_v2"

runs = mlflow.search_runs(
    experiment_names=[experiment_name],
    output_format="pandas"
)

summary = (
    runs
    .groupby(["params.window_size", "params.epochs", "params.model"])
    .agg(
        accuracy_mean=("metrics.accuracy", "mean"),
        accuracy_std=("metrics.accuracy", "std"),
        f1_macro_mean=("metrics.f1_macro", "mean"),
        f1_macro_std=("metrics.f1_macro", "std"),
    )
    .reset_index()
)

print(summary)

  params.window_size params.epochs                            params.model  \
0                256           100  CVNN-128-ComplexDense-128-ComplexDense   

   accuracy_mean  accuracy_std  f1_macro_mean  f1_macro_std  
0       0.971496      0.003884       0.926698      0.007777  
