# Preprocess the MESA Dataset

The MESA dataset is an extensive collection of sleep-related data. This notebook is designed to process the downloaded MESA dataset, extract the required information, and store it in a processed format for easier analysis.

## Source of Dataset
You can obtain the MESA dataset from [sleepdata.org](https://sleepdata.org/datasets/mesa). Please note that approval is required before you can download the data.

## Data Structure and Processing Steps

### 1. Data Locations:
- **EDF Files (for HR)**: `data/mesa/polysomnography/edfs/`
- **Activity CSV Files**: `data/mesa/actigraphy/`
- **Sleep Scoring XML Files**: `data/mesa/polysomnography/annotations-events-nsrr/`

### 2. Processed Data Storage:
The processed data, along with the relevant timestamps, will be stored in the `data/mesa/processed/` directory. Three files will be generated for each subject.

### 3. Naming Convention for Processed Files:
- **Activity Count**: `activity_min_len-{fileid}.csv`
- **HR**: `hr_min_len-{fileid}.csv`
- **Sleep Scoring**: `sleep_min_len-{fileid}.csv`


## Constants

In [None]:
# Constants used throughout the data processing workflow

EDF_FOLDER_PATH = 'data/mesa/polysomnography/edfs/'
PSG_FOLDER_PATH = 'data/mesa/polysomnography/annotations-events-nsrr/'
ACTIVITY_FOLDER_PATH = 'data/mesa/actigraphy/'
OVERLAY_FILE_PATH = 'data/mesa/overlap/mesa-actigraphy-psg-overlap.csv'
EDF_FILE_PATTERN = 'mesa-sleep-{}.edf'
PSG_FILE_PATTERN = 'mesa-sleep-{}-nsrr.xml'
ACTIVITY_FILE_PATTERN = 'mesa-sleep-{}.csv'
EPOCH_DURATION_IN_SECONDS = 30
PROCESSED_FOLDER_PATH_PREFIX = 'data/mesa/processed/'


## Utility Functions

In [None]:

# Utility functions assist in various tasks such as:
# - Extracting numeric IDs from filenames.
# - Finding common IDs across multiple directories.
import csv
import numpy as np
import logging
import re
import os
import pyedflib


# Setting up logging
logging.basicConfig(level=logging.INFO)

# Constants for clarity
OFF_WRIST = '1'
EXCLUDED = 'excluded'

def read_activity(actigraphy_file_path, file_id):
    logging.info(f"Trying to read file {actigraphy_file_path}")
    overlap_csv_path = OVERLAY_FILE_PATH

    line_align = find_line_alignment(file_id, overlap_csv_path)

    if line_align == -1:
        logging.warning(f"Line alignment not found for file id {file_id}")
    else:
        logging.info(f"Line alignment found for file id {file_id} and number is {line_align}")

    activity_data = extract_activity(line_align, actigraphy_file_path)
    activity_data = clean_data(activity_data)
    return activity_data

def read_csv_file(file_path, start_from_second_row=True):
    with open(file_path, 'r') as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=',')
        if start_from_second_row:
            next(csv_reader)
        return list(csv_reader)

def find_line_alignment(file_id, overlap_csv_path):
    rows = read_csv_file(overlap_csv_path)
    for row in rows:
        if int(row[0]) == int(file_id):
            return int(row[1])
    return -1

def extract_activity(line_align, actigraphy_csv_path):
    rows = read_csv_file(actigraphy_csv_path)
    elapsed_time_counter = 0
    activity = []

    for row in rows:
        if row[3] == OFF_WRIST or row[11] == EXCLUDED:
            continue
        if int(row[1]) >= line_align:
            activity_value = np.nan if row[4] == '' else float(row[4])
            activity.append([elapsed_time_counter, activity_value])
            elapsed_time_counter += EPOCH_DURATION_IN_SECONDS

    activity_array = np.array(activity)
    if activity_array.size == 0:
        logging.warning(f"Warning: Empty activity data for file: {actigraphy_csv_path}")
        return np.array([[0, np.nan]])
    return activity_array

def clean_data(array):
    # Only remove rows with NaN values in the second column
    valid_rows = ~np.isnan(array[:, 1])
    array = array[valid_rows]
    
    # Remove rows containing infinity values (if any)
    array = array[~np.isinf(array).any(axis=1)]
    return array

def get_common_ids(folder_paths):
        """
        Finds the numeric IDs that are common across all given folders.
        
        Parameters:
        - folder_paths (list): A list of folder paths to compare.
        
        Returns:
        - common_ids (set): A set of numeric IDs that are common to all folders.
        """
        common_ids = None
        
        for folder in folder_paths:
            # List all files in the folder
            filenames = os.listdir(folder)
            
            # Extract numeric IDs from the filenames
            ids_in_current_folder = {get_numeric_id(filename) for filename in filenames}
            
            if common_ids is None:
                # For the first folder, set the common IDs set as the IDs in this folder
                common_ids = ids_in_current_folder
            else:
                # Find the intersection with the IDs from this folder
                common_ids = common_ids.intersection(ids_in_current_folder)
                
        return common_ids

def get_numeric_id(filename):
        """
        Extracts the numeric ID from a filename using regular expressions.
        """
        match = re.search(r'\d+', filename)
        return match.group(0) if match else None


## Main Processing Functions

In [None]:

# Main functions for processing different aspects of the MESA dataset:
# - HR Data Processing
# - PSG Data Processing
# - Activity Data Processing
import os
import csv
import numpy as np
import logging

# Setting up logging
logging.basicConfig(level=logging.INFO)

def process_subjects(common_ids):
    for file_id in common_ids:
        # Check if files exist
        if files_already_processed(file_id):
            logging.info(f"Files for file id {file_id} already processed. Skipping...")
            continue
        # HR Data
        hr_data = read_hr(os.path.join(EDF_FOLDER_PATH, EDF_FILE_PATTERN.format(file_id)))
        # PSG Data
        psg_data = read_psg(os.path.join(PSG_FOLDER_PATH, PSG_FILE_PATTERN.format(file_id)),
                                     EPOCH_DURATION_IN_SECONDS)
        # Activity Data
        activity_data = read_activity(os.path.join(ACTIVITY_FOLDER_PATH, ACTIVITY_FILE_PATTERN.format(file_id)),
                                                    file_id)
        # Generate output files
        output_folder_prefix = PROCESSED_FOLDER_PATH_PREFIX
        write_data_with_min_len(file_id, activity_data, hr_data, psg_data, output_folder_prefix)

def read_hr(file_path):
    """
    Reads heart rate data from an EDF file.

    Parameters:
        file_path (str): The full path to the EDF file.

    Returns:
        np.ndarray: A 2D array where the first column is time in seconds, 
        and the second column is the heart rate data, or None if there was an error.
    """
    try:
        with pyedflib.EdfReader(file_path) as edf_file:
            signal_labels = edf_file.getSignalLabels()
            if 'HR' not in signal_labels:
                logging.warning("HR signal not found in this file.")
                return None

            hr_index = signal_labels.index('HR')
            sample_frequencies = edf_file.getSampleFrequencies()
            hr = edf_file.readSignal(hr_index)
            fs = sample_frequencies[hr_index]

        time_hr = np.arange(len(hr)) / fs
        data = np.column_stack((time_hr, hr))
        data = data[data[:, 1] != 0]
        
        return data

    except FileNotFoundError:
        logging.error(f"File not found: {file_path}")
        return None
    except Exception as e:
        logging.error(f"An error occurred while processing the file {file_path}: {e}")
        return None

def read_psg(file_path, epoch_duration_in_seconds):
    logging.info(f"Reading PSG data from: {file_path}")
    xml_root = parse_xml_file(file_path)
    if xml_root is None:
        logging.error(f"Failed to parse XML from: {file_path}")
        return []

    scored_events = extract_scored_events(xml_root)
    stages = process_staged_windows(scored_events, epoch_duration_in_seconds)
    return stages

def files_already_processed(file_id):
    activity_output_path = get_output_path('activity', file_id)
    hr_output_path = get_output_path('hr', file_id)
    sleep_output_path = get_output_path('sleep', file_id)
    return os.path.exists(activity_output_path) and os.path.exists(hr_output_path) and os.path.exists(sleep_output_path)

def get_output_path(data_type, file_id):
    return os.path.join(PROCESSED_FOLDER_PATH_PREFIX, f'{data_type}_min_len_{file_id}.csv')

def determine_min_len(activity_data, hr_data, sleep_data):
    hr_data_len_30s_equiv = hr_data // 30  # Convert HR data length to its 30-second equivalent
    return int(min(activity_data, hr_data_len_30s_equiv, sleep_data))

def write_data_with_min_len(file_id, activity_data, hr_data, sleep_data, output_folder_prefix):
    # Create the directory if it doesn't exist, relative to the current working directory
    full_path = os.path.join(os.getcwd(), PROCESSED_FOLDER_PATH_PREFIX)
    if not os.path.exists(full_path):
        os.makedirs(full_path)  
    # Check if the activity_data only contains NaNs and skip if it does
    if np.isnan(activity_data).all():
        logging.warning(f"No valid activity data detected for file id: {file_id}. Skipping...")
        return
    min_len = determine_min_len(activity_data[-1][0], hr_data[-1][0], sleep_data[-1][0])
    hr_min_len = min_len * 30  # Adjusting for the HR data since it's in seconds
    _write_to_csv(activity_data[:min_len],
                  os.path.join(output_folder_prefix, f'activity_min_len_{file_id}.csv'),
                  ['Elapsed Time', 'Activity Value'])
    _write_to_csv(hr_data[:hr_min_len],
                  os.path.join(output_folder_prefix, f'hr_min_len_{file_id}.csv'),
                  ['Time (s)', 'Heart Rate'])
    _write_to_csv(sleep_data[:min_len],
                  os.path.join(output_folder_prefix, f'sleep_min_len_{file_id}.csv'),
                  ['Timestamp', 'Stage Value'])

def _write_to_csv(data, file_path, headers):
    with open(file_path, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(headers)
        writer.writerows(data)


## Execution

In [None]:

# Main execution flow to process the MESA data
import xml.etree.ElementTree as ET
import logging

# Setting up logging
logging.basicConfig(level=logging.INFO)

# Constants for stage mapping
STAGE_TO_NUM = {'Wake': 0, 'REM sleep': 5, 'Stage 1 sleep': 1, 'Stage 2 sleep': 2, 'Stage 3 sleep': 3, 'Stage 4 sleep': 4}  

def read_psg(file_path, epoch_duration_in_seconds):
    logging.info(f"Reading PSG data from: {file_path}")
    xml_root = parse_xml_file(file_path)
    if xml_root is None:
        logging.error(f"Failed to parse XML from: {file_path}")
        return []

    scored_events = extract_scored_events(xml_root)
    stages = process_staged_windows(scored_events, epoch_duration_in_seconds)
    return stages

def parse_xml_file(file_path):
    try:
        tree = ET.parse(file_path)
        return tree.getroot()
    except ET.ParseError as e:
        logging.error(f"Error parsing XML file {file_path}: {e}")
        return None
    
def extract_scored_events(root_element):
    stage_data = []
    for scored_event in root_element.findall('.//ScoredEvent'):
        duration = float(scored_event.find('Duration').text)
        start = float(scored_event.find('Start').text)
        stage = scored_event.find('EventConcept').text.split('|')[0]
        
        if stage in STAGE_TO_NUM:
            stage_data.append([STAGE_TO_NUM[stage], start, duration])
    return stage_data
    
def process_staged_windows(stage_data, epoch_duration_in_seconds):
    stages = []
    for staged_window in stage_data:
        elapsed_time_counter = 0
        stage_value = staged_window[0]
        duration = staged_window[2]
        start = staged_window[1]

        while elapsed_time_counter < duration:
            timestamp = start + elapsed_time_counter
            stages.append([timestamp, stage_value])
            elapsed_time_counter += epoch_duration_in_seconds
    return stages

def process_mesa_data():
    folder_paths = [EDF_FOLDER_PATH, PSG_FOLDER_PATH, ACTIVITY_FOLDER_PATH]
    common_ids = get_common_ids(folder_paths)
    sorted_common_ids = sorted(common_ids, key=int)
    process_subjects(sorted_common_ids)    
    print(f"Total number of subjects: {len(sorted_common_ids)}")


process_mesa_data()

