# MIMIC-III WFDB Extraction

[1] 에서 추출한 `record_names` 를 가져오고, 그 중에서 본 연구의 요구사항에 부합하는 환자들을 추려낸다.
본 연구의 요구사항은 이하와 같다:
- `24h ~ 48h` 사이의 연속적인 데이터
- `125Hz` 의 `ABP` 와 `PPG` 를 포함한 생물학적 신호
- MIMIC DB 의 45 명의 기록 + MIMIC WFDB 의 55 명의 기록 = 총 100 명의 ABP, PPG 신호를 가진 데이터
    - 위에 데이터 중 45 명의 기록은 MIMIC CDB 상에 위치한 데이터를 말하는 건가?
    - 아니면 MIMIC WFDB 에 대응하여 존재하는 CDB 의 환자 수가 45 명이고 이에 대응되지 않는 환자가 55 명이라는 뜻인가?

**References**
- [1] `/root/Workspace/Reproduction-An-Estimation-method-of-Continuous-Non-Invasive-ABP-Waveform-using-PPG/src/00_mimiciii_dataset.ipynb`

In [1]:
import os
import re
import pickle
import gzip
import numpy as np
import asyncio
import multiprocessing
import wfdb
import matplotlib.pyplot as plt
import logging
from datetime import date

%matplotlib inline
plt.style.use('ggplot')

log_format = '%(levelname)s %(asctime)s - %(message)s'
logging.basicConfig(
    filename=f'/root/Workspace/ppg2bp/Reproduction-An-Estimation-method-of-Continuous-Non-Invasive-ABP-Waveform-using-PPG/resources/logs/arca-{date.today().strftime("%b-%d-%Y")}.log',
    filemode='w',
    format=log_format,
    level=logging.DEBUG
)
logger = logging.getLogger()

CDB_PATH = '/root/Workspace/DataLake/mimic-iii_cdb/files/mimiciii/1.4/'
WFDB_PATH = '/root/Workspace/DataLake/mimic-iii_wfdb/'

logger.debug('Initiate 10_mimiciii_extract_dataset.ipynb')

In [2]:
with gzip.open('/root/Workspace/DataWarehouse/mimic-iii_wfdb/all_patient_records.pickle.gzip', 'rb') as f:
    record_names = pickle.load(f)

# verification
sum([len(record_names[i]) for i in range(len(record_names))])

10282

In [3]:
# untangled list 
new_record_names = list(map(lambda recs: np.concatenate(np.asarray(recs, dtype=object), axis=0), record_names))
# NUMERIC data 는 굳이 볼 필요는 없다.
sep_signal_regex = re.compile('/root/Workspace/DataLake/mimic-iii_wfdb/p[0-9]{2}/p[0-9]{6}/p[0-9]{6}-[0-9]{4}[0-9\-]+.hea')
waveform_record_names = [list(filter(lambda rec: sep_signal_regex.match(rec) != None, new_record_names[i])) for i in range(len(new_record_names))]

In [4]:
print(len(waveform_record_names))
waveform_record_names[0][0:10]

10


['/root/Workspace/DataLake/mimic-iii_wfdb/p06/p064100/p064100-2191-02-24-16-30.hea',
 '/root/Workspace/DataLake/mimic-iii_wfdb/p06/p065490/p065490-2175-07-06-16-28.hea',
 '/root/Workspace/DataLake/mimic-iii_wfdb/p06/p065490/p065490-2175-07-06-12-21.hea',
 '/root/Workspace/DataLake/mimic-iii_wfdb/p06/p068543/p068543-2174-02-20-14-13.hea',
 '/root/Workspace/DataLake/mimic-iii_wfdb/p06/p067154/p067154-2138-04-23-15-03.hea',
 '/root/Workspace/DataLake/mimic-iii_wfdb/p06/p067154/p067154-2138-04-08-08-33.hea',
 '/root/Workspace/DataLake/mimic-iii_wfdb/p06/p067154/p067154-2138-04-13-17-59.hea',
 '/root/Workspace/DataLake/mimic-iii_wfdb/p06/p061949/p061949-2162-11-08-17-49.hea',
 '/root/Workspace/DataLake/mimic-iii_wfdb/p06/p061949/p061949-2162-11-07-08-10.hea',
 '/root/Workspace/DataLake/mimic-iii_wfdb/p06/p067651/p067651-2182-05-11-00-23.hea']

### Asyncio approach

In [5]:
async def get_record2(record_name, expression):
    loop = asyncio.get_running_loop()
    meta_regex = re.compile(expression)
    result = {
        'record_name': None,
        'subject_id': None,
        'base_date':  None,
        'sig_name':  None,
        'sig_len': None 
    }

    try:
        _, fields = await loop.run_in_executor(None, wfdb.rdsamp, record_name[:-4], 0, 1) # 지금 당장은 Signal value 가 필요없다.
        query_res = meta_regex.search(record_name)
        if all([signal_name in fields['sig_name'] for signal_name in ['PLETH', 'ABP']]):
            # record_name
            logger.debug(f'{query_res.groups()[0]} {fields["base_date"]} {fields["sig_name"]} saved!')
            result['record_name'] = record_name
            result['subject_id'] = query_res.groups()[0] 
            result['base_date'] = fields["base_date"]
            result['sig_name'] = fields["sig_name"]
            result['sig_len'] = fields["sig_len"]
        else:
            result = None
    except Exception as e:
        logger.debug(f'{e}')
        result = None

    return result 


async def runnable(waveform_record_name):
    tasks = [asyncio.create_task(get_record2(record_name, '([0-9]{6})-([0-9]{4}-[0-9]{2}-[0-9]{2})')) for record_name in waveform_record_name]
    results = await asyncio.gather(*tasks)
    return results

In [6]:
results = []
for i in range(len(waveform_record_names)):
    results.extend(await runnable(waveform_record_names[i]))

results = list(filter(lambda x: x!=None, results))

In [7]:
results

[{'record_name': '/root/Workspace/DataLake/mimic-iii_wfdb/p06/p067154/p067154-2138-04-13-17-59.hea',
  'subject_id': '067154',
  'base_date': datetime.date(2138, 4, 13),
  'sig_name': ['II', 'III', 'AVR', 'V', 'MCL', 'RESP', 'PLETH', 'ABP', 'CVP'],
  'sig_len': 1},
 {'record_name': '/root/Workspace/DataLake/mimic-iii_wfdb/p06/p069082/p069082-2180-09-21-12-07.hea',
  'subject_id': '069082',
  'base_date': datetime.date(2180, 9, 21),
  'sig_name': ['II', 'III', 'AVR', 'V', 'RESP', 'PLETH', 'ABP', 'ICP'],
  'sig_len': 1},
 {'record_name': '/root/Workspace/DataLake/mimic-iii_wfdb/p06/p068425/p068425-2154-01-13-16-43.hea',
  'subject_id': '068425',
  'base_date': datetime.date(2154, 1, 13),
  'sig_name': ['I',
   'II',
   'III',
   'AVR',
   'V',
   'RESP',
   'PLETH',
   'ABP',
   'CVP',
   'ICP'],
  'sig_len': 1},
 {'record_name': '/root/Workspace/DataLake/mimic-iii_wfdb/p06/p068116/p068116-2123-02-03-18-32.hea',
  'subject_id': '068116',
  'base_date': datetime.date(2123, 2, 3),
  'sig_n

#### Save result by Pickle (feat. gzip)

In [8]:
# save and compress
with gzip.open('/root/Workspace/DataWarehouse/mimic-iii_wfdb/10_extracted_patient_records.pickle.gzip', 'wb') as f:
    pickle.dump(results, f)