In [1]:
import sys
from multiprocessing import Pool, cpu_count, get_start_method

import numpy as np
import scipy.signal as signal
import wfdb
from scipy.signal import convolve
from scipy.stats import kurtosis, skew

In [2]:
# Function to read .dat and .hea files and return the ECG signals
def read_ecg_files(file_path):
    """Load one WFDB record and hand back its signal samples.

    Parameters
    ----------
    file_path : str
        Record name/path passed unchanged to ``wfdb.rdrecord``.

    Returns
    -------
    The ``p_signal`` attribute of the loaded record (presumably a
    ``(samples, channels)`` array, since downstream code indexes
    ``shape[1]`` -- confirm against the dataset).
    """
    return wfdb.rdrecord(file_path).p_signal

# Record base names for the two groups: ARR_01..ARR_12 and NR_01..NR_14
arr_files = [f'ARR_{idx:02d}' for idx in range(1, 13)]
nrr_files = [f'NR_{idx:02d}' for idx in range(1, 15)]

# Load every record once; list order follows the name lists above
arr_signals = list(map(read_ecg_files, arr_files))
nrr_signals = list(map(read_ecg_files, nrr_files))

In [3]:
# Normalized Least Mean Squares (NLMS) adaptive filter
def nlms_filter(primary_signal, reference_signal, step_size=0.1, num_coeffs=15):
    """Run an independent normalized-LMS adaptive FIR filter on each channel.

    At sample ``n`` the filter input is the last ``num_coeffs`` samples of
    ``primary_signal`` (zero-padded before the first sample), the target is
    ``reference_signal[n]``, and the value stored in the result is the filter
    output computed *before* that step's weight update.

    Parameters
    ----------
    primary_signal : array_like
        Filter input, shape ``(num_samples, num_channels)`` or ``(num_samples,)``.
    reference_signal : array_like
        Target signal with the same layout as ``primary_signal``.
    step_size : float, optional
        NLMS step size; divided by the window energy at every step.
    num_coeffs : int, optional
        Number of filter taps per channel.

    Returns
    -------
    numpy.ndarray
        Filter output with the same shape as ``primary_signal``. Floating-point
        inputs keep their dtype; any other dtype is promoted to ``float64``.
    """
    primary = np.asarray(primary_signal)
    reference = np.asarray(reference_signal)

    # Integer samples would make np.zeros_like allocate an integer output and
    # silently truncate every filter value, so promote them up front.
    if not np.issubdtype(primary.dtype, np.inexact):
        primary = primary.astype(np.float64)

    # Accept a single-channel 1-D trace by treating it as one column.
    single_trace = primary.ndim == 1
    if single_trace:
        primary = primary[:, np.newaxis]
        if reference.ndim == 1:
            reference = reference[:, np.newaxis]

    num_samples, num_channels = primary.shape
    output_signal = np.zeros_like(primary)
    weights = np.zeros((num_coeffs, num_channels))

    # Left-pad with zeros so the first samples still see a full-length window.
    padded_signal = np.pad(primary, ((num_coeffs - 1, 0), (0, 0)), mode='constant')

    for n in range(num_samples):
        for ch in range(num_channels):
            # Reversed window: x[0] is the current sample, x[-1] the oldest one.
            x = padded_signal[n:n + num_coeffs, ch][::-1]
            y = np.dot(weights[:, ch], x)
            e = reference[n, ch] - y
            # Normalise by window energy; the epsilon avoids division by zero.
            mu = step_size / (np.dot(x, x) + 1e-6)
            weights[:, ch] += mu * e * x
            output_signal[n, ch] = y

    return output_signal[:, 0] if single_trace else output_signal

In [None]:
# Apply NLMS filter in parallel
def parallel_nlms_filter(signals):
    """Apply ``nlms_filter`` to every record, in worker processes when possible.

    Each record is passed as both the primary and the reference input.

    Parameters
    ----------
    signals : iterable
        Records to filter.

    Returns
    -------
    list
        One filtered array per record, in input order. Empty input gives ``[]``.
    """
    tasks = [(sig, sig) for sig in signals]
    if not tasks:
        return []

    # Workers started with "spawn"/"forkserver" rebuild __main__ from its file.
    # A function typed into a notebook or REPL has no such file, so the workers
    # cannot resolve it; only "fork" inherits the definition from the parent.
    main_module = sys.modules.get('__main__')
    defined_interactively = (
        getattr(nlms_filter, '__module__', None) == '__main__'
        and not hasattr(main_module, '__file__')
    )
    workers_cannot_import = defined_interactively and get_start_method() != 'fork'

    if len(tasks) == 1 or workers_cannot_import:
        return [nlms_filter(*task) for task in tasks]

    # Never start more workers than there are records to process.
    with Pool(min(cpu_count(), len(tasks))) as pool:
        return pool.starmap(nlms_filter, tasks)

# Run the NLMS stage on the ARR group first, then on the NRR group
filtered_arr_signals, filtered_nrr_signals = map(
    parallel_nlms_filter, (arr_signals, nrr_signals)
)
# One-dimensional convolution with Gaussian wavelet
def gaussian_wavelet(size, sigma):
    """Build a unit-sum Gaussian kernel with ``size`` taps.

    Parameters
    ----------
    size : int
        Number of taps. The sample grid spans ``-(size // 2)`` to
        ``size // 2``, so odd sizes give integer offsets centred on zero.
    sigma : float
        Standard deviation of the Gaussian, in grid units. Must be non-zero.

    Returns
    -------
    numpy.ndarray
        1-D kernel, symmetric about its centre and normalised to sum to 1.
    """
    half_width = size // 2
    # Negate *after* the floor division: ``-size // 2`` parses as
    # ``(-size) // 2``, which for odd sizes is one below ``-(size // 2)``
    # and would shift the grid off-centre, skewing the kernel.
    x = np.linspace(-half_width, half_width, size)
    kernel = np.exp(-0.5 * (x / sigma) ** 2)
    return kernel / np.sum(kernel)


# Smoothing kernel shared by the convolution stage
wavelet = gaussian_wavelet(size=15, sigma=2)

def apply_wavelet_convolution(signal, wavelet):
    """Convolve every channel of ``signal`` with the 1-D kernel ``wavelet``.

    Parameters
    ----------
    signal : array_like
        Samples, shape ``(num_samples, num_channels)`` or ``(num_samples,)``.
        (The parameter name shadows the ``scipy.signal`` module alias inside
        this function, which is why ``convolve`` is imported by name.)
    wavelet : array_like
        1-D convolution kernel.

    Returns
    -------
    numpy.ndarray
        Array with the same shape as ``signal``; each channel is the
        ``mode='same'`` convolution of that channel with ``wavelet``.
        Floating-point inputs keep their dtype; other dtypes become ``float64``.
    """
    samples = np.asarray(signal)

    # An integer output buffer would truncate the smoothed values.
    if not np.issubdtype(samples.dtype, np.inexact):
        samples = samples.astype(np.float64)

    # Accept a single-channel 1-D trace by treating it as one column.
    single_trace = samples.ndim == 1
    if single_trace:
        samples = samples[:, np.newaxis]

    convoluted_signal = np.zeros_like(samples)
    for ch in range(samples.shape[1]):
        convoluted_signal[:, ch] = convolve(samples[:, ch], wavelet, mode='same')

    return convoluted_signal[:, 0] if single_trace else convoluted_signal

In [10]:
# Custom implementation of approximate entropy
def approximate_entropy(U, m, r):
    """Approximate entropy (ApEn) of a 1-D series.

    Computes ``|phi(m) - phi(m + 1)|`` where, for embedding dimension ``d``,
    ``phi(d)`` is the mean over all length-``d`` templates of the log of the
    fraction of templates lying within Chebyshev distance ``r`` (self-matches
    included).

    Parameters
    ----------
    U : array_like
        One-dimensional series.
    m : int
        Embedding dimension (template length), at least 1.
    r : float
        Distance tolerance; two templates match when their largest
        element-wise absolute difference is ``<= r``.

    Returns
    -------
    float
        The approximate entropy estimate.

    Raises
    ------
    ValueError
        If ``U`` is not one-dimensional, ``m < 1``, or ``U`` has fewer than
        ``m + 1`` samples (no template of length ``m + 1`` exists).
    """
    series = np.asarray(U, dtype=np.float64)
    if series.ndim != 1:
        raise ValueError("U must be one-dimensional")
    if m < 1:
        raise ValueError("m must be at least 1")
    n_samples = series.size
    if n_samples < m + 1:
        raise ValueError("U needs at least m + 1 samples")

    def _phi(dim):
        n_templates = n_samples - dim + 1
        # Row i holds the template [u[i], ..., u[i + dim - 1]].
        templates = np.column_stack(
            [series[offset:offset + n_templates] for offset in range(dim)]
        )
        log_total = 0.0
        # One template at a time keeps memory at O(n_templates * dim).
        for template in templates:
            chebyshev = np.max(np.abs(templates - template), axis=1)
            log_total += np.log(np.count_nonzero(chebyshev <= r) / n_templates)
        return log_total / n_templates

    # phi(m) alone is not an entropy; ApEn is the change between m and m + 1.
    return float(abs(_phi(m) - _phi(m + 1)))

In [None]:
# Define feature extraction function
def extract_features(signal):
    """Build one flat feature vector from a multi-channel record.

    For each channel, in order, the vector holds: mean, standard deviation,
    RMS, skewness, kurtosis, sign-change count, approximate entropy
    (``m=2``, ``r = 0.2 * std``), Higuchi fractal dimension, and then the
    autoregressive coefficients of order 4.

    Parameters
    ----------
    signal : array_like
        Samples, shape ``(num_samples, num_channels)`` or ``(num_samples,)``.

    Returns
    -------
    numpy.ndarray
        1-D array with all channels' features concatenated.
    """
    # float64 avoids integer overflow in the squared terms below.
    samples = np.asarray(signal, dtype=np.float64)
    if samples.ndim == 1:
        samples = samples[:, np.newaxis]

    features = []
    for ch in range(samples.shape[1]):
        ch_signal = samples[:, ch]
        mean = np.mean(ch_signal)
        std_dev = np.std(ch_signal)
        rms = np.sqrt(np.mean(ch_signal ** 2))
        skewness = skew(ch_signal)
        kurt = kurtosis(ch_signal)
        # Counts strict sign flips between neighbours; a sample that is exactly
        # zero makes the product zero and is therefore not counted.
        zero_crossings = ((ch_signal[:-1] * ch_signal[1:]) < 0).sum()
        entropy = approximate_entropy(ch_signal, m=2, r=0.2 * std_dev)

        # AR coefficients and the fractal dimension come from the helper
        # functions defined alongside this one.
        ar_coeffs = compute_ar_coeffs(ch_signal, order=4)
        higuchi_fd = compute_higuchi_fd(ch_signal, kmax=10)

        features.extend([mean, std_dev, rms, skewness, kurt, zero_crossings, entropy, higuchi_fd, *ar_coeffs])

    return np.array(features)

def compute_ar_coeffs(signal, order):
    """Fit an autoregressive model and return its parameters minus the first.

    Parameters
    ----------
    signal : array_like
        Series handed to ``statsmodels`` ``AutoReg``.
    order : int
        Passed as ``lags``.

    Returns
    -------
    ``params[1:]`` of the fitted model.
    """
    from statsmodels.tsa.ar_model import AutoReg

    fitted = AutoReg(signal, lags=order).fit()
    # The dropped first entry is presumably the constant term of AutoReg's
    # default trend -- TODO confirm against the statsmodels version in use.
    return fitted.params[1:]

def compute_higuchi_fd(signal, kmax):
    """Estimate the Higuchi fractal dimension of a 1-D series.

    For each delay ``k`` in ``1..kmax`` the series is sub-sampled at the ``k``
    possible start offsets, the normalised curve length of each sub-series is
    averaged into ``L(k)``, and the dimension is the least-squares slope of
    ``log L(k)`` against ``log(1 / k)``.

    Parameters
    ----------
    signal : array_like
        One-dimensional series.
    kmax : int
        Largest delay to evaluate; must be at least 2 so a slope can be fitted.

    Returns
    -------
    float
        The estimated fractal dimension, or ``nan`` when fewer than two delays
        give a positive curve length (for example a perfectly flat series).

    Raises
    ------
    ValueError
        If ``signal`` is not one-dimensional or ``kmax < 2``.
    """
    series = np.asarray(signal, dtype=np.float64)
    if series.ndim != 1:
        raise ValueError("signal must be one-dimensional")
    if kmax < 2:
        raise ValueError("kmax must be at least 2 to fit a slope")

    n_samples = series.size
    log_inv_delay = []
    log_length = []
    for k in range(1, int(kmax) + 1):
        lengths = []
        for start in range(k):
            # Number of k-sized steps available from this start offset.
            steps = (n_samples - 1 - start) // k
            if steps < 1:
                continue
            path = np.abs(np.diff(series[start::k])).sum()
            # Higuchi's normalisation rescales each sub-series to full length.
            lengths.append(path * (n_samples - 1) / (steps * k) / k)
        if not lengths:
            continue
        mean_length = np.mean(lengths)
        # A zero length has no logarithm; skip that delay.
        if mean_length > 0:
            log_inv_delay.append(np.log(1.0 / k))
            log_length.append(np.log(mean_length))

    if len(log_inv_delay) < 2:
        return float('nan')

    slope, _intercept = np.polyfit(log_inv_delay, log_length, 1)
    return float(slope)


In [None]:
# Smooth the NLMS-filtered records with the Gaussian kernel. This stage has to
# run here: nothing earlier in the notebook defines the convoluted_* lists, so
# feature extraction would fail with a NameError on a fresh kernel.
convoluted_arr_signals = [apply_wavelet_convolution(sig, wavelet) for sig in filtered_arr_signals]
convoluted_nrr_signals = [apply_wavelet_convolution(sig, wavelet) for sig in filtered_nrr_signals]

# Extract features from all convoluted signals (one row per record)
arr_features = np.array([extract_features(sig) for sig in convoluted_arr_signals])
nrr_features = np.array([extract_features(sig) for sig in convoluted_nrr_signals])


In [None]:

from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

In [None]:
# Class labels: 1.0 marks every ARR record, 0.0 marks every NRR record
labels_arr = np.full(len(arr_features), 1.0)
labels_nrr = np.full(len(nrr_features), 0.0)

In [None]:
# Stack ARR rows above NRR rows and build the label vector in the same order
features = np.vstack([arr_features, nrr_features])
labels = np.concatenate([labels_arr, labels_nrr])

In [None]:
# Split data into training and testing sets.
# stratify=labels keeps the ARR/NRR ratio the same in both halves; with this
# few records an unstratified 50% split can leave one class under-represented.
X_train, X_test, y_train, y_test = train_test_split(
    features, labels, test_size=0.5, random_state=42, stratify=labels
)

# Build the ANN model: two ReLU hidden layers and a sigmoid output for the
# binary ARR-vs-NRR decision
model = Sequential()
model.add(Dense(64, input_dim=X_train.shape[1], activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(1, activation='sigmoid'))

# Compile the model
model.compile(optimizer=Adam(learning_rate=0.05), loss='binary_crossentropy', metrics=['accuracy'])

# Train the model.
# NOTE(review): validation_data is the same split that is evaluated below, so
# the per-epoch validation metrics and the final score describe the same data.
model.fit(X_train, y_train, epochs=50, batch_size=10, validation_data=(X_test, y_test))

# Evaluate the model
loss, accuracy = model.evaluate(X_test, y_test)
print(f'Loss: {loss}, Accuracy: {accuracy}')