# ECG Data Preprocessing

This notebook demonstrates the data preprocessing pipeline for ECG signals from the MIT-BIH Arrhythmia Database.

## Steps:
1. Load ECG signals from MIT-BIH database
2. Signal cleaning and noise removal
3. Data normalization
4. Segmentation and train/test split preparation

In [None]:
# Import required libraries
import wfdb
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import signal
from sklearn.model_selection import train_test_split
import warnings
# Suppress every warning category for the rest of the session to keep cell
# output compact. NOTE(review): this also hides deprecation and numerical
# warnings from the libraries imported above; narrow the filter when debugging.
warnings.filterwarnings('ignore')

# Confirmation banner so a reader can see that the import cell ran to the end.
print("Libraries imported successfully!")

## 1. Load ECG Signals from MIT-BIH Database

In [None]:
# Read one record and its 'atr' annotation file via wfdb.
# The path is relative to the notebook's working directory.
record_name = '../data/mitdb/100'
record = wfdb.rdrecord(record_name)
annotation = wfdb.rdann(record_name, 'atr')

# Keep the sampling frequency and column 0 (the first channel) of the
# signal matrix; every later cell works on this single channel.
sampling_rate = record.fs
ecg_signal = record.p_signal[:, 0]

# Short textual summary of what was loaded.
summary_lines = (
    f"Record: {record.record_name}",
    f"Sampling Rate: {sampling_rate} Hz",
    f"Signal Length: {len(ecg_signal)} samples",
    f"Duration: {len(ecg_signal)/sampling_rate:.2f} seconds",
    f"Number of Annotations: {len(annotation.sample)}",
)
for line in summary_lines:
    print(line)

In [None]:
# Visualize raw ECG signal
# The preview length is derived from the sampling rate so the plotted window
# matches the duration stated in the title. The previous fixed slice of 3600
# samples equals 10 seconds only when the record is sampled at 360 Hz.
preview_seconds = 10
n_preview = int(preview_seconds * sampling_rate)

plt.figure(figsize=(15, 4))
# Time axis in seconds for the whole record; later plotting cells reuse `time`.
time = np.arange(len(ecg_signal)) / sampling_rate
plt.plot(time[:n_preview], ecg_signal[:n_preview], linewidth=0.8, color='#2E86AB')
plt.title(f'Raw ECG Signal (First {preview_seconds} seconds)', fontsize=14, fontweight='bold')
plt.xlabel('Time (seconds)', fontsize=12)
plt.ylabel('Amplitude (mV)', fontsize=12)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# Summary statistics are computed over the full record, not just the preview.
print(f"Signal Statistics:")
print(f"  Mean: {np.mean(ecg_signal):.4f}")
print(f"  Std Dev: {np.std(ecg_signal):.4f}")
print(f"  Min: {np.min(ecg_signal):.4f}")
print(f"  Max: {np.max(ecg_signal):.4f}")

## 2. Signal Cleaning and Noise Removal

In [None]:
def remove_baseline_wander(ecg_signal, sampling_rate, cutoff_hz=0.5, order=4):
    """Remove baseline wander with a zero-phase Butterworth high-pass filter.

    Args:
        ecg_signal: 1-D array-like of ECG samples.
        sampling_rate: Sampling frequency of ``ecg_signal`` in Hz.
        cutoff_hz: High-pass cutoff frequency in Hz. Defaults to 0.5 Hz,
            the value that was previously hard-coded.
        order: Butterworth filter order. Defaults to 4, the value that was
            previously hard-coded.

    Returns:
        Array of the same length as ``ecg_signal`` with content below the
        cutoff attenuated.

    Raises:
        ValueError: If ``cutoff_hz`` is not strictly between 0 and the
            Nyquist frequency (``sampling_rate / 2``).
    """
    nyquist = sampling_rate / 2
    if not 0 < cutoff_hz < nyquist:
        raise ValueError(
            f"cutoff_hz must be between 0 and the Nyquist frequency "
            f"({nyquist} Hz), got {cutoff_hz}"
        )

    # signal.butter (without fs=) expects the cutoff as a fraction of Nyquist.
    normalized_cutoff = cutoff_hz / nyquist
    b, a = signal.butter(order, normalized_cutoff, btype='high')

    # filtfilt runs the filter forward and backward, cancelling phase delay
    # so waveform features are not shifted in time.
    filtered_signal = signal.filtfilt(b, a, ecg_signal)
    return filtered_signal

def remove_powerline_noise(ecg_signal, sampling_rate, powerline_freq=60.0,
                           quality_factor=30.0):
    """Remove powerline interference with a zero-phase IIR notch filter.

    Args:
        ecg_signal: 1-D array-like of ECG samples.
        sampling_rate: Sampling frequency of ``ecg_signal`` in Hz.
        powerline_freq: Mains frequency to notch out, in Hz. Defaults to
            60 Hz (the previously hard-coded value); pass 50.0 for 50 Hz mains.
        quality_factor: Notch quality factor Q (centre frequency divided by
            bandwidth). Defaults to 30, the previously hard-coded value.

    Returns:
        Array of the same length as ``ecg_signal`` with the selected
        frequency attenuated.

    Raises:
        ValueError: If ``powerline_freq`` is not strictly between 0 and the
            Nyquist frequency (``sampling_rate / 2``).
    """
    nyquist = sampling_rate / 2
    if not 0 < powerline_freq < nyquist:
        raise ValueError(
            f"powerline_freq must be between 0 and the Nyquist frequency "
            f"({nyquist} Hz), got {powerline_freq}"
        )

    # signal.iirnotch (without fs=) expects the notch frequency as a
    # fraction of Nyquist.
    freq = powerline_freq / nyquist
    b, a = signal.iirnotch(freq, quality_factor)

    # Forward-backward filtering keeps the output phase-aligned with the input.
    filtered_signal = signal.filtfilt(b, a, ecg_signal)
    return filtered_signal

def smooth_signal(ecg_signal, window_size=5):
    """Apply a moving-average (boxcar) smoother to the signal.

    Args:
        ecg_signal: 1-D array-like of samples.
        window_size: Number of samples averaged per output point.

    Returns:
        Array produced by ``np.convolve(..., mode='same')``, which treats
        samples beyond either end of the input as zero.
    """
    # Uniform weights summing to one.
    box_kernel = np.ones(window_size)
    box_kernel /= window_size
    return np.convolve(ecg_signal, box_kernel, mode='same')

# Cleaning pipeline, applied in order:
#   1. high-pass filter to remove baseline wander
#   2. notch filter to remove powerline interference
#   3. moving-average smoothing
ecg_no_baseline = remove_baseline_wander(
    ecg_signal=ecg_signal,
    sampling_rate=sampling_rate,
)
ecg_no_noise = remove_powerline_noise(
    ecg_signal=ecg_no_baseline,
    sampling_rate=sampling_rate,
)
ecg_cleaned = smooth_signal(ecg_signal=ecg_no_noise)

print("Signal cleaning completed!")

In [None]:
# Compare raw vs cleaned signal: two stacked panels over the same
# 3600-sample window.
fig, axes = plt.subplots(2, 1, figsize=(15, 8))

# (title, data, line colour) for each panel, top to bottom.
panels = [
    ('Raw ECG Signal', ecg_signal, '#E63946'),
    ('Cleaned ECG Signal', ecg_cleaned, '#06A77D'),
]
for ax, (panel_title, trace, line_color) in zip(axes, panels):
    ax.plot(time[:3600], trace[:3600], linewidth=0.8, color=line_color)
    ax.set_title(panel_title, fontsize=14, fontweight='bold')
    ax.set_ylabel('Amplitude (mV)', fontsize=12)
    ax.grid(True, alpha=0.3)

# Only the bottom panel carries the x-axis label.
axes[1].set_xlabel('Time (seconds)', fontsize=12)

plt.tight_layout()
plt.show()

## 3. Data Normalization

In [None]:
from sklearn.preprocessing import StandardScaler, MinMaxScaler

# The scalers take 2-D input, so present the 1-D signal as a single column.
# Note: both scalers are fitted on the whole cleaned record, before any
# train/test split is made.
ecg_column = ecg_cleaned.reshape(-1, 1)

# Z-score normalization
scaler_standard = StandardScaler()
ecg_standardized = scaler_standard.fit_transform(ecg_column).flatten()

# Min-Max normalization into the range [-1, 1]
scaler_minmax = MinMaxScaler(feature_range=(-1, 1))
ecg_normalized = scaler_minmax.fit_transform(ecg_column).flatten()

# Report the statistics each scaler is meant to pin down.
report_lines = [
    "Normalization completed!",
    f"\nStandardized Signal:",
    f"  Mean: {np.mean(ecg_standardized):.4f}",
    f"  Std Dev: {np.std(ecg_standardized):.4f}",
    f"\nMin-Max Normalized Signal:",
    f"  Min: {np.min(ecg_normalized):.4f}",
    f"  Max: {np.max(ecg_normalized):.4f}",
]
for line in report_lines:
    print(line)

In [None]:
# Visualize normalization effects: the same 1800-sample window of the
# cleaned, z-scored and min-max scaled signals, stacked top to bottom.
fig, axes = plt.subplots(3, 1, figsize=(15, 10))

# (title, data, line colour) for each panel, top to bottom.
panels = [
    ('Cleaned Signal', ecg_cleaned, '#457B9D'),
    ('Standardized Signal (Z-score)', ecg_standardized, '#F77F00'),
    ('Min-Max Normalized Signal', ecg_normalized, '#06A77D'),
]
for ax, (panel_title, trace, line_color) in zip(axes, panels):
    ax.plot(time[:1800], trace[:1800], linewidth=0.8, color=line_color)
    ax.set_title(panel_title, fontsize=14, fontweight='bold')
    ax.set_ylabel('Amplitude', fontsize=12)
    ax.grid(True, alpha=0.3)

# Only the bottom panel carries the x-axis label.
axes[2].set_xlabel('Time (seconds)', fontsize=12)

plt.tight_layout()
plt.show()

## 4. Segmentation and Train/Test Split

In [None]:
def segment_signal(signal, segment_length, overlap=0.5):
    """Segment a signal into fixed-length windows with fractional overlap.

    Note: the parameter name ``signal`` shadows the ``scipy.signal`` module
    inside this function only; the name is kept for caller compatibility.

    Args:
        signal: Sequence or array of samples; windows are taken along the
            first axis.
        segment_length: Number of samples per window (positive integer).
        overlap: Fraction of each window shared with the next one. Must be
            less than 1. The hop between window starts is
            ``int(segment_length * (1 - overlap))``, clamped to at least 1.

    Returns:
        Array whose first axis indexes the windows and whose second axis has
        length ``segment_length``. Every full window is included, also the one
        ending exactly on the last sample. If the signal is shorter than one
        window, an empty array with zero rows is returned.

    Raises:
        ValueError: If ``segment_length`` is not positive or ``overlap >= 1``.
    """
    if segment_length <= 0:
        raise ValueError("segment_length must be a positive integer")
    if overlap >= 1:
        raise ValueError("overlap must be less than 1")

    # Clamp to 1 so short windows with large overlap never give a zero step,
    # which would make range() raise.
    step = max(1, int(segment_length * (1 - overlap)))

    # "+ 1" makes the last valid start index inclusive; without it the final
    # full window is dropped whenever it ends exactly at the end of the signal.
    last_start_exclusive = len(signal) - segment_length + 1
    segments = [
        signal[start:start + segment_length]
        for start in range(0, last_start_exclusive, step)
    ]

    if not segments:
        # Keep a consistent (n_windows, segment_length, ...) layout even
        # when there are no windows.
        data = np.asarray(signal)
        return np.empty((0, segment_length) + data.shape[1:], dtype=data.dtype)

    return np.array(segments)

# Cut the min-max normalized signal into one-second windows, each sharing
# half of its samples with the next window.
window_seconds = 1.0
overlap_fraction = 0.5

segment_length = int(window_seconds * sampling_rate)
segments = segment_signal(
    signal=ecg_normalized,
    segment_length=segment_length,
    overlap=overlap_fraction,
)

print(f"Total segments created: {len(segments)}")
print(f"Segment shape: {segments[0].shape}")

In [None]:
# Create labels (for demonstration, using random labels)
# In real scenario, these would be based on annotations
#
# A seeded generator is used so the placeholder labels are identical on every
# run. With the unseeded global RNG the labels changed per run, so the
# stratified split below was not reproducible despite random_state=42.
rng = np.random.default_rng(42)
labels = rng.integers(0, 5, size=len(segments))  # 5 classes: values 0..4

# Split into train and test sets (80/20), keeping class proportions equal.
# Note: the segments were built with 50% overlap, so a random split places
# windows that share samples on both sides of the split.
X_train, X_test, y_train, y_test = train_test_split(
    segments, labels, test_size=0.2, random_state=42, stratify=labels
)

print(f"\nDataset Split:")
print(f"  Training samples: {len(X_train)}")
print(f"  Testing samples: {len(X_test)}")
print(f"  Training shape: {X_train.shape}")
print(f"  Testing shape: {X_test.shape}")

In [None]:
# Visualize sample segments: the first six training windows in a 2x3 grid,
# each titled with its position and label.
fig, grid = plt.subplots(2, 3, figsize=(15, 8))
axes = grid.flatten()

for i, ax in enumerate(axes):
    ax.plot(X_train[i], linewidth=0.8, color='#4361EE')
    ax.set_title(f'Segment {i+1} (Label: {y_train[i]})', fontsize=12)
    ax.grid(True, alpha=0.3)
    ax.set_xlabel('Sample', fontsize=10)
    ax.set_ylabel('Amplitude', fontsize=10)

plt.tight_layout()
plt.show()

## Summary

This notebook demonstrated:
- Loading ECG signals from MIT-BIH database
- Removing baseline wander and powerline noise, then smoothing with a moving average
- Signal normalization techniques
- Segmentation and train/test split preparation

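The preprocessed segments are now ready for feature extraction and model training, once the placeholder random labels are replaced with labels derived from the record's annotations.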