In [28]:
# ECG Analysis Pipeline: R-peak Detection and Anomaly Detection
# =============================================================================

import os
import numpy as np
import pandas as pd
import wfdb
import matplotlib.pyplot as plt
from scipy.fft import fft, ifft
from scipy.signal import find_peaks, butter, sosfiltfilt
from typing import Tuple, List, Optional, Dict
from joblib import Parallel, delayed
from tqdm import tqdm
import gc

# Configuration
%matplotlib inline

# Download datasets if needed (commented out to avoid re-downloading)
# wfdb.dl_database("butqdb", "source1", keep_subdirs=False)
# wfdb.dl_database("nsrdb", "source2", keep_subdirs=False)


In [29]:

# =============================================================================
# 1. SIGNAL PREPROCESSING UTILITIES
# =============================================================================

def apply_highpass_filter(signal: np.ndarray, fs: int, low: float = 0.5, high: float = 45.0, order: int = 3) -> np.ndarray:
    """
    Apply Butterworth high-pass filter to remove baseline drift.

    Args:
        signal: Input ECG signal array
        fs: Sampling frequency in Hz
        cutoff: High-pass cutoff frequency in Hz (default: 0.5)
        order: Filter order (default: 4)

    Returns:
        Filtered signal with baseline drift removed
    """
    sos = butter(order, [low / (0.5 * fs), high / (0.5 * fs)], btype='bandpass', output='sos')
    return sosfiltfilt(sos, signal)

def normalize_signal(signal: np.ndarray) -> np.ndarray:
    """
    Normalize signal to zero mean and unit variance.

    Args:
        signal: Input signal array

    Returns:
        Normalized signal array
    """
    return (signal - np.mean(signal)) / np.std(signal)

def preprocess_ecg_signal(signal: np.ndarray, fs: int, normalize: bool = True, order: int = 4) -> np.ndarray:
    """
    Complete ECG preprocessing pipeline including DC removal and filtering.

    Args:
        signal: Raw ECG signal array
        fs: Sampling frequency in Hz
        normalize: Whether to normalize the signal (default: True)
        cutoff: High-pass cutoff frequency in Hz (default: 0.7)
        order: Filter order (default: 4)

    Returns:
        Preprocessed ECG signal ready for analysis
    """
    signal = apply_highpass_filter(signal, fs, order=order)
    if normalize:
        signal = normalize_signal(signal)
    return signal

class ECGStreamProcessor:
    """
    ECG stream processor for chunked reading to manage memory usage with large files.

    This class processes long ECG recordings by reading them in chunks rather than
    loading the entire file into memory at once.
    """

    def __init__(self, fs: int = 1000, chunk_duration_sec: int = 600):
        """
        Initialize the ECG stream processor.

        Args:
            fs: Sampling frequency in Hz (default: 1000)
            chunk_duration_sec: Duration of each processing chunk in seconds (default: 600, 10 minutes)
        """
        self.fs = fs
        self.chunk_size = chunk_duration_sec * fs
        self.overlap_size = int(fs * 5.0)  # 5 second overlap for continuity

    def process_file_in_chunks(self, record_path: str, processor_func,
                              max_duration_hours: float = None) -> List:
        """
        Process ECG file in chunks to manage memory usage.

        Args:
            record_path: Path to ECG record file
            processor_func: Function to apply to each chunk (chunk, chunk_start) -> result
            max_duration_hours: Maximum duration to process in hours (None for full file)

        Returns:
            List of results from processing each chunk
        """
        # Get file information
        header = wfdb.rdheader(record_path)
        total_samples = header.sig_len

        if max_duration_hours:
            max_samples = int(max_duration_hours * 3600 * self.fs)
            total_samples = min(total_samples, max_samples)

        results = []
        processed_samples = 0

        print(f"Processing {total_samples/self.fs/3600:.2f} hours in chunks of {self.chunk_size/self.fs/60:.1f} minutes")

        # Process file in chunks
        with tqdm(total=total_samples, desc="Processing chunks") as pbar:
            while processed_samples < total_samples:
                # Calculate chunk boundaries
                start_sample = max(0, processed_samples - self.overlap_size)
                end_sample = min(total_samples, processed_samples + self.chunk_size)

                # Read chunk from file
                record = wfdb.rdrecord(record_path, sampfrom=start_sample, sampto=end_sample)
                chunk = record.p_signal[:, 0]

                # Preprocess chunk
                chunk_processed = preprocess_ecg_signal(chunk, self.fs)

                # Apply processor function
                processor_func(chunk_processed, start_sample)

                # Update progress
                processed_samples += self.chunk_size
                pbar.update(min(self.chunk_size, end_sample - (processed_samples - self.chunk_size)))

                # Force garbage collection to manage memory
                del chunk, chunk_processed, record
                gc.collect()

        return results

In [30]:
# =============================================================================
# 2. R-PEAK DETECTION
# =============================================================================

def detect_r_peaks_threshold(signal: np.ndarray, fs: int, height_percentile: float = 87,
                            min_distance_ms: float = 250) -> np.ndarray:
    """
    Detect R-peaks using simple peak detection with dynamic threshold.

    Args:
        signal: Preprocessed ECG signal
        fs: Sampling frequency in Hz
        height_percentile: Percentile for dynamic threshold calculation (default: 85)
        min_distance_ms: Minimum distance between peaks in milliseconds assuming 240 BPM max (default: 250)

    Returns:
        Array of R-peak indices in the signal
    """
    height = np.percentile(signal, height_percentile)
    height_neg = np.percentile(-signal, height_percentile)
    min_distance = int(min_distance_ms * fs / 1000)
    peaks_pos, _ = find_peaks(signal, height=height, distance=min_distance)
    peaks_neg, _ = find_peaks(-signal, height=height_neg, distance=min_distance)
    if len(peaks_neg) > len(peaks_pos):
        peaks = peaks_neg
    else:
        peaks = peaks_pos
    return peaks

class RPeakDetector:
    """
    R-peak detector for long ECG signals using chunked processing.

    This class accumulates R-peak detections across multiple chunks while
    avoiding duplicate detections in overlap regions.
    """

    def __init__(self, fs: int = 1000):
        """
        Initialize R-peak detector.

        Args:
            fs: Sampling frequency in Hz (default: 1000)
        """
        self.fs = fs
        self.all_peaks = []
        self.overlap_samples = int(fs * 5.0)  # 5 second overlap

    def process_chunk_for_peaks(self, chunk: np.ndarray, chunk_start: int) -> None:
        """
        Process a single chunk for R-peak detection.

        Args:
            chunk: ECG signal chunk
            chunk_start: Starting sample index of this chunk in the full signal

        Returns:
            Dictionary with processing statistics for this chunk
        """
        # Detect peaks in current chunk
        peaks = detect_r_peaks_threshold(chunk, self.fs)

        if len(peaks) == 0:
            return

        # Adjust peak indices to global position
        global_peaks = peaks + chunk_start

        # Store peaks while avoiding overlap region duplicates
        if chunk_start > 0:
            # Skip overlap region for non-first chunks
            valid_mask = peaks >= self.overlap_samples
            global_peaks = global_peaks[valid_mask]

        self.all_peaks.extend(global_peaks.tolist())

        del peaks, global_peaks
        gc.collect()

    def get_all_detected_peaks(self) -> np.ndarray:
        """
        Get all detected R-peaks from all processed chunks.

        Returns:
            Sorted array of all R-peak sample indices
        """
        if not self.all_peaks:
            return np.array([])
        return np.array(sorted(self.all_peaks))

    def clear_peaks(self) -> None:
        """Clear stored peaks to free memory."""
        self.all_peaks.clear()
        gc.collect()

def calculate_bpm_from_peaks(rpeaks: np.ndarray, fs: int,
                            total_duration_sec: float, window_sec: int = 60) -> Tuple[np.ndarray, np.ndarray]:
    """
    Calculate BPM (beats per minute) in sliding windows from R-peak locations.

    Args:
        rpeaks: Array of R-peak sample indices
        fs: Sampling frequency in Hz
        total_duration_sec: Total signal duration in seconds
        window_sec: Window size for BPM calculation in seconds (default: 60)

    Returns:
        Tuple of (bpm_values, time_minutes) arrays
    """
    if len(rpeaks) == 0:
        return np.array([]), np.array([])

    n_windows = int(total_duration_sec // window_sec)
    bpm_values = np.zeros(n_windows)

    # Use binary search for efficient counting
    window_starts = np.arange(n_windows) * window_sec * fs
    window_ends = window_starts + window_sec * fs

    for i in range(n_windows):
        start_idx = np.searchsorted(rpeaks, window_starts[i], side='left')
        end_idx = np.searchsorted(rpeaks, window_ends[i], side='right')
        peaks_in_window = end_idx - start_idx
        bpm_values[i] = peaks_in_window * 60 / window_sec

    time_minutes = np.arange(n_windows) * (window_sec / 60)
    return bpm_values, time_minutes


In [31]:
# =============================================================================
# 3. SINGLE FILE ECG PROCESSING
# =============================================================================

def analyze_single_ecg_file(data_dir: str, record_name: str, plot: bool = True,
                           max_duration_hours: float = 1.0) -> Dict:
    """
    Analyze a single ECG file for R-peak detection and BPM calculation with memory optimization.

    Args:
        data_dir: Directory containing ECG files
        record_name: Name of the ECG record (without file extension)
        plot: Whether to plot the BPM over time (default: True)
        max_duration_hours: Maximum duration to process in hours (default: 1.0)

    Returns:
        Dictionary containing analysis results and statistics including:
        - record_name: Name of processed record
        - success: Boolean indicating if processing was successful
        - fs: Sampling frequency
        - total_samples: Number of samples processed
        - detected_rpeaks: Number of R-peaks detected
        - average_bpm: Average BPM across the signal
        - bpm_values: Array of BPM values over time
        - time_minutes: Array of time points in minutes
        - processing_duration_hours: Duration of processed signal
        - error: Error message if processing failed
    """
    record_path = os.path.join(data_dir, f"{record_name}_ECG")
    results = {
        'record_name': record_name,
        'success': False,
        'fs': None,
        'total_samples': None,
        'detected_rpeaks': None,
        'average_bpm': None,
        'bpm_values': None,
        'time_minutes': None,
        'processing_duration_hours': None,
        'error': None
    }

    try:
        # Get file information
        header = wfdb.rdheader(record_path)
        fs = header.fs
        total_samples = header.sig_len

        if max_duration_hours:
            max_samples = int(max_duration_hours * 3600 * fs)
            processing_samples = min(total_samples, max_samples)
        else:
            processing_samples = total_samples

        processing_duration_hours = processing_samples / fs / 3600

        print(f"  Total file duration: {total_samples/fs/3600:.2f} hours")
        print(f"  Processing duration: {processing_duration_hours:.2f} hours")

        # Initialize stream processor and R-peak detector
        processor = ECGStreamProcessor(fs=fs, chunk_duration_sec=600)  # 10-minute chunks
        detector = RPeakDetector(fs=fs)

        # Process file in chunks
        processor.process_file_in_chunks(record_path, detector.process_chunk_for_peaks, max_duration_hours)

        # Get all detected R-peaks
        rpeaks = detector.get_all_detected_peaks()

        # Compute BPM over time
        total_duration_sec = processing_samples / fs
        bpm_values, time_minutes = calculate_bpm_from_peaks(
            rpeaks, fs, total_duration_sec
        )
        avg_bpm = np.mean(bpm_values) if len(bpm_values) > 0 else 0

        # Update results
        results.update({
            'fs': fs,
            'total_samples': processing_samples,
            'detected_rpeaks': len(rpeaks),
            'average_bpm': avg_bpm,
            'bpm_values': bpm_values,
            'time_minutes': time_minutes,
            'processing_duration_hours': processing_duration_hours,
            'success': True
        })

        # Plot results if requested
        if plot and len(bpm_values) > 0:
            plt.figure(figsize=(12, 4))
            plt.plot(time_minutes, bpm_values, "-", linewidth=1, alpha=0.8)
            plt.title(f"BPM Over Time - {record_name} ({processing_duration_hours:.1f}h)")
            plt.xlabel("Time (minutes)")
            plt.ylabel("BPM")
            plt.grid(True, alpha=0.3)
            plt.tight_layout()
            plt.show()
            plt.close()

        # Print summary
        print(f"Record: {record_name}")
        print(f"Sampling rate: {fs} Hz")
        print(f"Processed samples: {processing_samples:,}")
        print(f"Detected R-peaks: {len(rpeaks):,}")
        print(f"Average BPM: {avg_bpm:.1f}")
        print("-" * 50)

        detector.clear_peaks()

    except Exception as e:
        results['error'] = str(e)
        print(f"Error processing {record_name}: {str(e)}")
    finally:
        gc.collect()

    return results

In [32]:
# =============================================================================
# 4. BATCH ECG PROCESSING
# =============================================================================

def analyze_ecg_batch(data_dir: str, plot_individual: bool = False,
                     max_duration_hours: float = 1.0) -> pd.DataFrame:
    """
    Batch process multiple ECG files in a directory with memory optimization.

    Args:
        data_dir: Directory containing ECG files
        plot_individual: Whether to plot individual file results (default: False)
        max_duration_hours: Maximum duration to process per file in hours (default: 1.0)

    Returns:
        DataFrame containing summary statistics for all processed files including:
        - Record: Record name
        - Sampling_Rate_Hz: Sampling frequency
        - Total_Samples: Number of samples processed
        - Detected_R_peaks: Number of R-peaks detected
        - Average_BPM: Average BPM for the record
    """
    results = []

    if not os.path.exists(data_dir):
        print(f"Directory {data_dir} not found!")
        return pd.DataFrame()

    all_files = os.listdir(data_dir)
    ecg_files = [f for f in all_files if f.endswith('_ECG.hea')]
    record_names = sorted([f.replace('_ECG.hea', '') for f in ecg_files])

    print(f"Processing {len(record_names)} ECG records from {data_dir}")
    print("=" * 60)

    for i, record_name in enumerate(record_names):
        print(f"\nProcessing file {i+1}/{len(record_names)}: {record_name}")

        result = analyze_single_ecg_file(
            data_dir, record_name, plot=plot_individual,
            max_duration_hours=max_duration_hours
        )

        if result.get('success', False):
            results.append({
                'Record': result['record_name'],
                'Sampling_Rate_Hz': result['fs'],
                'Total_Samples': result['total_samples'],
                'Detected_R_peaks': result['detected_rpeaks'],
                'Average_BPM': result['average_bpm']
            })

        # Force garbage collection between files
        del result
        gc.collect()

    # Create summary DataFrame
    if results:
        df = pd.DataFrame(results)
        print("\nBatch Processing Summary:")
        return df
    else:
        print("No files were successfully processed.")
        return pd.DataFrame()

In [33]:
# =============================================================================
# 5. ANOMALY DETECTION
# =============================================================================

class AnomalyDetector:
    """
    Anomaly detector for ECG signals using multiple detection methods.

    This class applies various anomaly detection techniques and tracks
    where in the signal anomalies are detected for visualization.
    """

    def __init__(self, fs: int = 1000):
        """
        Initialize anomaly detector.

        Args:
            fs: Sampling frequency in Hz (default: 1000)
        """
        self.fs = fs
        self.all_anomalies = []
        self.overlap_samples = int(fs * 5.0)  # 5 second overlap

    def detect_amplitude_anomalies(self, signal: np.ndarray, k: float = 3.5) -> Tuple[np.ndarray, float]:
        """
        Detect anomalies based on amplitude threshold using standard deviation.

        Args:
            signal: ECG signal chunk
            k: Number of standard deviations for threshold (default: 3.5)

        Returns:
            Tuple of (anomaly_indices, threshold_value)
        """
        mean_val = signal.mean()
        std_val = signal.std()
        threshold = mean_val + k * std_val
        anomaly_indices = np.where(np.abs(signal) > threshold)[0]
        return anomaly_indices, threshold

    def detect_rolling_statistics_anomalies(self, signal: np.ndarray, window: int = 1000, threshold: float = 5.5) -> np.ndarray:
        """
        Detect anomalies using rolling mean and standard deviation.

        Args:
            signal: ECG signal chunk
            window: Rolling window size in samples (default: 1000)
            threshold: Number of rolling standard deviations for anomaly threshold (default: 5.5)

        Returns:
            Array of anomaly indices
        """
        if len(signal) < window:
            window = len(signal) // 2

        rolling_mean = np.convolve(signal, np.ones(window) / window, mode='same')
        rolling_std = np.sqrt(np.convolve((signal - rolling_mean)**2, np.ones(window) / window, mode='same'))

        # Avoid division by zero
        rolling_std = np.maximum(rolling_std, 1e-8)
        anomaly_indices = np.where(np.abs(signal - rolling_mean) > threshold * rolling_std)[0]
        return anomaly_indices

    def detect_iqr_anomalies(self, signal: np.ndarray, k: float = 5.0) -> np.ndarray:
        """
        Detect anomalies using Interquartile Range (IQR) method.

        Args:
            signal: ECG signal chunk
            k: IQR multiplier for outlier detection (default: 5.0)

        Returns:
            Array of anomaly indices
        """
        q1, q3 = np.percentile(signal, [25, 75])
        iqr = q3 - q1
        if iqr == 0:  # Handle constant signal
            return np.array([])
        lower_bound = q1 - k * iqr
        upper_bound = q3 + k * iqr
        anomaly_indices = np.where((signal < lower_bound) | (signal > upper_bound))[0]
        return anomaly_indices

    def process_chunk_for_anomalies(self, chunk: np.ndarray, chunk_start: int) -> None:
        """
        Process a single chunk for anomaly detection using multiple methods.

        Args:
            chunk: ECG signal chunk
            chunk_start: Starting sample index of this chunk in the full signal

        Returns:
            Dictionary with anomaly detection statistics for this chunk
        """
        # Apply multiple detection methods
        methods_results = []

        # Amplitude-based detection
        amp_anomalies, _ = self.detect_amplitude_anomalies(chunk)
        methods_results.append(amp_anomalies)

        # Rolling statistics detection
        rolling_anomalies = self.detect_rolling_statistics_anomalies(chunk)
        methods_results.append(rolling_anomalies)

        # IQR-based detection
        iqr_anomalies = self.detect_iqr_anomalies(chunk)
        methods_results.append(iqr_anomalies)

        # Combine results from all methods
        if any(len(arr) > 0 for arr in methods_results):
            combined_anomalies = np.unique(np.concatenate([arr for arr in methods_results if len(arr) > 0]))
        else:
            combined_anomalies = np.array([])

        if len(combined_anomalies) == 0:
            return

        # Adjust indices to global position
        global_anomalies = combined_anomalies + chunk_start

        # Store anomalies while avoiding overlap region duplicates
        if chunk_start > 0:
            # Skip overlap region for non-first chunks
            valid_mask = combined_anomalies >= self.overlap_samples
            global_anomalies = global_anomalies[valid_mask]

        self.all_anomalies.extend(global_anomalies.tolist())

        del amp_anomalies, rolling_anomalies, iqr_anomalies, combined_anomalies, global_anomalies
        gc.collect()

    def get_all_detected_anomalies(self) -> np.ndarray:
        """
        Get all detected anomalies from all processed chunks.

        Returns:
            Sorted array of all anomaly sample indices
        """
        if not self.all_anomalies:
            return np.array([])
        return np.array(sorted(self.all_anomalies))

    def clear_anomalies(self) -> None:
        """Clear stored anomalies to free memory."""
        self.all_anomalies.clear()
        gc.collect()

def analyze_anomalies_in_file(data_dir: str, record_name: str,
                             max_duration_hours: float = 1.0,
                             plot_anomalies: bool = True) -> Dict:
    """
    Detect and analyze anomalies in a single ECG file with location plotting.

    Args:
        data_dir: Directory containing ECG files
        record_name: Name of the ECG record (without file extension)
        max_duration_hours: Maximum duration to process in hours (default: 1.0)
        plot_anomalies: Whether to plot anomaly locations in the signal (default: True)

    Returns:
        Dictionary containing anomaly detection results including:
        - record_name: Name of processed record
        - success: Boolean indicating if processing was successful
        - detected_anomalies: Number of anomalies detected
        - anomaly_rate_per_hour: Rate of anomalies per hour
        - processing_duration_hours: Duration of processed signal
        - anomaly_times_minutes: Array of anomaly times in minutes
        - error: Error message if processing failed
    """
    record_path = os.path.join(data_dir, f"{record_name}_ECG")
    results = {
        'record_name': record_name,
        'success': False,
        'detected_anomalies': 0,
        'anomaly_rate_per_hour': 0,
        'processing_duration_hours': 0,
        'anomaly_times_minutes': None,
        'error': None
    }

    try:
        # Get file information
        header = wfdb.rdheader(record_path)
        fs = header.fs
        total_samples = header.sig_len

        if max_duration_hours:
            max_samples = int(max_duration_hours * 3600 * fs)
            processing_samples = min(total_samples, max_samples)
        else:
            processing_samples = total_samples

        processing_duration_hours = processing_samples / fs / 3600

        # Initialize stream processor and anomaly detector
        processor = ECGStreamProcessor(fs=fs, chunk_duration_sec=600)  # 10-minute chunks
        detector = AnomalyDetector(fs=fs)

        # Process file in chunks
        processor.process_file_in_chunks(record_path, detector.process_chunk_for_anomalies, max_duration_hours)

        # Get all detected anomalies
        anomalies = detector.get_all_detected_anomalies()
        anomaly_rate = len(anomalies) / processing_duration_hours if processing_duration_hours > 0 else 0

        # Convert anomaly indices to time in minutes
        anomaly_times_minutes = anomalies / fs / 60

        results.update({
            'success': True,
            'detected_anomalies': len(anomalies),
            'anomaly_rate_per_hour': anomaly_rate,
            'processing_duration_hours': processing_duration_hours,
            'anomaly_times_minutes': anomaly_times_minutes
        })

        # Plot anomaly locations if requested
        if plot_anomalies and len(anomalies) > 0:
            plt.figure(figsize=(14, 8))

            # Top subplot: Anomaly timeline
            plt.subplot(2, 1, 1)
            plt.scatter(anomaly_times_minutes, np.ones(len(anomaly_times_minutes)),
                       c='red', alpha=0.6, s=20)
            plt.title(f"Anomaly Locations in {record_name} ({len(anomalies)} anomalies detected)")
            plt.xlabel("Time (minutes)")
            plt.ylabel("Anomalies")
            plt.ylim(0.5, 1.5)
            plt.grid(True, alpha=0.3)

            # Bottom subplot: Anomaly density histogram
            plt.subplot(2, 1, 2)
            if len(anomaly_times_minutes) > 1:
                bins = min(50, len(anomaly_times_minutes) // 2 + 1)
                plt.hist(anomaly_times_minutes, bins=bins, alpha=0.7, color='red', edgecolor='black')
                plt.title("Anomaly Density Distribution")
                plt.xlabel("Time (minutes)")
                plt.ylabel("Number of Anomalies")
                plt.grid(True, alpha=0.3)
            else:
                plt.text(0.5, 0.5, "Insufficient anomalies for density plot",
                        transform=plt.gca().transAxes, ha='center', va='center')
                plt.title("Anomaly Density Distribution")

            plt.tight_layout()
            plt.show()
            plt.close()

        print(f"Record: {record_name}")
        print(f"Detected anomalies: {len(anomalies):,}")
        print(f"Anomaly rate: {anomaly_rate:.1f} per hour")
        if len(anomalies) > 0:
            print(f"First anomaly at: {anomaly_times_minutes[0]:.2f} minutes")
            print(f"Last anomaly at: {anomaly_times_minutes[-1]:.2f} minutes")
        print("-" * 50)

        detector.clear_anomalies()

    except Exception as e:
        results['error'] = str(e)
        print(f"Error processing {record_name}: {str(e)}")
    finally:
        gc.collect

    return results

In [34]:
# =============================================================================
# 6. R-PEAK EVALUATION
# =============================================================================

def evaluate_peak_detection_accuracy(true_peaks: np.ndarray, detected_peaks: np.ndarray, fs: int = 1000,
                                   tolerance_ms: int = 100) -> Tuple[int, int, int]:
    """
    Evaluate R-peak detection accuracy by comparing with ground truth annotations.

    Args:
        true_peaks: Ground truth R-peak indices
        detected_peaks: Algorithm-detected R-peak indices
        fs: Sampling frequency in Hz (default: 1000)
        tolerance_ms: Relative tolerance for matching peaks in miliseconds(default: 100)

    Returns:
        Tuple of (correct_detections, total_true_peaks, total_detected_peaks)
    """
    tolerance_samples = int((tolerance_ms / 1000) * fs)

    if len(true_peaks) == 0 or len(detected_peaks) == 0:
        return 0, len(true_peaks), len(detected_peaks)

    true_peaks = np.asarray(true_peaks)
    detected_peaks = np.asarray(detected_peaks)

    correct = 0
    used = np.zeros(len(detected_peaks), dtype=bool)

    for tp in true_peaks:
        # Find detected peak within ±tolerance
        idx = np.where((detected_peaks >= tp - tolerance_samples) &
                       (detected_peaks <= tp + tolerance_samples) &
                       (~used))[0]
        if len(idx) > 0:
            correct += 1
            used[idx[0]] = True

    return correct, len(true_peaks), len(detected_peaks)

def evaluate_single_file_rpeak_detection(test_dir: str, base_name: str, fs: int = 1000,
                                        max_duration_hours: float = 1.0) -> Dict:
    """
    Evaluate R-peak detection performance on a single file with ground truth.

    Args:
        test_dir: Directory containing test files with annotations
        base_name: Base name of the file (without extension)
        fs: Sampling frequency in Hz (default: 1000)
        max_duration_hours: Maximum duration to process in hours (default: 1.0)

    Returns:
        Dictionary containing evaluation metrics including:
        - File: Base filename
        - Correct: Number of correctly detected peaks
        - Missed: Number of missed peaks
        - Total_True: Total number of true peaks
        - Total_Detected: Total number of detected peaks
        - Accuracy_Percent: Detection accuracy percentage
        - Error: Error message if processing failed
    """
    file_path = os.path.join(test_dir, base_name)
    result = {'File': base_name}

    try:
        # Get file information
        header = wfdb.rdheader(file_path)
        total_samples = header.sig_len

        if max_duration_hours:
            max_samples = int(max_duration_hours * 3600 * fs)
            processing_samples = min(total_samples, max_samples)
        else:
            processing_samples = total_samples

        # Load ground truth annotations
        annotations = wfdb.rdann(file_path, 'atr')
        true_peaks = annotations.sample
        true_peaks = true_peaks[true_peaks < processing_samples]

        del annotations

        # Process ECG in chunks for R-peak detection
        processor = ECGStreamProcessor(fs=fs, chunk_duration_sec=600)
        detector = RPeakDetector(fs=fs)

        processor.process_file_in_chunks(file_path, detector.process_chunk_for_peaks, max_duration_hours)

        detected_peaks = detector.get_all_detected_peaks()

        # Evaluate accuracy
        correct, total_true, total_detected = evaluate_peak_detection_accuracy(true_peaks, detected_peaks)
        accuracy = (correct / total_true * 100) if total_true > 0 else 0

        result.update({
            'Correct': correct,
            'Missed': total_true - correct,
            'Total_True': total_true,
            'Total_Detected': total_detected,
            'Accuracy_Percent': round(accuracy, 2)
        })

        del true_peaks, detected_peaks, processor, detector

    except Exception as e:
        result['Error'] = str(e)
    finally:
        gc.collect()

    return result

def evaluate_rpeak_detection_batch(test_dir: str, fs: int = 1000,
                                  max_duration_hours: float = 1.0) -> pd.DataFrame:
    """
    Evaluate R-peak detection performance on multiple ECG files with ground truth annotations.

    Processes files sequentially with aggressive memory management to prevent overflow.
    Each file is processed independently with immediate cleanup to minimize memory footprint.

    Args:
        test_dir (str): Directory path containing test files with annotations (.dat files)
        fs (int, optional): Sampling frequency in Hz. Defaults to 1000.
        max_duration_hours (Optional[float], optional): Maximum duration to process per file in hours.
                                                       If None, processes entire files. Defaults to 1.0.

    Returns:
        pd.DataFrame: Evaluation results for all successfully processed files with columns:
            - File: Filename
            - Correct: Number of correctly detected peaks
            - Missed: Number of missed peaks
            - Total_True: Total number of true peaks
            - Total_Detected: Total number of detected peaks
            - Accuracy_Percent: Detection accuracy percentage

    Prints:
        - Progress information and overall accuracy statistics
        - Error messages for failed file processing
    """
    if not os.path.isdir(test_dir):
        print(f"Directory {test_dir} not found!")
        return pd.DataFrame()

    dat_files = sorted([f for f in os.listdir(test_dir) if f.endswith('.dat')])
    base_names = [f[:-4] for f in dat_files]

    if not base_names:
        print(f"No .dat files found in {test_dir}.")
        return pd.DataFrame()

    print(f"Evaluating R-peak detection on {len(base_names)} files from {test_dir}")
    if max_duration_hours:
        print(f"Processing {max_duration_hours:.1f} hours per file")
    else:
        print("Processing complete files.")
    print("=" * 60)

    results = []
    for name in tqdm(base_names, desc="Evaluating files"):
        file_result = evaluate_single_file_rpeak_detection(test_dir, name, fs=fs,
                                                  max_duration_hours=max_duration_hours)
        if 'Error' in file_result:
            print(f"Error processing {name}: {file_result['Error']}")
        else:
            results.append(file_result)

        # Force garbage collection
        del file_result
        gc.collect()

    if results:
        # Create DataFrame with proper rounding
        df = pd.DataFrame(results)

        # Round numerical columns
        numeric_columns = ['Accuracy_Percent']
        for col in numeric_columns:
            if col in df.columns:
                df[col] = df[col].round(1)

        print("\nR-peak Detection Evaluation Results:")
        print(f"Successfully processed {len(df)} out of {len(base_names)} files")

        # Calculate and display overall accuracy
        total_correct = df['Correct'].sum()
        total_true = df['Total_True'].sum()

        if total_true > 0:
            overall_accuracy = (total_correct / total_true) * 100
            print(f"Overall Accuracy: {overall_accuracy:.2f}%")
            print(f"Total True Peaks: {total_true}")
            print(f"Total Correct Detections: {total_correct}")
        else:
            print("No ground truth peaks found across all files")

        # Clean up before returning
        del results
        gc.collect()

        return df
    else:
        print("No files were successfully processed.")
        return pd.DataFrame()


In [35]:
# =============================================================================
# 7. MAIN EXECUTION FUNCTIONS
# =============================================================================

def run_ecg_analysis_pipeline(max_duration_hours: float = 1.0):
    """
    Execute the complete ECG analysis pipeline with processing time limits.

    Performs a comprehensive ECG analysis workflow including single file processing,
    batch processing, R-peak detection evaluation, and anomaly detection on
    specified data sources.

    Args:
        max_duration_hours (float): Maximum duration in hours to process per file.
                                  Defaults to 1.0 hour to manage memory usage
                                  and processing time.

    Returns:
        dict: Dictionary containing results from all pipeline steps:
            - 'single_ecg': Results from single ECG file processing
            - 'batch_ecg': Results from batch ECG file processing
            - 'rpeak_evaluation': Results from R-peak detection evaluation
            - 'anomaly_result': Results from anomaly detection evaluation

    Note:
        Pipeline processes data from 'source1' and 'source2' directories.
        Processing time is limited per file to prevent memory overflow.
    """
    print("ECG Analysis Pipeline")
    print("=" * 50)
    print(f"Processing up to {max_duration_hours:.1f} hours per file")

    # Step 1: Process single ECG file from source1
    print("\n1. Processing single ECG file from source1...")
    single_result = analyze_single_ecg_file("source1", "100001",
                                           plot=True, max_duration_hours=max_duration_hours)

    # Step 2: Batch process files in source1
    print("\n2. Batch processing ECG files in source1...")
    batch_results = analyze_ecg_batch("source1", plot_individual=False,
                                    max_duration_hours=max_duration_hours)

    # Step 3: Evaluate R-peak detection on source2
    print("\n3. Evaluating R-peak detection on source2...")
    rpeak_results = evaluate_rpeak_detection_batch("source2", max_duration_hours=max_duration_hours)

    # Step 4: Anomaly detection evaluation
    print("\n4. Sample anomaly detection on source1...")
    anomaly_result = None
    if not batch_results.empty:
        sample_record = batch_results['Record'].iloc[0]
        anomaly_result = analyze_anomalies_in_file("source1", sample_record,
                                                       max_duration_hours=max_duration_hours)

    print("\nPipeline execution completed!")

    return {
        'single_ecg': single_result,
        'batch_ecg': batch_results,
        'rpeak_evaluation': rpeak_results,
        'anomaly_result': anomaly_result
    }

In [36]:

# =============================================================================
# 8. USAGE EXAMPLES
# =============================================================================

# Run the memory-efficient pipeline
# results = run_ecg_analysis_pipeline(max_duration_hours=0.5)

# Or run individual components with limited duration:
# analyze_single_ecg_file("source1", "100001", max_duration_hours=None)
# analyze_ecg_batch("source1", max_duration_hours=None)
evaluate_rpeak_detection_batch("source2", max_duration_hours=None)
# analyze_anomalies_in_file("source1", "100001")


Evaluating R-peak detection on 18 files from source2
Processing complete files.


Evaluating files:   0%|          | 0/18 [00:00<?, ?it/s]

Processing 3.26 hours in chunks of 10.0 minutes


Processing chunks: 100%|██████████| 11730944/11730944 [00:07<00:00, 1585244.81it/s]
Evaluating files:   6%|▌         | 1/18 [00:10<03:06, 10.98s/it]

Processing 3.20 hours in chunks of 10.0 minutes


Processing chunks: 100%|██████████| 11520000/11520000 [00:04<00:00, 2585152.22it/s]
Evaluating files:  11%|█         | 2/18 [00:18<02:18,  8.65s/it]

Processing 3.15 hours in chunks of 10.0 minutes


Processing chunks: 100%|██████████| 11354112/11354112 [00:04<00:00, 2430157.94it/s]
Evaluating files:  17%|█▋        | 3/18 [00:27<02:17,  9.17s/it]

Processing 3.07 hours in chunks of 10.0 minutes


Processing chunks: 100%|██████████| 11051008/11051008 [00:10<00:00, 1063637.56it/s]
Evaluating files:  22%|██▏       | 4/18 [00:45<02:58, 12.73s/it]

Processing 3.32 hours in chunks of 10.0 minutes


Processing chunks: 100%|██████████| 11960320/11960320 [00:07<00:00, 1589955.98it/s]
Evaluating files:  28%|██▊       | 5/18 [00:57<02:37, 12.13s/it]

Processing 3.15 hours in chunks of 10.0 minutes


Processing chunks: 100%|██████████| 11327488/11327488 [00:07<00:00, 1572596.96it/s]
Evaluating files:  33%|███▎      | 6/18 [01:11<02:34, 12.89s/it]

Processing 3.07 hours in chunks of 10.0 minutes


Processing chunks: 100%|██████████| 11046912/11046912 [00:04<00:00, 2420590.82it/s]
Evaluating files:  39%|███▉      | 7/18 [01:21<02:10, 11.87s/it]

Processing 3.13 hours in chunks of 10.0 minutes


Processing chunks: 100%|██████████| 11284480/11284480 [00:04<00:00, 2440741.01it/s]
Evaluating files:  44%|████▍     | 8/18 [01:30<01:51, 11.15s/it]

Processing 3.02 hours in chunks of 10.0 minutes


Processing chunks: 100%|██████████| 10866688/10866688 [00:04<00:00, 2334444.81it/s]
Evaluating files:  50%|█████     | 9/18 [01:40<01:35, 10.60s/it]

Processing 2.96 hours in chunks of 10.0 minutes


Processing chunks: 100%|██████████| 10659840/10659840 [00:09<00:00, 1162350.69it/s]
Evaluating files:  56%|█████▌    | 10/18 [01:55<01:36, 12.05s/it]

Processing 3.12 hours in chunks of 10.0 minutes


Processing chunks: 100%|██████████| 11235328/11235328 [00:04<00:00, 2382629.07it/s]
Evaluating files:  61%|██████    | 11/18 [02:03<01:15, 10.80s/it]

Processing 3.32 hours in chunks of 10.0 minutes


Processing chunks: 100%|██████████| 11960320/11960320 [00:05<00:00, 2091029.96it/s]
Evaluating files:  67%|██████▋   | 12/18 [02:13<01:04, 10.67s/it]

Processing 3.04 hours in chunks of 10.0 minutes


Processing chunks: 100%|██████████| 10942464/10942464 [00:06<00:00, 1756501.88it/s]
Evaluating files:  72%|███████▏  | 13/18 [02:23<00:51, 10.24s/it]

Processing 3.05 hours in chunks of 10.0 minutes


Processing chunks: 100%|██████████| 10967040/10967040 [00:03<00:00, 3167676.00it/s]
Evaluating files:  78%|███████▊  | 14/18 [02:33<00:40, 10.16s/it]

Processing 3.10 hours in chunks of 10.0 minutes


Processing chunks: 100%|██████████| 11143168/11143168 [00:04<00:00, 2684855.56it/s]
Evaluating files:  83%|████████▎ | 15/18 [02:39<00:27,  9.17s/it]

Processing 2.97 hours in chunks of 10.0 minutes


Processing chunks: 100%|██████████| 10705920/10705920 [00:02<00:00, 4507226.40it/s]
Evaluating files:  89%|████████▉ | 16/18 [02:44<00:15,  7.76s/it]

Processing 3.09 hours in chunks of 10.0 minutes


Processing chunks: 100%|██████████| 11139072/11139072 [00:03<00:00, 3597429.43it/s]
Evaluating files:  94%|█████████▍| 17/18 [02:55<00:08,  8.64s/it]

Processing 2.97 hours in chunks of 10.0 minutes


Processing chunks: 100%|██████████| 10701824/10701824 [00:05<00:00, 2016541.94it/s]
Evaluating files: 100%|██████████| 18/18 [03:05<00:00, 10.32s/it]


R-peak Detection Evaluation Results:
Successfully processed 18 out of 18 files
Overall Accuracy: 26.93%
Total True Peaks: 1806792
Total Correct Detections: 486581





Unnamed: 0,File,Correct,Missed,Total_True,Total_Detected,Accuracy_Percent
0,16265,29279,71676,100955,31152,29.0
1,16272,27730,69416,97146,28687,28.5
2,16273,26311,63786,90097,28217,29.2
3,16420,27494,74942,102436,28400,26.8
4,16483,26812,77749,104561,29159,25.6
5,16539,29728,78946,108674,30331,27.4
6,16773,26416,86481,112897,27496,23.4
7,16786,30567,71172,101739,31186,30.0
8,16795,26622,61056,87678,28159,30.4
9,17052,26962,61040,88002,27647,30.6
