# 001 Preprocess ECG

### Purpose
Purpose of this notebook is to preprocess the raw ECG data for the 'We Love Reading' study.

- Neurokit2 package with a custom pipeline will be used for preprocessing
- Segmentation is performed afterwards
- QA figures will be exported for every partipant
- The processed data will be exported


### Input / Output
- Input: `~/data/raw`
- Outputs:
  - QA visualizations: `~/reports/ECG QA`
  - Processed data: `~/data/processed`

### Imports

In [1]:
# fmt: off
from typing import Union, Tuple, Dict, Optional, List
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import importlib
import nk_pipeline
import neurokit2 as nk
importlib.reload(nk)
importlib.reload(nk_pipeline)
import warnings
import yaml
# fmt:on


## Parameters

#### IO

In [2]:
WORKING_DIR = Path().cwd()
ROOT_DIR = WORKING_DIR.parent
DATA_DIR = ROOT_DIR / 'data'
RAW_DATA_DIR = DATA_DIR / 'raw'
PROCESSED_DATA_DIR = DATA_DIR / 'processed'
PROCESSED_DATA_DIR.mkdir(parents=False, exist_ok=True)
REPORTS_DIR = ROOT_DIR / 'reports'
REPORTS_DIR.mkdir(parents=False, exist_ok=True) 
QA_REPORTS_DIR = REPORTS_DIR / 'QA'
QA_REPORTS_DIR.mkdir(parents=False, exist_ok=True) 

#### Pipeline config

The pipeline config below is the standard config that will be applied to every dyad unless not otherwise specified using (e.g., `configure_params()` support function)

In [3]:
pipeline_params = {
    'general': {
        'sampling_frequency': 500,
        'analysis_window_seconds': 30,
        'compute_hrv_frequency_metrics': False
    },
    'cleaning': {
        'method': 'neurokit',
        'powerline': None  # or 60 or None
    },
    'peak_detection': {
        'method': 'neurokit',
        'correct_artifacts': True
    },
    'signal_quality_index': {
        'method': 'averageQRS',
        'approach': 'simple'
    },
    'hrv_frequency_settings': {
        'ulf': [0, 0.0033], # The spectral power of ultra low frequencies
        'vlf': [0.0033, 0.04], # The spectral power of very low frequencies
        'lf': [0.04, 0.15], # The spectral power of low frequencies
        'hf': [0.15, 0.4],
        'vhf': [0.4, 0.5], # The spectral power of very high frequencie
        'psd_method': 'welch',
        'normalize': True
    }
}

## Support Functions

In [4]:
def configure_params(subject_id: str, pipeline_params: Dict) -> Dict:
    """
    Configures child and mother parameters based on the subject ID.

    Args:
        subject_id (str): The ID of the subject being processed.
        pipeline_params (Dict): The base dictionary containing default pipeline parameters.

    Returns:
        Tuple[Dict, Dict]: A tuple containing customized dictionaries for child_params and mother_params.
    """
    # Create deep copies to avoid modifying the original pipeline_params
    child_params = pipeline_params.copy()
    mother_params = pipeline_params.copy()
    
    # Customize parameters based on subject_id
    # if subject_id == 8:
    #     child_params['cleaning'].update({"powerline": 60})
    # elif subject_id == "4":
    #     mother_params['general'].update({"key2": "value2"})
    
    # Add more conditions for other subject IDs as needed

    return child_params, mother_params

def process_data_in_parallel(data_filepaths: List[Union[str, Path]], 
                             pipeline_params: Dict, 
                             processed_data_dir: Union[str, Path], 
                             qa_reports_dir: Union[str, Path]) -> None:
    """
    Processes data files in parallel using multithreading, with error handling for exceptions.

    Args:
        data_filepaths (List[Union[str, Path]]): List of file paths for data files to process.
        pipeline_params (Dict): Dictionary containing the base pipeline parameters.
        processed_data_dir (Union[str, Path]): Directory for saving processed data.
        qa_reports_dir (Union[str, Path]): Directory for saving quality assurance (QA) reports.

    Returns:
        None
    """
    processed_data_dir = Path(processed_data_dir)
    qa_reports_dir = Path(qa_reports_dir)

    def process_single_subject(filepath: Union[str, Path]):
        try:
            print(f"Processing {filepath}")
            filepath = Path(filepath)
            subject_id, condition = nk_pipeline.extract_subject_id_condition_from_filepath(filepath)
            
            # Configure parameters based on subject_id
            child_params, mother_params = configure_params(subject_id, pipeline_params)
            
            # Process each subject data file
            nk_pipeline.process_subject(
                filepath, 
                child_params, 
                mother_params, 
                processed_data_dir, 
                qa_reports_dir
            )
            print(f"Successfully processed {filepath}")
        except Exception as e:
            print(f"Error processing {filepath}: {e}")
            raise  # Re-raise the exception to capture it in the future

    # Use ThreadPoolExecutor to process each subject in parallel
    with ThreadPoolExecutor() as executor:
        futures = {executor.submit(process_single_subject, filepath): filepath for filepath in data_filepaths}
        
        for future in as_completed(futures):
            filepath = futures[future]
            try:
                future.result()  # Retrieve the result to trigger any exceptions
            except Exception as e:
                print(f"Error occurred for {filepath}: {e}")


# Data

Extract data filepaths for the raw data

In [5]:
data_filepaths = list(RAW_DATA_DIR.glob('*mc.txt'))
[print(f) for f in data_filepaths];
[print(nk_pipeline.extract_subject_id_condition_from_filepath(f)) for f in data_filepaths];

/Users/lukasspiess/Library/CloudStorage/OneDrive-SpiessSolution/Documents - We Love Reading/Welcome/data/raw/C08_mc.txt
/Users/lukasspiess/Library/CloudStorage/OneDrive-SpiessSolution/Documents - We Love Reading/Welcome/data/raw/B08_mc.txt
/Users/lukasspiess/Library/CloudStorage/OneDrive-SpiessSolution/Documents - We Love Reading/Welcome/data/raw/B32_mc.txt
/Users/lukasspiess/Library/CloudStorage/OneDrive-SpiessSolution/Documents - We Love Reading/Welcome/data/raw/C32_mc.txt
/Users/lukasspiess/Library/CloudStorage/OneDrive-SpiessSolution/Documents - We Love Reading/Welcome/data/raw/W32_mc.txt
/Users/lukasspiess/Library/CloudStorage/OneDrive-SpiessSolution/Documents - We Love Reading/Welcome/data/raw/W08_mc.txt
(8, 'C')
(8, 'B')
(32, 'B')
(32, 'C')
(32, 'W')
(8, 'W')


# Analyze

In [6]:
process_data_in_parallel(data_filepaths, pipeline_params, PROCESSED_DATA_DIR, QA_REPORTS_DIR)

Processing /Users/lukasspiess/Library/CloudStorage/OneDrive-SpiessSolution/Documents - We Love Reading/Welcome/data/raw/C08_mc.txt
Processing /Users/lukasspiess/Library/CloudStorage/OneDrive-SpiessSolution/Documents - We Love Reading/Welcome/data/raw/B08_mc.txt
Processing /Users/lukasspiess/Library/CloudStorage/OneDrive-SpiessSolution/Documents - We Love Reading/Welcome/data/raw/B32_mc.txt
Processing /Users/lukasspiess/Library/CloudStorage/OneDrive-SpiessSolution/Documents - We Love Reading/Welcome/data/raw/C32_mc.txt
Processing /Users/lukasspiess/Library/CloudStorage/OneDrive-SpiessSolution/Documents - We Love Reading/Welcome/data/raw/W32_mc.txt
Processing /Users/lukasspiess/Library/CloudStorage/OneDrive-SpiessSolution/Documents - We Love Reading/Welcome/data/raw/W08_mc.txt


  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  out["RMSSD"] = np.sqrt(np.nanmean(diff_rri**2))
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,


Successfully processed /Users/lukasspiess/Library/CloudStorage/OneDrive-SpiessSolution/Documents - We Love Reading/Welcome/data/raw/B32_mc.txt
Successfully processed /Users/lukasspiess/Library/CloudStorage/OneDrive-SpiessSolution/Documents - We Love Reading/Welcome/data/raw/C32_mc.txt
Successfully processed /Users/lukasspiess/Library/CloudStorage/OneDrive-SpiessSolution/Documents - We Love Reading/Welcome/data/raw/C08_mc.txt
Successfully processed /Users/lukasspiess/Library/CloudStorage/OneDrive-SpiessSolution/Documents - We Love Reading/Welcome/data/raw/W32_mc.txt


  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  out["RMSSD"] = np.sqrt(np.nanmean(diff_rri**2))
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,


Successfully processed /Users/lukasspiess/Library/CloudStorage/OneDrive-SpiessSolution/Documents - We Love Reading/Welcome/data/raw/B08_mc.txt
Successfully processed /Users/lukasspiess/Library/CloudStorage/OneDrive-SpiessSolution/Documents - We Love Reading/Welcome/data/raw/W08_mc.txt


In [7]:
# file = data_filepaths[1]
# process_subject(file, pipeline_params, pipeline_params, PROCESSED_DATA_DIR, QA_REPORTS_DIR)