In [1]:
import os
import pandas as pd
import numpy as np
import wfdb

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


## 1. Load Dataset

In [2]:
dir_path = 'mit-bih-arrhythmia-database-1.0.0'

In [3]:
records = [f for f in os.listdir(dir_path) if f.endswith('.dat')]
records = [os.path.splitext(f)[0] for f in records]
print(records)

['100', '101', '102', '103', '104', '105', '106', '107', '108', '109', '111', '112', '113', '114', '115', '116', '117', '118', '119', '121', '122', '123', '124', '200', '201', '202', '203', '205', '207', '208', '209', '210', '212', '213', '214', '215', '217', '219', '220', '221', '222', '223', '228', '230', '231', '232', '233', '234']


## 2. Split the data

In [4]:
from sklearn.model_selection import train_test_split

train_records, test_records = train_test_split(records, test_size=0.3, random_state=42)
valid_records, test_records = train_test_split(test_records, test_size=0.5, random_state=42)


In [5]:
import os
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

In [6]:
from sklearn.preprocessing import MinMaxScaler

def normalize_signal(ecg_signal):
    """Normalize ECG signal to range [0, 1]."""
    scaler = MinMaxScaler()
    ecg_signal_normalized = scaler.fit_transform(ecg_signal.reshape(-1, 1)).flatten()
    return ecg_signal_normalized

In [7]:
# Function to generate baseline wander noise
def generate_baseline_wander_noise(ecg_signal, t):
    A = np.random.uniform(0, 0.15) * np.abs((np.max(ecg_signal) - np.min(ecg_signal)))
    w = 2 * np.pi * np.random.uniform(0.15, 0.3)
    phi = np.random.uniform(-np.pi, np.pi)
    return A * np.sin(w * t + phi).reshape(-1)

# Function to generate power line interference
def generate_power_line_interference(ecg_signal, t):
    A = np.random.uniform(0, 0.5) * np.abs((np.max(ecg_signal) - np.min(ecg_signal)))
    w = 2 * np.pi * np.random.uniform(49.8, 50.2)
    phi = np.random.uniform(-np.pi, np.pi)
    return A * np.sin(w * t + phi).reshape(-1)

# Function to generate muscle artifacts
def generate_muscle_artefacts(ecg_signal, t):
    A = np.random.uniform(0, 0.1) * np.abs((np.max(ecg_signal) - np.min(ecg_signal)))
    w = 2 * np.pi * np.random.uniform(0, 10000)
    phi = np.random.uniform(-np.pi, np.pi)
    return A * np.sin(w * t + phi).reshape(-1)

In [8]:
def add_artifacts(ecg_signal, t):
    baseline_wander = generate_baseline_wander_noise(ecg_signal, t)
    power_line_interference = generate_power_line_interference(ecg_signal, t)
    muscle_artifacts = generate_muscle_artefacts(ecg_signal, t)
    noisy_signal = ecg_signal + baseline_wander + power_line_interference + muscle_artifacts
    return noisy_signal

In [9]:
def process_records(record_list, output_dir):
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    for record in record_list:
        signal, fields = wfdb.rdsamp(os.path.join(dir_path, record))
        ecg_signal = signal[:, 0]  
        t = np.arange(len(ecg_signal)) / fields['fs']  
        
        noisy_signal = add_artifacts(ecg_signal, t)
        np.savetxt(os.path.join(output_dir, record + '_noisy.dat'), noisy_signal)
        
        plt.figure(figsize=(12, 6))
        plt.plot(t[:300], ecg_signal[:300], label='Original ECG')
        plt.plot(t[:300], noisy_signal[:300], label='Noisy ECG', alpha=0.7)
        plt.title(f'Noisy ECG Signal for {record}')
        plt.xlabel('Time (s)')
        plt.ylabel('Amplitude')
        plt.legend()
        plt.savefig(os.path.join(output_dir, record + '_noisy.png'))
        plt.close()

In [10]:
# Process each split
process_records(train_records, 'noisy_data/train')
process_records(valid_records, 'noisy_data/valid')
process_records(test_records, 'noisy_data/test')

In [11]:
def load_data_from_files(file_list, directory):
    data = []
    for file in file_list:
        signal = np.loadtxt(os.path.join(directory, file + '_noisy.dat'))
        data.append(signal)
    return np.array(data)


In [12]:
X_train = load_data_from_files(train_records, 'noisy_data/train')
X_valid = load_data_from_files(valid_records, 'noisy_data/valid')
X_test = load_data_from_files(test_records, 'noisy_data/test')

In [13]:
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train.reshape(-1, X_train.shape[-1])).reshape(X_train.shape)
X_valid = scaler.transform(X_valid.reshape(-1, X_valid.shape[-1])).reshape(X_valid.shape)
X_test = scaler.transform(X_test.reshape(-1, X_test.shape[-1])).reshape(X_test.shape)

In [14]:
X_train = X_train[..., np.newaxis]
X_valid = X_valid[..., np.newaxis]
X_test = X_test[..., np.newaxis]


In [15]:
class ConvolutionalAutoencoder(tf.keras.Model):
    def __init__(self):
        super(ConvolutionalAutoencoder, self).__init__()
        
        # Encoder
        self.encoder = models.Sequential([
            layers.Conv1D(filters=32, kernel_size=3, strides=2, padding='same', activation='relu', input_shape=(650000, 1)),
            layers.Conv1D(filters=64, kernel_size=3, strides=2, padding='same', activation='relu'),
            layers.Conv1D(filters=128, kernel_size=3, strides=2, padding='same', activation='relu'),
            layers.Conv1D(filters=256, kernel_size=3, strides=2, padding='same', activation='relu')
        ])
        
        # Bottleneck
        self.bottleneck = layers.Conv1D(filters=512, kernel_size=3, strides=2, padding='same', activation='relu')
        
        # Decoder
        self.decoder = models.Sequential([
            layers.Conv1DTranspose(filters=256, kernel_size=3, strides=2, padding='same', activation='relu'),
            layers.Conv1DTranspose(filters=128, kernel_size=3, strides=2, padding='same', activation='relu'),
            layers.Conv1DTranspose(filters=64, kernel_size=3, strides=2, padding='same', activation='relu'),
            layers.Conv1DTranspose(filters=32, kernel_size=3, strides=2, padding='same', activation='relu'),
            layers.Conv1DTranspose(filters=1, kernel_size=3, strides=2, padding='same', activation='sigmoid')
        ])
    
    def call(self, inputs):
        x = self.encoder(inputs)
        x = self.bottleneck(x)
        x = self.decoder(x)
        return x

NameError: name 'tf' is not defined

In [None]:
model = ConvolutionalAutoencoder()
model.compile(optimizer='adam', loss='mean_squared_error')

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [None]:
print(X_train.shape)
print(X_valid.shape)
print(X_test.shape)


(33, 650000, 1)
(7, 650000, 1)
(8, 650000, 1)


In [None]:
history = model.fit(
    X_train, X_train,  # Autoencoders are trained on input = target
    epochs=50,
    batch_size=32,
    validation_data=(X_valid, X_valid),
    verbose=1
)


Epoch 1/50


ValueError: Dimensions must be equal, but are 650000 and 650016 for '{{node compile_loss/mean_squared_error/sub}} = Sub[T=DT_FLOAT](data_1, convolutional_autoencoder_4_1/sequential_9_1/conv1d_transpose_24_1/Sigmoid)' with input shapes: [?,650000,1], [?,650016,1].

In [None]:
ghgn

NameError: name 'ghgn' is not defined

In [None]:
record_name = records[0]  #'100'
record_path = os.path.join(dir_path, record_name)

# Load the signal and annotations
signal, fields = wfdb.rdsamp(record_path)
annotation = wfdb.rdann(record_path, 'atr')

print(f"Signal shape: {signal.shape}")
print(f"Sampling Frequency: {fields['fs']} Hz")
print(f"Annotation: {annotation.symbol[:10]}")  # Display first 10 annotations


Signal shape: (650000, 2)
Sampling Frequency: 360 Hz
Annotation: ['+', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'A', 'N']


In [None]:
normalized_signal = (signal - np.mean(signal)) / np.std(signal)


In [None]:
# Split the signal into segments for training
window_size = 256  
segments = []
for i in range(0, len(normalized_signal) - window_size, window_size):
    segments.append(normalized_signal[i:i+window_size])
segments = np.array(segments)
print(f"Total segments: {segments.shape[0]}")


Total segments: 2539


In [None]:
noise = np.random.normal(0, 0.1, segments.shape)  # Gaussian noise
noisy_segments = segments + noise


In [None]:
# Create a DataFrame with two columns, one for each channel of the ECG signal
df = pd.DataFrame(normalized_signal, columns=['ECG_Channel_1', 'ECG_Channel_2'])

# Initialize the 'Annotations' column to 0
df['Annotations'] = 0

# Set annotations at the specific sample indices
for idx in annotation.sample:
    df.loc[idx, 'Annotations'] = 1

print(df.head())



   ECG_Channel_1  ECG_Channel_2  Annotations
0       0.570943       1.011541            0
1       0.570943       1.011541            0
2       0.570943       1.011541            0
3       0.570943       1.011541            0
4       0.570943       1.011541            0


In [None]:
# Use only the first channel of the ECG signal
df = pd.DataFrame(normalized_signal[:, 0], columns=['ECG'])

# Initialize the 'Annotations' column to 0
df['Annotations'] = 0

# Set annotations at the specific sample indices
for idx in annotation.sample:
    df.loc[idx, 'Annotations'] = 1

print(df.head())


        ECG  Annotations
0  0.570943            0
1  0.570943            0
2  0.570943            0
3  0.570943            0
4  0.570943            0


In [None]:
X_train = noisy_segments  # Noisy input for training
y_train = segments        # Clean target signal

# Reshape for neural network input (if using CNN)
X_train = X_train.reshape(-1, window_size, 1)
y_train = y_train.reshape(-1, window_size, 1)

print(f"Training data shape: {X_train.shape}")
print(f"Target data shape: {y_train.shape}")


Training data shape: (5078, 256, 1)
Target data shape: (5078, 256, 1)


In [None]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, UpSampling1D, Dense, Flatten, Reshape

# Parameters
window_size = X_train.shape[1]  # e.g., 256
channels = 1  # Single channel for simplicity, or use 2 if using both channels

# Define the encoder
input_signal = Input(shape=(window_size, channels))
x = Conv1D(32, 3, activation='relu', padding='same')(input_signal)
x = MaxPooling1D(2, padding='same')(x)
x = Conv1D(16, 3, activation='relu', padding='same')(x)
x = MaxPooling1D(2, padding='same')(x)
x = Flatten()(x)
encoded = Dense(128, activation='relu')(x)

# Define the bottleneck
bottleneck = Dense((window_size // 4) * 16, activation='relu')(encoded)
bottleneck = Reshape((window_size // 4, 16))(bottleneck)

# Define the decoder
x = Conv1D(16, 3, activation='relu', padding='same')(bottleneck)
x = UpSampling1D(2)(x)
x = Conv1D(32, 3, activation='relu', padding='same')(x)
x = UpSampling1D(2)(x)
decoded = Conv1D(channels, 3, activation='sigmoid', padding='same')(x)

# Compile the model
autoencoder = Model(input_signal, decoded)
autoencoder.compile(optimizer='adam', loss='mean_squared_error')

autoencoder.summary()


In [None]:
# Training the model
history = autoencoder.fit(
    X_train, y_train,
    epochs=50,         # Number of epochs, adjust as necessary
    batch_size=32,     # Batch size, adjust as necessary
    validation_split=0.2,  # 20% of the data for validation
    verbose=1
)

# Save the trained model
autoencoder.save('ecg_denoising_autoencoder.h5')


Epoch 1/50
[1m127/127[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 11ms/step - loss: 0.9997 - val_loss: 1.1917
Epoch 2/50
[1m127/127[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 11ms/step - loss: 0.9419 - val_loss: 1.1917
Epoch 3/50
[1m127/127[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 10ms/step - loss: 0.9559 - val_loss: 1.1917
Epoch 4/50
[1m127/127[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 10ms/step - loss: 0.9361 - val_loss: 1.1899
Epoch 5/50
[1m127/127[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 6ms/step - loss: 0.9323 - val_loss: 1.1897
Epoch 6/50
[1m127/127[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 7ms/step - loss: 0.9618 - val_loss: 1.1897
Epoch 7/50
[1m127/127[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 7ms/step - loss: 0.9803 - val_loss: 1.1897
Epoch 8/50
[1m127/127[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 8ms/step - loss: 0.9544 - val_loss: 1.1896
Epoch 9/50
[1m127/127[0m [32m━━━━



In [None]:
# Assuming you have X_test and y_test prepared similarly to X_train and y_train
loss = autoencoder.evaluate(X_test, y_test)
print(f"Test Loss: {loss}")

# Denoise a sample and visualize
import matplotlib.pyplot as plt

denoised_signal = autoencoder.predict(X_test[0:1])  # Predict on a single sample

# Plot the original, noisy, and denoised signals
plt.figure(figsize=(15, 5))
plt.subplot(1, 3, 1)
plt.plot(X_test[0].flatten(), label='Noisy Signal')
plt.legend()

plt.subplot(1, 3, 2)
plt.plot(y_test[0].flatten(), label='Clean Signal')
plt.legend()

plt.subplot(1, 3, 3)
plt.plot(denoised_signal.flatten(), label='Denoised Signal')
plt.legend()

plt.show()


NameError: name 'X_test' is not defined