In [1]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import signal
from fitparse import FitFile
import neurokit2 as nk

In [9]:
class FitEKGAnalyzer:
    """
    A class to analyze EKG data from FIT files, detect abnormal beats
    and calculate heart rate variability metrics.
    """
    
    def __init__(self, file_path):
        """
        Initialize the analyzer with a FIT file path.
        
        Parameters:
        -----------
        file_path : str
            Path to the FIT file
        """
        self.file_path = file_path
        self.raw_data = None
        self.ecg_signal = None
        self.sampling_rate = None
        self.r_peaks = None
        self.heartbeats = None
        self.beat_classifications = None
        self.metrics = {}
        
    def load_fit_file(self):
        """Load and parse the FIT file"""
        try:
            fit_file = FitFile(self.file_path)
            
            # Extract EKG data - field names may vary by device
            ecg_data = []
            timestamps = []
            i=0
            for record in fit_file.get_messages('record'):
                # Look for ECG data fields
                for field in record:
                    i += 1
                    if 'ecg' in field.name.lower() or 'ekg' in field.name.lower():
                        ecg_data.append(field.value)
                    if field.name == 'timestamp':
                        timestamps.append(field.value)
            print(i)
            # If no explicit ECG data is found, try to use other fields
            if not ecg_data:
                for record in fit_file.get_messages('record'):
                    # Some devices store ECG under different names
                    for field in record:
                        if field.name in ['heart_rate_raw', 'heart_waveform']:
                            ecg_data.append(field.value)
                        if field.name == 'timestamp':
                            timestamps.append(field.value)
            
            if not ecg_data:
                raise ValueError("No ECG/EKG data found in the FIT file")
                
            # Estimate sampling rate from timestamps if available
            if len(timestamps) > 1:
                time_diff = (timestamps[-1] - timestamps[0]).total_seconds()
                self.sampling_rate = len(ecg_data) / time_diff
            else:
                # Default to a common ECG sampling rate if can't be determined
                self.sampling_rate = 250  # Hz
            
            self.raw_data = pd.DataFrame({
                'timestamp': timestamps if len(timestamps) == len(ecg_data) else range(len(ecg_data)),
                'ecg': ecg_data
            })
            
            self.ecg_signal = np.array(ecg_data)
            
            print(f"Loaded ECG data with {len(ecg_data)} samples at {self.sampling_rate:.2f} Hz")
            return True
            
        except Exception as e:
            print(f"Error loading FIT file: {e}")
            return False
    
    def preprocess_ecg(self):
        """Preprocess the ECG signal by removing noise and baseline wander"""
        if self.ecg_signal is None:
            print("No ECG data loaded. Please load a FIT file first.")
            return False
        
        # Apply bandpass filter to remove noise
        # ECG typically has frequency components between 0.5 and 40 Hz
        self.ecg_signal = nk.ecg_clean(self.ecg_signal, sampling_rate=self.sampling_rate)
        return True
    
    def detect_r_peaks(self):
        """Detect R-peaks in the ECG signal"""
        if self.ecg_signal is None:
            print("No ECG data loaded. Please load a FIT file first.")
            return False
        
        # Use neurokit2 for R-peak detection
        _, info = nk.ecg_peaks(self.ecg_signal, sampling_rate=self.sampling_rate)
        self.r_peaks = info['ECG_R_Peaks']
        
        print(f"Detected {len(self.r_peaks)} R-peaks")
        return True
    
    def segment_heartbeats(self):
        """Segment the ECG signal into individual heartbeats"""
        if self.r_peaks is None:
            print("No R-peaks detected. Please run detect_r_peaks first.")
            return False
        
        # Segment heartbeats around R-peaks
        # Typical heartbeat is ~200ms before R-peak and ~400ms after
        before = int(0.2 * self.sampling_rate)
        after = int(0.4 * self.sampling_rate)
        
        self.heartbeats = []
        for r_peak in self.r_peaks:
            if r_peak - before >= 0 and r_peak + after < len(self.ecg_signal):
                beat = self.ecg_signal[r_peak - before : r_peak + after]
                self.heartbeats.append(beat)
        
        print(f"Segmented {len(self.heartbeats)} heartbeats")
        return True
    
    def classify_beats(self):
        """Classify beats as normal or abnormal with sub-classifications"""
        if self.heartbeats is None:
            print("No heartbeats segmented. Please run segment_heartbeats first.")
            return False
        
        # This is a simplified classification model
        # In a real application, you would use a more sophisticated model
        # trained on labeled ECG data
        
        # For demonstration, we'll use a simple approach based on 
        # beat morphology and RR intervals
        
        classifications = []
        rr_intervals = np.diff(self.r_peaks) / self.sampling_rate * 1000  # in ms
        
        # First beat doesn't have a preceding RR interval
        first_classification = self.classify_single_beat(self.heartbeats[0], None)
        classifications.append(first_classification)
        
        # Classify remaining beats
        for i in range(1, len(self.heartbeats)):
            beat_class = self.classify_single_beat(self.heartbeats[i], rr_intervals[i-1])
            classifications.append(beat_class)
        
        self.beat_classifications = classifications
        
        # Calculate statistics
        self.calculate_classification_stats()
        
        return True
    
    def classify_single_beat(self, beat, rr_interval):
        """
        Classify a single heartbeat
        
        This is a simplified classification for demonstration purposes.
        A real implementation would use a trained model.
        
        Classifications:
        - Normal: Normal sinus rhythm
        - PVC: Premature Ventricular Contraction
        - PAC: Premature Atrial Contraction
        - LBBB: Left Bundle Branch Block
        - RBBB: Right Bundle Branch Block
        - APC: Atrial Premature Complex
        """
        # This is a placeholder implementation
        # In reality, you would use machine learning or rule-based algorithms
        
        # Calculate features
        beat_length = len(beat)
        peak_amplitude = np.max(beat)
        trough_amplitude = np.min(beat)
        peak_to_peak = peak_amplitude - trough_amplitude
        
        # Simplified logic
        # Note: These are not medically accurate classifiers
        if rr_interval is not None:
            # Check for premature beats
            if rr_interval < 600:  # Shorter RR interval suggests premature beat
                if peak_to_peak > 1.5 * np.mean([b.max() - b.min() for b in self.heartbeats]):
                    return 'PVC'  # Premature Ventricular Contraction
                else:
                    return 'PAC'  # Premature Atrial Contraction
            
            # Check for bundle branch blocks (simplified)
            # These would normally use QRS width and morphology
            r_peak_idx = np.argmax(beat)
            if r_peak_idx < beat_length/3:
                return 'RBBB'  # Right Bundle Branch Block
            elif r_peak_idx > beat_length/2:
                return 'LBBB'  # Left Bundle Branch Block
                
        # If nothing specific is detected, classify as normal
        return 'Normal'
    
    def calculate_classification_stats(self):
        """Calculate statistics for beat classifications"""
        if self.beat_classifications is None:
            return
        
        total_beats = len(self.beat_classifications)
        unique_classifications = set(self.beat_classifications)
        
        stats = {}
        for classification in unique_classifications:
            count = self.beat_classifications.count(classification)
            percentage = (count / total_beats) * 100
            stats[classification] = {
                'count': count,
                'percentage': percentage
            }
        
        self.metrics['beat_classifications'] = stats
        self.metrics['total_beats'] = total_beats
        self.metrics['abnormal_percentage'] = 100 - stats.get('Normal', {'percentage': 0})['percentage']
    
    def calculate_hrv_metrics(self):
        """Calculate Heart Rate Variability metrics"""
        if self.r_peaks is None:
            print("No R-peaks detected. Please run detect_r_peaks first.")
            return False
        
        # Calculate RR intervals in seconds
        rr_intervals = np.diff(self.r_peaks) / self.sampling_rate
        
        # Time domain HRV metrics
        self.metrics['hrv'] = {}
        self.metrics['hrv']['mean_hr'] = 60 / np.mean(rr_intervals)
        self.metrics['hrv']['sdnn'] = np.std(rr_intervals) * 1000  # in ms
        self.metrics['hrv']['rmssd'] = np.sqrt(np.mean(np.square(np.diff(rr_intervals)))) * 1000  # in ms
        
        # Calculate pNN50 (percentage of successive RR intervals that differ by more than 50 ms)
        nn50 = sum(abs(np.diff(rr_intervals)) > 0.05)  # 0.05s = 50ms
        self.metrics['hrv']['pnn50'] = (nn50 / len(rr_intervals)) * 100 if len(rr_intervals) > 0 else 0
        
        return True
    
    def run_full_analysis(self):
        """Run the complete analysis pipeline"""
        if not self.load_fit_file():
            return False
        
        if not self.preprocess_ecg():
            return False
        
        if not self.detect_r_peaks():
            return False
        
        if not self.segment_heartbeats():
            return False
        
        if not self.classify_beats():
            return False
        
        if not self.calculate_hrv_metrics():
            return False
        
        return True
    
    def generate_report(self):
        """Generate a report with the analysis results"""
        if not self.metrics:
            print("No analysis results available. Please run the analysis first.")
            return
        
        print("\n===== EKG Analysis Report =====")
        
        # Basic information
        print(f"\nFile: {os.path.basename(self.file_path)}")
        print(f"Total Duration: {len(self.ecg_signal)/self.sampling_rate:.2f} seconds")
        print(f"Total Beats Analyzed: {self.metrics['total_beats']}")
        
        # Beat classifications
        print("\nBeat Classifications:")
        for classification, data in self.metrics['beat_classifications'].items():
            print(f"  {classification}: {data['count']} beats ({data['percentage']:.2f}%)")
        
        print(f"\nAbnormal Beats: {self.metrics['abnormal_percentage']:.2f}%")
        
        # HRV metrics
        print("\nHeart Rate Variability Metrics:")
        print(f"  Mean Heart Rate: {self.metrics['hrv']['mean_hr']:.2f} bpm")
        print(f"  SDNN: {self.metrics['hrv']['sdnn']:.2f} ms")
        print(f"  RMSSD: {self.metrics['hrv']['rmssd']:.2f} ms")
        print(f"  pNN50: {self.metrics['hrv']['pnn50']:.2f}%")
    
    def plot_ecg_with_classifications(self, save_path=None):
        """Plot the ECG signal with beat classifications"""
        if self.ecg_signal is None or self.r_peaks is None or self.beat_classifications is None:
            print("Missing data for plotting. Please run the full analysis first.")
            return
        
        plt.figure(figsize=(15, 8))
        
        # Plot the ECG signal
        time = np.arange(len(self.ecg_signal)) / self.sampling_rate
        plt.plot(time, self.ecg_signal, 'b-', alpha=0.5, label='ECG Signal')
        
        # Plot R-peaks with classification colors
        colors = {
            'Normal': 'green',
            'PVC': 'red',
            'PAC': 'orange',
            'LBBB': 'purple',
            'RBBB': 'brown',
            'APC': 'magenta'
        }
        
        for i, r_peak in enumerate(self.r_peaks):
            if i < len(self.beat_classifications):
                beat_class = self.beat_classifications[i]
                color = colors.get(beat_class, 'blue')
                plt.plot(r_peak/self.sampling_rate, self.ecg_signal[r_peak], 'o', 
                         color=color, markersize=8)
        
        # Create legend
        legend_elements = [plt.Line2D([0], [0], marker='o', color='w', 
                          markerfacecolor=color, markersize=8, label=classification)
                          for classification, color in colors.items()]
        
        plt.legend(handles=legend_elements, loc='upper right')
        plt.title('ECG Signal with Beat Classifications')
        plt.xlabel('Time (seconds)')
        plt.ylabel('Amplitude')
        plt.grid(True, alpha=0.3)
        
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"Plot saved to {save_path}")
        
        plt.show()




In [10]:
def main():
    """
    Main function to demonstrate the FitEKGAnalyzer
    """
    # Get file path from user
    file_path = input("Enter the path to your FIT file: ")
    
    # Create analyzer and run analysis
    analyzer = FitEKGAnalyzer(file_path)
    if analyzer.run_full_analysis():
        analyzer.generate_report()
        
        # Ask if user wants to save a plot
        save_plot = input("Do you want to save a plot of the ECG with classifications? (y/n): ")
        if save_plot.lower() == 'y':
            plot_path = input("Enter the path to save the plot (or press Enter for default): ")
            if not plot_path:
                plot_path = os.path.splitext(file_path)[0] + "_ecg_analysis.png"
            analyzer.plot_ecg_with_classifications(save_path=plot_path)
        else:
            analyzer.plot_ecg_with_classifications()
    else:
        print("Analysis failed. Please check the file and try again.")


if __name__ == "__main__":
    main()


Enter the path to your FIT file:  /Users/emccullough/Downloads/Activity_on_20250403_082416_by_Etienne_5010176_8cf6812e7faf_FITFILE.fit


4548
Error loading FIT file: No ECG/EKG data found in the FIT file
Analysis failed. Please check the file and try again.


In [11]:
#/Users/emccullough/Downloads/Activity_on_20250403_082416_by_Etienne_5010176_8cf6812e7faf_FITFILE.fit
