In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
pip install --upgrade polars

Collecting polars
  Downloading polars-1.7.1-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (14 kB)
Downloading polars-1.7.1-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (32.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m32.2/32.2 MB[0m [31m64.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: polars
  Attempting uninstall: polars
    Found existing installation: polars 1.6.0
    Uninstalling polars-1.6.0:
      Successfully uninstalled polars-1.6.0
Successfully installed polars-1.7.1


In [5]:
import os
import polars as pl
import gc


pl.Config.set_tbl_rows(30)# 출력할 행의 수를 설정 (예: 10)
pl.Config.set_tbl_cols(30)# 출력할 열의 수를 설정 (예: 30)
pl.Config.set_tbl_width_chars(1000)
# pl.Config.set_global_string("memory_limit", "45GB")  # 사용 가능한 메모리에 맞게 조정

In [6]:
def load_admission_balanced():
    # labevents 데이터 로드 (LazyFrame 사용)
    admissions = pl.scan_csv(os.path.join(hosp_path, 'admissions.csv.gz'), infer_schema_length=10000, ignore_errors=True)

    # 원본 labevents의 행 수 계산
    # original_row_count = admissions.select(pl.len()).collect()[0, 0]
    # print(f"Original admissions row count: {original_row_count}")

    '''
    subject_id: 환자 고유 식별자
    hadm_id: 입원 고유 식별자
    admittime: 입원 시간
    dischtime: 퇴원 시간
    deathtime: 사망 시간 (해당되는 경우)
    admission_type: 입원 유형 (예: 응급, 선택적 등)
    admission_location: 입원 장소
    discharge_location: 퇴원 장소
    insurance: 보험 유형
    language: 환자의 선호 언어
    marital_status: 결혼 상태
    ethnicity: 민족성
    edregtime: 응급실 등록 시간
    edouttime: 응급실 퇴실 시간
    hospital_expire_flag: 병원 내 사망 여부 플래그
    admit_provider_id: 입원 담당 의료진 ID (MIMIC-IV v2.2에서 추가됨)
    '''
    # Medicaid 수치를 기준으로 설정
    medicaid_count = 104229

    # Medicare, Private, 그리고 나머지 보험 데이터 분리
    medicare = admissions.filter(pl.col("insurance") == "Medicare")
    private = admissions.filter(pl.col("insurance") == "Private")
    other = admissions.filter(pl.col("insurance").is_in(["Medicaid", "Other", "No charge"]) | pl.col("insurance").is_null())

    # Medicare와 Private의 총 행 수 계산
    medicare_count = medicare.select(pl.len()).collect().item()
    private_count = private.select(pl.len()).collect().item()

    # Medicare와 Private에서 각각 medicaid_count만큼 샘플링
    medicare_sampled = medicare.collect().sample(n=medicaid_count)
    private_sampled = private.collect().sample(n=medicaid_count)

    # 모든 데이터 결합 (LazyFrame 상태 유지)
    result_medicare = pl.concat([medicare_sampled.lazy(), private_sampled.lazy(), other])

    # 'WHITE' 인종과 그 외 인종으로 분리
    white = result_medicare.filter(pl.col("race") == "WHITE")
    non_white = result_medicare.filter(pl.col("race") != "WHITE")

    # 'WHITE' 인종의 1/4 샘플링
    white_count = white.select(pl.count()).collect().item()
    sample_size = white_count // 4

    # LazyFrame을 DataFrame으로 변환 후 샘플링
    white_sampled = white.collect().sample(n=sample_size, seed=42)

    # 다운샘플링된 'WHITE' 인종과 다른 인종 결합
    admissions_downsampled = pl.concat([white_sampled.lazy(), non_white])

    # 선택 및 필터링 수행
    admissions_selected = admissions_downsampled.select([
        "subject_id", "hadm_id",  "admittime", "dischtime", "deathtime","admission_type",
        "admission_location","discharge_location",
        "insurance","language","marital_status","race",
        "edregtime","edouttime","hospital_expire_flag","admit_provider_id"
    ])

    return admissions_selected

In [7]:
def load_diagnoses_icd_AKI():
    # diagnoses_icd 데이터 로드 (LazyFrame 사용)
    diagnoses_icd = pl.scan_csv(os.path.join(hosp_path, 'diagnoses_icd.csv.gz'), infer_schema_length=10000, ignore_errors=True)

    # # 선택 수행 (diagnoses_icd 테이블의 실제 열들을 선택)
    # diagnoses_icd_selected = diagnoses_icd.select([
    #     "subject_id", "hadm_id", "seq_num", "icd_code", "icd_version"
    # ]).filter(pl.col("icd_code").str.starts_with("N17")).drop(["seq_num", "icd_version"])
    # 선택 수행 (diagnoses_icd 테이블의 실제 열들을 선택)
    diagnoses_icd_selected = diagnoses_icd.select([
        "subject_id", "hadm_id", "seq_num", "icd_code", "icd_version"
    ]).filter(
        pl.col("icd_code").str.starts_with("250") |
        pl.col("icd_code").str.starts_with("E10") |
        pl.col("icd_code").str.starts_with("E11") |
        pl.col("icd_code").str.starts_with("E13") |
        pl.col("icd_code").str.starts_with("N17")
    ).drop(["seq_num", "icd_version"])
    return diagnoses_icd_selected
'''
ICD-9 코드:
250.xx: 당뇨병의 다양한 유형과 합병증을 나타냅니다.
ICD-10 코드:
E10.xx: 제1형 당뇨병
E11.xx: 제2형 당뇨병
E13.xx: 기타 명시된 당뇨병
'''

'\nICD-9 코드:\n250.xx: 당뇨병의 다양한 유형과 합병증을 나타냅니다.\nICD-10 코드:\nE10.xx: 제1형 당뇨병\nE11.xx: 제2형 당뇨병\nE13.xx: 기타 명시된 당뇨병\n'

In [19]:
import os
import polars as pl
import gc

def load_mimic_data(icu_path, hosp_path, output_file, temp_dir):
    print('Loading all datasets...')
    gc.collect()

    # 1. labevents 처리
    parquet_path_labevents = os.path.join(temp_dir, '1-labevents_join.parquet')
    if not os.path.exists(parquet_path_labevents):
        labevents_path = os.path.join(hosp_path, 'labevents.csv')
        labevents = pl.scan_csv(labevents_path, infer_schema_length=10000, ignore_errors=True)

        # AKI 관련 itemid
        aki_itemids = [
            50912,  # 혈청 크레아티닌: AKI 진단의 주요 기준입니다. KDIGO 가이드라인에 따르면, 혈청 크레아티닌의 급격한 증가는 AKI를 나타냅니다.
            51080,  # BUN: 크레아티닌과 함께 신장 기능을 평가하는 데 사용됩니다. BUN/크레아티닌 비율은 AKI의 원인을 파악하는 데 도움이 될 수 있습니다.
            51006,  # 소변 비중: 신장의 농축 능력을 반영하며, AKI에서 변화할 수 있습니다.
        ]

        # Hypoalbuminemia, Anemia, Hyponatremia 관련 itemid
        additional_itemids = [
            50862,  # Albumin (Blood): Hypoalbuminemia 진단에 사용
            53085,  # Albumin (Blood): Hypoalbuminemia 진단에 사용 (다른 측정 방법)
            50811,  # Hemoglobin (Blood Gas): Anemia 진단에 사용
            51222,  # Hemoglobin (Hematology): Anemia 진단에 사용
            51640,  # Hemoglobin (Chemistry): Anemia 진단에 사용
            50824,  # Sodium, Whole Blood (Blood Gas): Hyponatremia 진단에 사용
            50983,  # Sodium (Blood): Hyponatremia 진단에 사용
            52623,  # Sodium (Blood): Hyponatremia 진단에 사용 (다른 측정 방법)
        ]

        # 모든 관심 있는 itemid를 하나의 리스트로 통합
        all_itemids = aki_itemids + additional_itemids

        outputevents = pl.scan_csv(os.path.join(icu_path, 'outputevents.csv'), infer_schema_length=10000, ignore_errors=True)
        aki_related_itemids = [
            226559,  # 소변량 (Urine Output)
            226560,  # 자연 배뇨량 (Void Output)
            226561,  # 카테터 소변량 (Foley Output)
            226584,  # 요루 배출량 (Ostomy Output)
        ]

        # 필요한 열만 선택하고 필터링
        filtered_labevents = labevents.select([
            "subject_id", "hadm_id", "itemid", "charttime", "valuenum", "valueuom"
        ]).filter(pl.col("itemid").is_in(all_itemids)).rename({"valuenum": "value_labevent"})



        filtered_outputevents = outputevents.select([
            "subject_id", "hadm_id", "stay_id", "charttime", "itemid", "value"
        ]).filter(pl.col("itemid").is_in(aki_related_itemids)).rename({"itemid": "itemid_urine","value": "value_urine"})

        labevent_columns = set(filtered_labevents.collect_schema().names())
        outputevent_columns = set(filtered_outputevents.collect_schema().names())

        # 공통 열 확인
        common_columns = labevent_columns & outputevent_columns
        print("Labevents columns:", filtered_labevents.collect_schema().names())
        print("Outputevents columns:", filtered_outputevents.collect_schema().names())
        print("common_columns:",common_columns)

        # full join (LazyFrame만 사용)
        # labevents_join = filtered_labevents.join(filtered_outputevents, on=["subject_id", "hadm_id","charttime"], how="full")
        labevents_join = filtered_labevents.join(filtered_outputevents, on=common_columns, how="left")

        # 병합된 데이터 저장
        labevents_join.collect(streaming=True).write_parquet(parquet_path_labevents)

        del labevents, outputevents, filtered_labevents, filtered_outputevents
        gc.collect()

    else:
        labevents_join = pl.scan_parquet(parquet_path_labevents)

    print(f"saving {parquet_path_labevents} finished")



    # 2. balaced_admission 처리
    parquet_path_admission_join = os.path.join(temp_dir, '2-admission_join.parquet')
    if not os.path.exists(parquet_path_admission_join):
        balaced_admission = load_admission_balanced()
        print("Join labevents_join and balaced_admission_join:")


        labevents_join_columns = set(labevents_join.collect_schema().names())
        balaced_admission_columns = set(balaced_admission.collect_schema().names())

        # 공통 열 확인
        common_columns = labevents_join_columns & balaced_admission_columns
        print("Labevents columns:", labevents_join.collect_schema().names())
        print("admission columns:", balaced_admission.collect_schema().names())
        print("common_columns:",common_columns)


        # full join (LazyFrame만 사용)
        labevents_join = labevents_join.join(balaced_admission, on=common_columns, how="left")

        # 병합된 데이터 저장
        labevents_join.collect(streaming=True).write_parquet(parquet_path_admission_join)
    else:
        labevents_join = pl.scan_parquet(parquet_path_admission_join)

    # balaced_admission = labevents_join
    # del balaced_admission
    # gc.collect()


    # 3. AKI_icd 처리
    parquet_path_AKI_icd_join = os.path.join(temp_dir, '3-parquet_path_AKI_icd_join.parquet')
    if not os.path.exists(parquet_path_AKI_icd_join):
        diagnoses_icd_AKI = load_diagnoses_icd_AKI()
        print("Join labevents_join and diagnoses_icd_AKI:")

        diagnoses_icd_AKI_columns = set(diagnoses_icd_AKI.collect_schema().names())
        labevents_join_columns = set(labevents_join.collect_schema().names())

        # 공통 열 확인
        common_columns = diagnoses_icd_AKI_columns & labevents_join_columns
        print("Labevents columns:", diagnoses_icd_AKI.collect_schema().names())
        print("admission columns:", labevents_join.collect_schema().names())
        print("common_columns:",common_columns)

        AKI_icd_join = labevents_join.join(diagnoses_icd_AKI, on=common_columns, how="left")
        AKI_icd_join.collect(streaming=True).write_parquet(parquet_path_AKI_icd_join)

        del diagnoses_icd_AKI
        gc.collect()
    else:
        AKI_icd_join = pl.scan_parquet(parquet_path_AKI_icd_join)



    # 4. chartevents 처리 (날짜 변환 없이 처리)
    parquet_path_AKI_icd_join=os.path.join(temp_dir,'4-chartevent_join.parquet')
    if not os.path.exists(parquet_path_AKI_icd_join):
        chartevents_path = os.path.join(icu_path, 'chartevents.csv')


        print("Join previous and chartevents:")
        chartevents = pl.scan_csv(chartevents_path, infer_schema_length=10000, ignore_errors=True)
        aki_chart_itemids = [
            220615,  # Creatinine (Blood) / 혈중 크레아티닌
            220624,  # BUN (Blood Urea Nitrogen) / 혈중 요소 질소
            226512,  # Urine Output / 소변 배출량
            227005,  # GCS - Verbal Response / GCS - 언어 반응
            226566,  # Foley Catheter / 폴리 카테터
        ]


        # charttime을 변환하지 않고 처리
        chartevents_selected = chartevents.select([
            'subject_id',
            'hadm_id',
            'charttime',  # 변환 없이 문자열로 처리
            'itemid',
            'value',
            'valuenum'
        ]).filter(pl.col("itemid").is_in(aki_chart_itemids))


        AKI_icd_join_columns = set(AKI_icd_join.collect_schema().names())
        chartevents_selected_columns = set(chartevents_selected.collect_schema().names())

        # 공통 열 확인
        common_columns = AKI_icd_join_columns & chartevents_selected_columns
        print("Labevents columns:", AKI_icd_join.collect_schema().names())
        print("admission columns:", chartevents_selected.collect_schema().names())
        print("common_columns:",common_columns)

        chartevent_join = AKI_icd_join.join(chartevents_selected, on=common_columns, how="left")

        # 결과 저장
        chartevent_join_path = os.path.join(temp_dir, '4-chartevent_join.parquet')
        print("Saving the chartevent join data...")
        chartevent_join.collect(streaming=True).write_parquet(chartevent_join_path)
        print(f"Saved chartevent join to {chartevent_join_path}.")

            # 메모리 해제
        del chartevents_selected, chartevents
        gc.collect()
    else:
        chartevent_join = pl.scan_parquet(parquet_path_AKI_icd_join)




    # 5. patients 테이블 합치기.
    parquet_path_patients = os.path.join(temp_dir, '5-parquet_path_patients.parquet')
    if not os.path.exists(parquet_path_patients):
        patients_AKI = pl.scan_csv(os.path.join(hosp_path, 'patients.csv.gz'), infer_schema_length=10000, ignore_errors=True)
        print("Join previous and patients:")

        patients_AKI_columns = set(patients_AKI.collect_schema().names())
        chartevent_join_columns = set(chartevent_join.collect_schema().names())

        # 공통 열 확인
        common_columns = patients_AKI_columns & chartevent_join_columns
        print("patients columns:", patients_AKI.collect_schema().names())
        print("chartevent columns:", chartevent_join.collect_schema().names())
        print("common_columns:",common_columns)

        patients_join = chartevent_join.join(patients_AKI, on=common_columns, how="left")
        patients_join.collect(streaming=True).write_parquet(parquet_path_patients)

        del chartevent_join
        gc.collect()
    else:
        patients_join = pl.scan_parquet(parquet_path_patients)


    # 6. procedures_icd 테이블 합치기.
    parquet_path_procedures_icd = os.path.join(temp_dir, '6-parquet_path_procedures_icd.parquet')
    if not os.path.exists(parquet_path_procedures_icd):
        procedures_icd = pl.scan_csv(os.path.join(hosp_path, 'procedures_icd.csv.gz'), infer_schema_length=10000, ignore_errors=True)
        print("Join previous and procedures_icd:")

        # 공통 열 확인
        procedures_icd_columns = set(procedures_icd.collect_schema().names())
        patients_join_columns = set(patients_join.collect_schema().names())

        common_columns = procedures_icd_columns & patients_join_columns
        print("procedures_icd columns:", procedures_icd.collect_schema().names())
        print("patients_join columns:", patients_join.collect_schema().names())
        print("common_columns:", common_columns)

        # LazyFrame join 수행
        procedures_icd_join = patients_join.join(procedures_icd, on=common_columns, how="left")

        # 병합된 결과 저장
        procedures_icd_join.collect(streaming=True).write_parquet(parquet_path_procedures_icd)
        print(f"Saved procedures_icd join to {parquet_path_procedures_icd}.")

        # 메모리 해제
        del patients_join, procedures_icd
        gc.collect()
    else:
        procedures_icd_join = pl.scan_parquet(parquet_path_procedures_icd)


    # 7. prescriptions 추가..
    parquet_path_prescriptions = os.path.join(temp_dir, '7-prescriptions.parquet')
    if not os.path.exists(parquet_path_prescriptions):  # 7번 parquet 파일 경로를 확인해야 합니다
        prescriptions = pl.scan_csv(os.path.join(hosp_path, 'prescriptions.csv'), infer_schema_length=100, ignore_errors=True)
        # 관심 있는 약물 리스트
        medications_of_interest = ["ARB", "Spironolactone", "Eplerenone"]
        # 필요한 열만 선택하고 필터링
        filtered_prescriptions = prescriptions.select([
            "subject_id", "hadm_id", "drug", "starttime", "stoptime"
        ]).filter(
            pl.col("drug").str.to_lowercase().str.contains(
                "|".join(med.lower() for med in medications_of_interest)
            )
        )

        print("Join previous and prescriptions:")

        # 공통 열 확인
        prescriptions_columns = set(filtered_prescriptions.collect_schema().names())
        procedures_icd_join_columns = set(procedures_icd_join.collect_schema().names())
        common_columns = prescriptions_columns & procedures_icd_join_columns

        print("prescriptions columns:", filtered_prescriptions.collect_schema().names())
        print("procedures_icd_join columns:", procedures_icd_join.collect_schema().names())
        print("common_columns:", common_columns)

        # LazyFrame join 수행
        prescriptions_join = procedures_icd_join.join(filtered_prescriptions, on=common_columns, how="left")
        prescriptions_join = prescriptions_join.drop([
            'value', 'valuenum', 'seq_num', 'chartdate', 'icd_version', 'anchor_year', 'anchor_year_group'
        ])
        # 병합된 결과 저장
        prescriptions_join.collect(streaming=True).write_parquet(parquet_path_prescriptions)  # prescriptions에 저장
        print(f"Saved prescriptions join to {parquet_path_prescriptions}.")

        # 메모리 해제
        del prescriptions
        gc.collect()
    else:
        prescriptions_join = pl.scan_parquet(parquet_path_prescriptions)

    prescriptions_join = pl.scan_parquet(parquet_path_prescriptions)


    # 8. procedureevents 읽기 procedures_icd.csv.gz, procedureevents.csv.gz
    parquet_path_procedureevents = os.path.join(temp_dir, '8-procedureevents.parquet')

    if not os.path.exists(parquet_path_procedureevents):  # 8번 parquet 파일 경로를 확인해야 합니다
        # procedureevents 데이터 읽기
        procedureevents = pl.scan_csv(os.path.join(icu_path, 'procedureevents.csv'), infer_schema_length=100, ignore_errors=True)
        #['subject_id', 'hadm_id', 'stay_id', 'caregiver_id', 'starttime', 'endtime', 'storetime', 'itemid', 'value', 'valueuom',
        # 'location', 'locationcategory', 'orderid', 'linkorderid', 'ordercategoryname', 'ordercategorydescription', 'patientweight',
        # 'isopenbag', 'continueinnextdept', 'statusdescription', 'originalamount', 'originalrate']


        # 메모리 정리
        gc.collect()

        # 공통 열 확인
        prescriptions_join_columns = set(prescriptions_join.collect_schema().names())
        procedureevents_columns = set(procedureevents.collect_schema().names())
        common_columns = prescriptions_join_columns & procedureevents_columns

        print("Join previous and procedureevents:")

        # LazyFrame join 수행
        prescriptions_join_ = procedureevents.join(prescriptions_join, on=common_columns, how="left")

        # 병합된 결과 저장
        prescriptions_join_.collect(streaming=True).write_parquet(parquet_path_procedureevents)
        print(f"Saved procedureevents join to {parquet_path_procedureevents}.")

        # 메모리 해제
        del procedureevents, prescriptions_join, prescriptions_join_
        gc.collect()

    else:
        # 병합된 parquet 파일 읽기
        prescriptions_join = pl.scan_parquet(parquet_path_procedureevents)



if __name__ == "__main__":
    base_path = '/content/drive/MyDrive/AKI_공유/3.0'
    hosp_path = os.path.join(base_path, 'hosp')
    icu_path = os.path.join(base_path, 'icu')
    output_file = "/content/drive/MyDrive/AKI_공유/mimic_data_output_240920.csv"
    temp_dir = '/content/drive/MyDrive/AKI_공유/3.0/full_join'

    from time import time
    start_time = time()
    load_mimic_data(icu_path, hosp_path, output_file, temp_dir)
    print(f"Total execution time: {time() - start_time} seconds")


Loading all datasets...
saving /content/drive/MyDrive/AKI_공유/3.0/full_join/1-labevents_join.parquet finished
Join previous and procedureevents:
Saved procedureevents join to /content/drive/MyDrive/AKI_공유/3.0/full_join/8-procedureevents.parquet.
Total execution time: 8.997939109802246 seconds


In [13]:
parquet_path_prescriptions = os.path.join(temp_dir, '7-prescriptions.parquet')
# procedureevents 데이터 스캔
prescriptions_join = pl.scan_parquet(parquet_path_prescriptions)
# 열 이름 출력
print(prescriptions_join.columns)


['subject_id', 'hadm_id', 'itemid', 'charttime', 'value_labevent', 'valueuom', 'stay_id', 'itemid_urine', 'value_urine', 'admittime', 'dischtime', 'deathtime', 'admission_type', 'admission_location', 'discharge_location', 'insurance', 'language', 'marital_status', 'race', 'edregtime', 'edouttime', 'hospital_expire_flag', 'admit_provider_id', 'icd_code', 'value', 'valuenum', 'gender', 'anchor_age', 'anchor_year', 'anchor_year_group', 'dod', 'seq_num', 'chartdate', 'icd_version', 'drug', 'starttime', 'stoptime']


  print(prescriptions_join.columns)


subject_id: 환자 고유 식별자
hadm_id: 입원 고유 식별자
admittime: 입원 시간
dischtime: 퇴원 시간
deathtime: 사망 시간 (해당되는 경우)
admission_type: 입원 유형 (예: 응급, 선택적 등)
admission_location: 입원 장소
discharge_location: 퇴원 장소
insurance: 보험 유형
language: 환자의 선호 언어
marital_status: 결혼 상태
ethnicity: 민족성
edregtime: 응급실 등록 시간
edouttime: 응급실 퇴실 시간
hospital_expire_flag: 병원 내 사망 여부 플래그
admit_provider_id: 입원 담당 의료진 ID (MIMIC-IV v2.2에서 추가됨)

In [6]:
import polars as pl
import os
base_path = '/content/drive/MyDrive/AKI_공유/3.0'
hosp_path = os.path.join(base_path, 'hosp')
# prescriptions 테이블 로드 (파일 경로는 실제 환경에 맞게 수정해야 합니다)
prescriptions = pl.scan_csv(os.path.join(hosp_path, 'prescriptions.csv') ) # 또는 pl.scan_csv() 사용

# 관심 있는 약물 리스트
medications_of_interest = ["ACEI", "ARB", "Spironolactone", "Eplerenone"]

# drug 컬럼의 고유 값 확인 (상위 20개)
unique_drugs = prescriptions.select(pl.col("drug").unique().alias("unique_drugs")).collect()
print("Drug 컬럼의 고유 값 (상위 20개):")
print(unique_drugs.head(20))

# 관심 있는 약물 검색
for med in medications_of_interest:
    count = prescriptions.filter(pl.col("drug").str.to_lowercase().str.contains(med.lower())).select(pl.count()).collect()
    print(f"{med}를 포함하는 행의 수: {count[0, 0]}")

# 전체 약물 목록 중 관심 있는 약물과 유사한 이름 찾기
similar_drugs = prescriptions.select(
    pl.col("drug").filter(
        pl.col("drug").str.to_lowercase().str.contains("|".join(med.lower() for med in medications_of_interest))
    ).unique()
).collect()
# 출력할 행의 수를 설정 (예: 10)
pl.Config.set_tbl_rows(100)
# 출력할 열의 수를 설정 (예: 30)
pl.Config.set_tbl_cols(30)
pl.Config.set_tbl_width_chars(1000)
print("\n관심 있는 약물과 유사한 이름:")
print(similar_drugs)

Drug 컬럼의 고유 값 (상위 20개):
shape: (20, 1)
┌────────────────────────────┐
│ unique_drugs               │
│ ---                        │
│ str                        │
╞════════════════════════════╡
│ acyclovir                  │
│ Mesalamine Delayed-Release │
│ Esomep                     │
│ zidovuDINE                 │
│ Symbicort 160/4.5 mcg      │
│ Relpax                     │
│ nimodipine                 │
│ INV-LCZ696                 │
│ Imiquimod                  │
│ Exelon  9.5mg/24hr         │
│ Metadate CD                │
│ ixazomib                   │
│ belumosudil                │
│ Chloroprocaine 2%          │
│ Isentress                  │
│ Creon 24,000               │
│ Fiber                      │
│ rosuvastatin               │
│ tamsuLOSIN                 │
│ Tretinoin 0.025% Gel       │
└────────────────────────────┘


  count = prescriptions.filter(pl.col("drug").str.to_lowercase().str.contains(med.lower())).select(pl.count()).collect()


ACEI를 포함하는 행의 수: 0
ARB를 포함하는 행의 수: 204804
Spironolactone를 포함하는 행의 수: 34979
Eplerenone를 포함하는 행의 수: 1294


In [None]:
import polars as pl
import os
base_path = '/content/drive/MyDrive/AKI_공유/3.0'
hosp_path = os.path.join(base_path, 'hosp')

labevents_path = os.path.join(hosp_path, 'labevents.csv') # 또는 .parquet 파일

# LazyFrame으로 labevents 읽기
labevents = pl.scan_csv(labevents_path)  # 또는 pl.scan_parquet() 사용

# 테이블 구조 확인
print("Labevents 테이블 구조:")
print(labevents.columns)

# 관심 있는 검사 항목들의 키워드
keywords = [
    "albumin", "alb",  # 혈청 알부민
    "hemoglobin", "hgb", "hb",  # 빈혈 관련
    "sodium", "na"  # 나트륨
]

# itemid와 검사 이름을 포함하는 컬럼 선택 (실제 컬럼 이름으로 수정 필요)
unique_items = (
    labevents
    .select(["itemid", "label"])  # 'label' 대신 실제 컬럼 이름 사용
    .unique()
    .collect()
)

# 키워드를 포함하는 항목 필터링
filtered_items = unique_items.filter(
    pl.col("label").str.to_lowercase().str.contains("|".join(keywords))
)

# 결과 출력
print("\n관련 가능성이 있는 검사 항목들:")
print(filtered_items)

# 각 키워드별로 가장 빈번한 itemid 찾기
for keyword in keywords:
    most_common = (
        labevents
        .filter(pl.col("label").str.to_lowercase().str.contains(keyword))
        .groupby("itemid")
        .agg(pl.count().alias("count"))
        .sort("count", descending=True)
        .limit(1)
        .collect()
    )
    if not most_common.is_empty():
        print(f"\n'{keyword}' 관련 가장 빈번한 itemid:")
        print(most_common)

In [11]:
import polars as pl
import os
base_path = '/content/drive/MyDrive/AKI_공유/3.0'
hosp_path = os.path.join(base_path, 'hosp')

d_labitems_path = os.path.join(hosp_path, 'd_labitems.csv.gz') # 또는 .parquet 파일

# LazyFrame으로 d_labitems 읽기
d_labitems = pl.scan_csv(d_labitems_path)  # 또는 pl.scan_parquet() 사용

# 관심 있는 검사 항목들의 키워드
keywords = [
    "albumin", "alb",  # 혈청 알부민
    "hemoglobin", "hgb", "hb",  # 빈혈 관련
    "sodium", "na"  # 나트륨
]

# 키워드를 포함하는 항목 필터링
filtered_items = (
    d_labitems
    .filter(
        pl.col("label").str.to_lowercase().str.contains("|".join(keywords)) |
        pl.col("fluid").str.to_lowercase().str.contains("|".join(keywords)) |
        pl.col("category").str.to_lowercase().str.contains("|".join(keywords))
    )
    .select(["itemid", "label", "fluid", "category"])
    .collect()
)

# 결과 출력
print("관련 가능성이 있는 검사 항목들:")
print(filtered_items)

# 각 키워드별로 관련 itemid 찾기
for keyword in keywords:
    related_items = (
        d_labitems
        .filter(
            pl.col("label").str.to_lowercase().str.contains(keyword) |
            pl.col("fluid").str.to_lowercase().str.contains(keyword) |
            pl.col("category").str.to_lowercase().str.contains(keyword)
        )
        .select(["itemid", "label", "fluid", "category"])
        .collect()
    )
    if not related_items.is_empty():
        print(f"\n'{keyword}' 관련 itemid:")
        print(related_items)

관련 가능성이 있는 검사 항목들:
shape: (228, 4)
┌────────┬─────────────────────────────────┬─────────────────────┬────────────┐
│ itemid ┆ label                           ┆ fluid               ┆ category   │
│ ---    ┆ ---                             ┆ ---                 ┆ ---        │
│ i64    ┆ str                             ┆ str                 ┆ str        │
╞════════╪═════════════════════════════════╪═════════════════════╪════════════╡
│ 50803  ┆ Calculated Bicarbonate, Whole … ┆ Blood               ┆ Blood Gas  │
│ 50805  ┆ Carboxyhemoglobin               ┆ Blood               ┆ Blood Gas  │
│ 50811  ┆ Hemoglobin                      ┆ Blood               ┆ Blood Gas  │
│ 50814  ┆ Methemoglobin                   ┆ Blood               ┆ Blood Gas  │
│ 50824  ┆ Sodium, Whole Blood             ┆ Blood               ┆ Blood Gas  │
│ 50834  ┆ Sodium, Body Fluid              ┆ Other Body Fluid    ┆ Blood Gas  │
│ 50835  ┆ Albumin, Ascites                ┆ Ascites             ┆ Chemistry  │
│ 508

검색 결과를 바탕으로 Hypoalbuminemia, Anemia, Hyponatremia와 관련된 항목들을 선별해 드리겠습니다.
Hypoalbuminemia (저알부민혈증) 관련:
itemid: 50862 (Albumin, Blood)
itemid: 53085 (Albumin, Blood)
참고: 혈청 알부민 수치가 낮을 때 Hypoalbuminemia로 진단됩니다.
Anemia (빈혈) 관련:
itemid: 50811 (Hemoglobin, Blood)
itemid: 51222 (Hemoglobin, Blood)
itemid: 51640 (Hemoglobin, Blood)
참고: 헤모글로빈 수치가 낮을 때 Anemia로 진단됩니다.
Hyponatremia (저나트륨혈증) 관련:
itemid: 50824 (Sodium, Whole Blood)
itemid: 50983 (Sodium, Blood)
itemid: 52623 (Sodium, Blood)
참고: 혈중 나트륨 수치가 낮을 때 Hyponatremia로 진단됩니다.
이 itemid들은 각각 혈액 내 알부민, 헤모글로빈, 나트륨 수치를 측정하는 검사들입니다. 이 검사 결과들을 통해 Hypoalbuminemia, Anemia, Hyponatremia의 여부를 판단할 수 있습니다.
주의할 점은 같은 검사항목에 대해 여러 itemid가 존재할 수 있다는 것입니다. 이는 검사 방법이나 장비의 차이, 또는 데이터베이스의 구조적 특성 때문일 수 있습니다. 실제 분석 시에는 이 중 가장 적절한 itemid를 선택하거나, 모든 관련 itemid의 데이터를 종합적으로 고려해야 할 수 있습니다.