subject list를 먼저 추출한 뒤, 삽관/발관(intubation/extubation) 데이터 추출

In [17]:
import psycopg2
from dfply import *
import pandas as pd
import logging
import sys
import os
from pathlib import Path
from sshtunnel import SSHTunnelForwarder
from sqlalchemy import create_engine

# Add the resolved '../src' directory (relative to the current working
# directory) to sys.path so project modules can be imported from the notebook.
# sys.path stores plain strings, so the membership test must use
# str(module_path): comparing the Path object itself never matches and the
# entry would be appended again every time this cell is re-run.
module_path = Path('../src').resolve()
if str(module_path) not in sys.path:
    sys.path.append(str(module_path))

# Logging config: INFO level, timestamped messages.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 소스코드(src)
import src.data_extraction.access_database as db
import src.data_extraction.filter_adult_patients as fap
import src.data_extraction.filter_ventilation_events as fve
from src.utils import utils


### 1. DB 연결

#### 1-1. 각종 파라미터 설정

#### 1-2. DB 연결, 데이터 추출

In [18]:
# Output directory for the saved CSV files
output_dir = './data'


# Database connection settings, gathered in one dict and passed as keyword
# arguments to psycopg2.connect().
# NOTE(security): these credentials are committed in plain text. Each value can
# now be overridden with an environment variable (MIMIC_DB_NAME, MIMIC_DB_USER,
# MIMIC_DB_PASSWORD, MIMIC_DB_HOST, MIMIC_DB_PORT). Prefer setting them there and
# removing the literal defaults (especially the password) from the notebook.
db_config = {
    'database': os.environ.get('MIMIC_DB_NAME', 'mimiciv'),
    'user': os.environ.get('MIMIC_DB_USER', 'mai_onlyselect'),
    'password': os.environ.get('MIMIC_DB_PASSWORD', 'student1q2w!@'),
    'host': os.environ.get('MIMIC_DB_HOST', '1.212.63.162'),
    'port': os.environ.get('MIMIC_DB_PORT', '35430'),
}

# Ventilation item IDs as comma-separated strings. They are spliced directly
# into the SQL `IN (...)` clauses below, so they must remain trusted constants
# (never user input).
vent_ids_config = {
    'INTUBATION_ITEM_IDS': "224385",
    'EXTUBATION_ITEM_IDS': "225468, 225477, 227194"
}

# Table queries: result key -> SQL statement used to load that table.
tables_query = {
    'pg_tables': 'SELECT * FROM PG_TABLES;',
    'patients': 'SELECT * FROM mimiciv_hosp.patients;',
    'admissions': 'SELECT * FROM mimiciv_hosp.admissions;',
    'transfers': 'SELECT * FROM mimiciv_hosp.transfers;',
    'icustays': 'SELECT * FROM mimiciv_icu.icustays;',
    'd_items': 'SELECT * FROM mimiciv_icu.d_items;',
    'intubation': f"SELECT * FROM mimiciv_icu.procedureevents WHERE itemid IN ({vent_ids_config['INTUBATION_ITEM_IDS']});",
    'extubation': f"SELECT * FROM mimiciv_icu.procedureevents WHERE itemid IN ({vent_ids_config['EXTUBATION_ITEM_IDS']});",
    'ventilation': 'SELECT * FROM mimiciv_derived.ventilation;'
}

# Database connection helper
def connect_to_database(config):
    """Open a PostgreSQL connection with psycopg2.

    Args:
        config: mapping passed as keyword arguments to ``psycopg2.connect``
            (this notebook passes database, user, password, host, port).

    Returns:
        The open connection on success. If connecting raises any exception,
        the error is printed and ``None`` is returned; callers must check
        for ``None``.
    """
    try:
        conn = psycopg2.connect(**config)
    except Exception as error:  # psycopg2.Error already derives from Exception
        print("Error while connecting to the database:", error)
        return None
    print("Connected to the database successfully!")
    return conn

# Connect, load every table listed in tables_query, and report the server version.
conn = connect_to_database(db_config)

if conn is None:
    print("Failed to connect to the database.")
else:
    cur = conn.cursor()

    # Result is indexed by the tables_query keys (e.g. dataframes['patients']).
    dataframes = db.retrieve_data(conn, tables_query)

    # Sanity check: query the PostgreSQL server version.
    cur.execute("SELECT version();")
    version = cur.fetchone()
    print("PostgreSQL version:", version)

    # The cursor and connection are deliberately left open for later cells;
    # call cur.close() and conn.close() once they are no longer needed.

2024-10-03 13:48:20,224 - INFO - Retrieved pg_tables: (164, 8)


Connected to the database successfully!


2024-10-03 13:48:22,206 - INFO - Retrieved patients: (299712, 6)
2024-10-03 13:48:33,979 - INFO - Retrieved admissions: (431231, 16)
2024-10-03 13:49:00,875 - INFO - Retrieved transfers: (1890972, 7)
2024-10-03 13:49:02,638 - INFO - Retrieved icustays: (73181, 8)
2024-10-03 13:49:02,713 - INFO - Retrieved d_items: (4014, 9)
2024-10-03 13:49:03,047 - INFO - Retrieved intubation: (8488, 22)
2024-10-03 13:49:03,809 - INFO - Retrieved extubation: (23122, 22)
2024-10-03 13:49:04,990 - INFO - Retrieved ventilation: (109200, 4)


PostgreSQL version: ('PostgreSQL 15.7 (Ubuntu 15.7-1.pgdg22.04+1) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0, 64-bit',)


In [19]:
dataframes.keys()   # Show which tables were loaded into memory

dict_keys(['pg_tables', 'patients', 'admissions', 'transfers', 'icustays', 'd_items', 'intubation', 'extubation', 'ventilation'])

In [20]:
# Pull the tables used below out of the retrieved-data dict.
(patients, admissions, intubation_all,
 extubation_all, icustays, ventilation) = (
    dataframes[key]
    for key in ('patients', 'admissions', 'intubation',
                'extubation', 'icustays', 'ventilation')
)

# Report each table's (rows, columns), one per line.
print('\n'.join(
    f'{label}: {frame.shape}'
    for label, frame in (
        ('patients', patients),
        ('admissions', admissions),
        ('intubation_all', intubation_all),
        ('extubation_all', extubation_all),
        ('icustays', icustays),
        ('ventilation', ventilation),
    )
))

patients: (299712, 6)
admissions: (431231, 16)
intubation_all: (8488, 22)
extubation_all: (23122, 22)
icustays: (73181, 8)
ventilation: (109200, 4)


### 2. 데이터 처리
- 2.1. 환자 정보 필터링
- 2.2. 인공호흡(삽관/발관) 데이터 정제

#### 2.1. 환자 정보 필터링
- 성인 환자 (anchor_age >= 18) 중에서

    - 병원 입원(hadm_id) 정보가 있는 환자를 남기고, 그 중에서

        - 중환자실(ICU) 입원(stay_id) 정보가 있는 환자만 남김

In [21]:
## Data processing: adult ICU patient cohort (filter_adult_patients)
# Inclusion/exclusion criteria, applied in sequence:
#   adults (age >= 18)
#   -> joined with hospital admissions, rows without hadm_id dropped
#   -> joined with ICU stays, rows without stay_id dropped
adults_pat = fap.filter_adult_patients(patients)
adults_hadm = fap.remove_missing_hadm(
    fap.merge_patient_admissions(adults_pat, admissions)
)
adults_icu = fap.remove_missing_icu_stays(
    fap.merge_with_icu(adults_hadm, icustays)
)

print(f'Processed adults_icu: {adults_icu.shape}')
print(adults_icu.columns)

Number of adult patients retrieved: 299712
Processed adults_icu: (73181, 13)
Index(['subject_id', 'gender', 'anchor_age', 'hadm_id', 'admittime',
       'dischtime', 'deathtime', 'stay_id', 'first_careunit', 'last_careunit',
       'intime', 'outtime', 'los'],
      dtype='object')


In [22]:
# Summary: number of distinct patients left after each filtering stage.
adult_hadm_count = adults_hadm['subject_id'].nunique()
adults_icu_count = adults_icu['subject_id'].nunique()

print(
    f'adult patients with hospital admission history: {adult_hadm_count}\n'
    f'adult patients with ICU history: {adults_icu_count}'
)

adult patients with hospital admission history: 180733
adult patients with ICU history: 50920


#### 2.2. 삽관/발관 데이터 정제
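- 먼저 삽관 테이블과 발관 테이블을 각각 따로 처리 (함수: filter_and_label_ventilation_data, filter_close_events)
    - 필요한 칼럼만 선택: "subject_id", "hadm_id", "stay_id", "starttime", "itemid", "patientweight"
    - (발관 테이블) 발관 원인(extubation cause) 라벨 추가
    - 시간적으로 근접한 중복 이벤트 행 제거 (additional_config 변수의 'TIME_DIFF_DUP' 파라미터 값 참조)
        - ※ 아래 코드에서는 filter_close_events를 직접 호출하지 않으며, additional_config도 이 노트북에 정의되어 있지 않음. 실제 적용 여부 확인 필요 (출력상 intubation_data / extubation_data의 행 수는 원본과 동일함)
- 삽관/발관 테이블 결합 (함수: join_ventilation_and_rename, join_admissions)
    - 중복되는 칼럼 이름 변경 (예: stay_id → int_stayid / ext_stayid)
    - 입원(admissions) 정보와 결합 (admittime, dischtime, deathtime 추가)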

In [23]:
## Data processing: intubation/extubation events (filter_ventilation_events)
# Select and label each event table separately.
intubation_data = fve.filter_and_label_ventilation_data(
    intubation_all, 'intubationtime', 'intubation'
)
extubation_data = fve.filter_and_label_ventilation_data(
    extubation_all, 'extubationtime', 'extubation'
)

# Pair intubation with extubation events, then attach admission information.
intubation_extubation = fve.join_admissions(
    fve.join_ventilation_and_rename(intubation_data, extubation_data),
    admissions,
)

print(f'Processed intubation_extubation: {intubation_extubation.shape}')
print(intubation_extubation.columns)

print(f'intubation_data: {intubation_data.shape}')
print(f'extubation_data: {extubation_data.shape}')

Processed intubation_extubation: (11528, 14)
Index(['subject_id', 'hadm_id', 'int_stayid', 'admittime', 'intubationtime',
       'int_itemid', 'int_weight', 'ext_stayid', 'extubationtime',
       'ext_itemid', 'ext_weight', 'extubationcause', 'dischtime',
       'deathtime'],
      dtype='object')
intubation_data: (8488, 6)
extubation_data: (23122, 7)


### 데이터 저장

In [24]:
# Save the filtered cohort and the intubation/extubation table.
# makedirs(exist_ok=True) creates output_dir when missing and is a no-op when it
# already exists. This replaces the separate exists()/makedirs() check, which
# could race between the test and the creation.
os.makedirs(output_dir, exist_ok=True)

utils.save_filtered_data(adults_icu, intubation_extubation, output_dir, outputs='all')

Data extraction and processing complete. Files saved.
Data extraction and processing complete. Files saved.


In [25]:
# Also dump the intermediate tables with a "_before_unable" suffix. Presumably
# this is a snapshot taken before a later "unable" filtering step (TODO confirm).
# Paths are built from output_dir rather than a hard-coded './data', so these
# files land next to the other outputs whenever output_dir changes.
intubation_data.to_csv(os.path.join(output_dir, 'intubation_data_before_unable.csv'), index=True)
extubation_data.to_csv(os.path.join(output_dir, 'extubation_data_before_unable.csv'), index=True)
intubation_extubation.to_csv(os.path.join(output_dir, 'intubation_extubation_before_unable.csv'), index=True)