# Preprocess subject data

In [5]:
import sys
import os
import logging
import ipyparallel as ipp
from src.utils.config import Config
import src.configs.config as configs
import src.preprocessing.utils as pre_utils
from src.preprocessing.classes import Subject

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Add src path
src_path = os.path.abspath("../../src")
sys.path.append(src_path)

# Load in subject IDs and paths from configuration
config = Config.from_json(configs.CFGLog)

# Define subject IDs
subject_ids = {
    "chronic_low_back_pain": config.data.chronic_low_back_pain.subject_ids.CP + config.data.chronic_low_back_pain.subject_ids.HC,
    "chronic_pancreatitis": config.data.chronic_pancreatitis.subject_ids.CP,
    "lupus": config.data.lupus.subject_ids.CP + config.data.lupus.subject_ids.NP
}

# CLBP processed path
clbp_processed_path = config.data.chronic_low_back_pain.processed_path

# Define preprocessing parameters
PERISTIM_TIME_WIN = 5
times_tup, time_win_path = pre_utils.get_time_window(PERISTIM_TIME_WIN)
time_range = (times_tup[0], times_tup[-1])

# Log total subjects
total_subjects = sum(len(ids) for ids in subject_ids.values())
logger.info(f"Total subjects: {total_subjects}")
logger.info(subject_ids)

# Function to preprocess each subject
def preprocess_subject(sub_id, group, clbp_processed_path, time_range, PERISTIM_TIME_WIN):
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)

    try:
        preprocessed_data_path = clbp_processed_path if group == "chronic_low_back_pain" else None
        subject = Subject(sub_id, group, preprocessed_data_path=preprocessed_data_path)
        logger.info(f"Processing subject: {sub_id} in group: {group}")
        subject.preprocess()
        subject.get_cleaned_epochs(time_range, PERISTIM_TIME_WIN)
        logger.info(f"Completed processing subject: {sub_id} in group: {group}")
    except FileNotFoundError as e:
        logger.error(f"FileNotFoundError for subject: {sub_id} - {e}")
    except Exception as e:
        logger.error(f"An error occurred for subject: {sub_id} - {e}")

# Connect to IPyParallel cluster
rc = ipp.Client()
dview = rc[:]

# Add src path to sys.path on all engines
dview.execute(f"sys.path.append('{src_path}')")

# Sync necessary imports on all engines
with dview.sync_imports():
    from src.preprocessing.classes import Subject
    from src.preprocessing.utils import get_time_window
    from src.utils.config import Config
    import logging

# Push necessary configurations and functions to all engines
dview.push({
    'config': config,
    'clbp_processed_path': clbp_processed_path,
    'time_range': time_range,
    'PERISTIM_TIME_WIN': PERISTIM_TIME_WIN,
    'preprocess_subject': preprocess_subject
})

# Map the function to the subjects in parallel
results = []
for group in subject_ids:
    results += dview.map(
        preprocess_subject, 
        subject_ids[group], 
        [group] * len(subject_ids[group]), 
        [clbp_processed_path] * len(subject_ids[group]), 
        [time_range] * len(subject_ids[group]), 
        [PERISTIM_TIME_WIN] * len(subject_ids[group])
    )

# Wait for results to complete
rc.wait_interactive()


2024-07-12 16:29:22,415 - __main__ - INFO - Total subjects: 64
2024-07-12 16:29:22,415 - __main__ - INFO - {'chronic_low_back_pain': ['018', '022', '024', '031', '032', '034', '036', '039', '040', '045', '046', '052', '020', '021', '023', '029', '037', '041', '042', '044', '048', '049', '050', '056', 'C10', 'C11', 'C12', 'C13', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C2.', 'C24', 'C25', 'C26', 'C27', 'C3.', 'C6.', 'C7.', 'C9.'], 'chronic_pancreatitis': ['002', '003', '006', '007', '008', '009', '010', '011', '012', '013', '014'], 'lupus': ['5186', '6310', '5295', '5873', '6100', '6106', '5648', '5675', '5845', '5713']}


[-2.5,0.0,2.5]
importing Subject from src.preprocessing.classes on engine(s)
importing get_time_window from src.preprocessing.utils on engine(s)
importing Config from src.utils.config on engine(s)
importing logging on engine(s)


unknown:   0%|          | 0/1 [00:00<?, ?tasks/s]

In [17]:
# inspect log
logger.info(results)

2024-07-12 19:58:23,666 - __main__ - INFO - [None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None]


Process each subject in sequence