In [0]:
spark.conf.set("fs.azure.account.auth.type.ise534projectadls.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.ise534projectadls.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.ise534projectadls.dfs.core.windows.net", "sv=2022-11-02&ss=bfqt&srt=sco&sp=rltf&se=2025-05-14T11:56:58Z&st=2025-03-13T03:56:58Z&spr=https&sig=8x2cbB8cen%2FHK88g95qU6gGKl%2FrYQneC7lGscFLjR8I%3D")

In [0]:
%python
%pip install requests

import requests

url = "https://physionet.org/files/mimiciv/3.1/"
response = requests.get(url)
if response.status_code == 200:
    display(response.content)
else:
    print(f"Failed to retrieve content. Status code: {response.status_code}")

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
Failed to retrieve content. Status code: 403


In [0]:
lab_df = spark.read.parquet("abfss://ise534-project@ise534projectadls.dfs.core.windows.net/mimiciv_3_1_hosp/labevents")

In [0]:
inputevents_df = spark.read.parquet("abfss://ise534-project@ise534projectadls.dfs.core.windows.net/mimiciv_3_1_icu/inputevents")

In [0]:
outputevents_df = spark.read.parquet("abfss://ise534-project@ise534projectadls.dfs.core.windows.net/mimiciv_3_1_icu/outputevents")

In [0]:
item_df = spark.read.parquet("abfss://ise534-project@ise534projectadls.dfs.core.windows.net/mimiciv_3_1_icu/d_items")

In [0]:
icu_df = spark.read.parquet("abfss://ise534-project@ise534projectadls.dfs.core.windows.net/mimiciv_3_1_icu/icustays")

In [0]:
diagnoses_df = spark.read.parquet("abfss://ise534-project@ise534projectadls.dfs.core.windows.net/mimiciv_3_1_hosp/diagnoses_icd/")

In [0]:
icd_df = spark.read.parquet("abfss://ise534-project@ise534projectadls.dfs.core.windows.net/mimiciv_3_1_hosp/d_icd_diagnoses/")

In [0]:
patients_df = spark.read.parquet("abfss://ise534-project@ise534projectadls.dfs.core.windows.net/mimiciv_3_1_hosp/patients/")

In [0]:
chartevents_df = spark.read.parquet("abfss://ise534-project@ise534projectadls.dfs.core.windows.net/mimiciv_3_1_icu/chartevents/")

In [0]:
admissions_df = spark.read.parquet("abfss://ise534-project@ise534projectadls.dfs.core.windows.net/mimiciv_3_1_hosp/admissions/")

In [0]:
from pyspark.sql.functions import col, year, unix_timestamp, row_number
from pyspark.sql.window import Window

ards_codes = ['J80', 'J9600', 'J9601', 'J9602', 'J961']

ards_df = diagnoses_df.filter(col("icd_code").isin(ards_codes)).select("subject_id").distinct()

row_count = ards_df.count()
print(f"Total number of rows: {row_count}")

Total number of rows: 13308


In [0]:

# Add row number partitioned by subject, ordered by ICU intime
window = Window.partitionBy("subject_id").orderBy("intime")
icu_df = icu_df.withColumn("icu_row", row_number().over(window))

# Filter for only first ICU stay
icu_first_df = icu_df.filter(col("icu_row") == 1)

In [0]:
icu_filtered_ards_df = icu_first_df.join(ards_df, on=["subject_id"], how="inner")

In [0]:
row_count = icu_filtered_ards_df.count()
print(f"Total number of rows: {row_count}")

Total number of rows: 10742


In [0]:
sepsis_codes = [
    '67020', '67022', '67024', '77181', '99591', '99592', 'A021', 'A227', 'A267', 'A327', 'A40', 'A400', 'A401',
    'A403', 'A408', 'A409', 'A41', 'A410', 'A4101', 'A4102', 'A411', 'A412', 'A413', 'A414', 'A415', 'A4150', 'A4151',
    'A4152', 'A4153', 'A4159', 'A418', 'A4181', 'A4189', 'A419', 'A427', 'A5486', 'B377', 'O0337', 'O0387', 'O0487',
    'O0737', 'O0882', 'O85', 'O8604', 'P36', 'P360', 'P361', 'P3610', 'P3619', 'P362', 'P363', 'P3630', 'P3639',
    'P364', 'P365', 'P368', 'P369', 'R652', 'R6520', 'R6521', 'T8144', 'T8144XA', 'T8144XD', 'T8144XS'
]

sepsis_df = diagnoses_df.filter(col("icd_code").isin(sepsis_codes)).select("subject_id").distinct()

row_count = sepsis_df.count()
print(f"Total number of rows: {row_count}")

Total number of rows: 17713


In [0]:
icu_filtered_sepsis_df = icu_first_df.join(sepsis_df, on=["subject_id"], how="inner")

row_count = icu_filtered_sepsis_df.count()
print(f"Total number of rows: {row_count}")

Total number of rows: 13481


In [0]:
sepsis_ards_df = icu_filtered_ards_df.join(icu_filtered_sepsis_df, on=["subject_id"], how="inner").distinct()

row_count = sepsis_ards_df.count()
print(f"Total number of rows: {row_count}")

Total number of rows: 4698


In [0]:
from pyspark.sql.functions import col, year

# Join patients data to ICU data
icu_with_age_df = icu_df.join(
    patients_df.select("subject_id", "anchor_age", "anchor_year"),
    on="subject_id", how="inner"
)

# Calculate real age at ICU admission
icu_with_real_age_df = icu_with_age_df.withColumn(
    "real_age", col("anchor_age") + (year(col("intime")) - col("anchor_year"))
)

# Filter patients who are 18 years or older
icu_adults_df = icu_with_real_age_df.filter(col("real_age") >= 18)


In [0]:
sepsis_ards_adult_df = sepsis_ards_df.join(
    icu_adults_df.select("subject_id").dropDuplicates(),
    on="subject_id",
    how="inner"
)

row_count = sepsis_ards_adult_df.count()
print(f"Total number of rows: {row_count}")

Total number of rows: 4698


In [0]:
from pyspark.sql.functions import col, unix_timestamp

# Calculate ICU length of stay in hours
icu_stay_hours_df = icu_df.withColumn(
    "icu_los_hours", (unix_timestamp("outtime") - unix_timestamp("intime")) / 3600
)

# Filter patients with ICU LOS > 24 hours
icu_los_over_24_df = icu_stay_hours_df.filter(col("icu_los_hours") > 24)


In [0]:
icu_final_df = sepsis_ards_adult_df.join(
    icu_los_over_24_df.select("subject_id").dropDuplicates(),
    on="subject_id",
    how="inner"
)

row_count = icu_final_df.count()
print(f"Total number of rows: {row_count}")

Total number of rows: 4379


In [0]:
# from pyspark.sql import functions as F
# from pyspark.sql.window import Window
# icd_df.filter(F.lower(F.col("long_title")).like("%blood ciltures%")).show(truncate=False)

In [0]:
# from pyspark.sql import functions as F
# from pyspark.sql.window import Window
# item_df.filter(F.lower(F.col("label")).like("%blood ciltures%")).show(truncate=False)

In [0]:
# from pyspark.sql import functions as F

# Input the itemid you're looking for
# input_itemid = 229358 

# Filter the DataFrame by 'itemid' and select the 'label' column
# matching_label_df = item_df.filter(F.col("itemid") == input_itemid).select("label", "unitname")

# Show the result
# matching_label_df.show(truncate=False)


In [0]:
# del icu_final_df

In [0]:
# Drop unnecessary columns
columns_to_drop = [
    "hadm_id", "stay_id", "first_careunit", "last_careunit", "icu_row", "anchor_age", "los",
    "intime", "outtime"
]

icu_final_df = icu_final_df.drop(*columns_to_drop)

# Show updated schema
icu_final_df.printSchema()

root
 |-- subject_id: long (nullable = true)



In [0]:
icu_final_df.dtypes

[('subject_id', 'bigint')]

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# -----------------------------------------------
# Define Item IDs for the Required Features (MIMIC-IV)
# -----------------------------------------------
feature_item_ids = {
    "heart_rate": 220045,
    "respiratory_rate": 220210,
    "temperature": 223761,
    "spo2": 220277,
    "paco2": 223769,
    "pao2": 220224,
    "fio2": "fio2",
    "absolute_neutrophil_count": 229355,
    "absolute_lymphocytes_count": 229358,
    "systolic_arterial_blood_pressure": 220050,
    "sdiastolic_arterial_blood_pressure": 220051,
    "peep": 220339,
    "tidal_volumne": 224684,
    "mechanical_ventilation": 225792,
    "gcs_eye": 220739,
    "gcs_verbal": 223900,
    "gcs_motor": 223901,
    "platelet_count": 227457,
    "ph": 223830,
    "crp": 227444,
    "serum_creatinine": 220615,
    "lactate": 225668,
    "bun": 225624,
    "glucose": 220621,
    "sodium": 220645,
    "wbc": 220546,
    "hb": 220228,
    "potassium": 227442,
    "albumin": 227456,
    "bilirubin": 225690,
    "hematocrit": 220545,
    "hemodialysis": 225441,
    "hemoglobin": 220228,
    "map": 220052,
    "ast": 220587,
    "alt": 220644,
    "inr": 227467 
}

vital_signs = [
    "heart_rate", "respiratory_rate", "temperature", "spo2", 
    "systolic_arterial_blood_pressure", "sdiastolic_arterial_blood_pressure", "map"
]

# -----------------------------------------------
# Feature Extractor (Avg for vital signs within 24 hours, latest for others)
# -----------------------------------------------
def extract_feature(item_id, feature_name):
    if item_id == "fio2":
        fio2_item_ids = [229280, 227010, 229841, 226754]
        df = (
            chartevents_df
            .filter(F.col("itemid").isin(fio2_item_ids))
            .select("subject_id", "charttime", "valuenum")
            .withColumnRenamed("valuenum", feature_name)
        )
    else:
        df = (
            chartevents_df
            .filter(F.col("itemid") == item_id)
            .select("subject_id", "charttime", "valuenum")
            .withColumnRenamed("valuenum", feature_name)
        )

    if feature_name in vital_signs:
        # Join with ICU admission time to filter first 24 hours
        icu_times = icu_df.select("subject_id", "intime").dropDuplicates()
        df = df.join(icu_times, on="subject_id", how="inner")

        df = df.filter(
            (F.col("charttime") >= F.col("intime")) &
            (F.col("charttime") <= F.col("intime") + F.expr("INTERVAL 24 HOURS"))
        )

        df = df.groupBy("subject_id").agg(F.avg(feature_name).alias(feature_name))
    else:
        # Take latest value only (no time filter)
        window_spec = Window.partitionBy("subject_id").orderBy(F.col("charttime").desc())
        df = df.withColumn("row_num", F.row_number().over(window_spec))\
               .filter(F.col("row_num") == 1)\
               .drop("row_num", "charttime")

    return df

# -----------------------------------------------
# Start with Unique ICU Subject IDs
# -----------------------------------------------
icu_final_df = icu_final_df.select("subject_id").dropDuplicates()

# -----------------------------------------------
# Extract and Merge Features
# -----------------------------------------------
for feature, item_id in feature_item_ids.items():
    feature_df = extract_feature(item_id, feature)
    icu_final_df = icu_final_df.join(feature_df, on="subject_id", how="left")

# -----------------------------------------------
# Aggregate Urine Output
# -----------------------------------------------
urine_output_df = (
    outputevents_df
    .filter(F.col("itemid").isin([226566, 226627, 226631, 227489]))
    .select("subject_id", "value")
    .withColumnRenamed("value", "urine_output")
)

urine_output_agg = (
    urine_output_df
    .groupBy("subject_id")
    .agg(F.sum("urine_output").alias("total_urine_output"))
)

icu_final_df = icu_final_df.join(urine_output_agg, on="subject_id", how="left")

# -----------------------------------------------
# Add ICU Length of Stay (First Stay Only)
# -----------------------------------------------
icu_window = Window.partitionBy("subject_id").orderBy(F.col("intime").asc())

icu_unique_df = (
    icu_df
    .withColumn("row_num", F.row_number().over(icu_window))
    .filter(F.col("row_num") == 1)
    .select("subject_id", "los")
    .withColumnRenamed("los", "icu_length_of_stay")
)

icu_final_df = icu_final_df.join(icu_unique_df, on="subject_id", how="left")

# -----------------------------------------------
# Final Cleanup
# -----------------------------------------------
icu_final_df = icu_final_df.dropDuplicates(["subject_id"])
icu_final_df.printSchema()


root
 |-- subject_id: long (nullable = true)
 |-- heart_rate: double (nullable = true)
 |-- respiratory_rate: double (nullable = true)
 |-- temperature: double (nullable = true)
 |-- spo2: double (nullable = true)
 |-- paco2: double (nullable = true)
 |-- pao2: double (nullable = true)
 |-- fio2: double (nullable = true)
 |-- absolute_neutrophil_count: double (nullable = true)
 |-- absolute_lymphocytes_count: double (nullable = true)
 |-- systolic_arterial_blood_pressure: double (nullable = true)
 |-- sdiastolic_arterial_blood_pressure: double (nullable = true)
 |-- peep: double (nullable = true)
 |-- tidal_volumne: double (nullable = true)
 |-- mechanical_ventilation: double (nullable = true)
 |-- gcs_eye: double (nullable = true)
 |-- gcs_verbal: double (nullable = true)
 |-- gcs_motor: double (nullable = true)
 |-- platelet_count: double (nullable = true)
 |-- ph: double (nullable = true)
 |-- crp: double (nullable = true)
 |-- serum_creatinine: double (nullable = true)
 |-- lactate

In [0]:
# -----------------------------------------------
# Prepare Demographic Features
# -----------------------------------------------

# Step 1: Get First Admission Info (one per subject)
admission_window = Window.partitionBy("subject_id").orderBy(F.col("admittime").asc())

admissions_unique_df = (
    admissions_df
    .withColumn("row_num", F.row_number().over(admission_window))
    .filter(F.col("row_num") == 1)
    .select("subject_id", "admittime", "admission_type", "insurance", "race")
)

# Step 2: Join with Patients Table for Gender & Anchor Info
admissions_with_demo_df = admissions_unique_df.join(
    patients_df.select("subject_id", "gender", "anchor_age", "anchor_year"),
    on="subject_id",
    how="left"
)

# Step 3: Compute Age at Admission
admissions_with_demo_df = admissions_with_demo_df.withColumn(
    "admittime_year", F.year("admittime")
).withColumn(
    "age", F.col("anchor_age") + (F.col("admittime_year") - F.col("anchor_year"))
).withColumn(
    "insurance_YN", F.when(F.col("insurance").isNotNull(), F.lit("Y")).otherwise(F.lit("N"))
)

# Step 4: Select Final Demographic Fields
demographics_df = admissions_with_demo_df.select(
    "subject_id", "gender", "age", "admission_type", "insurance_YN", "race"
)

# -----------------------------------------------
# Prepare Height, Weight, BMI
# -----------------------------------------------
# Height
height_df = (
    chartevents_df
    .filter(F.col("itemid") == 226707)
    .select("subject_id", "valuenum")
    .withColumnRenamed("valuenum", "height")
)
height_df = height_df.withColumn(
    "row_num", F.row_number().over(Window.partitionBy("subject_id").orderBy(F.col("height").desc()))
).filter(F.col("row_num") == 1).drop("row_num")

# Weight
weight_df = (
    chartevents_df
    .filter(F.col("itemid") == 226512)
    .select("subject_id", "valuenum")
    .withColumnRenamed("valuenum", "weight")
)
weight_df = weight_df.withColumn(
    "row_num", F.row_number().over(Window.partitionBy("subject_id").orderBy(F.col("weight").desc()))
).filter(F.col("row_num") == 1).drop("row_num")

# BMI
bmi_df = weight_df.join(height_df, on="subject_id", how="inner")
bmi_df = bmi_df.withColumn("bmi", F.col("weight") / ((F.col("height") / 100) ** 2))

# -----------------------------------------------
# Join Demographics to ICU Final DataFrame
# -----------------------------------------------
icu_final_df = (
    icu_final_df
    .join(demographics_df, on="subject_id", how="left")
    .join(height_df, on="subject_id", how="left")
    .join(weight_df, on="subject_id", how="left")
    .join(bmi_df.select("subject_id", "bmi"), on="subject_id", how="left")
)

icu_final_df = icu_final_df.dropDuplicates(["subject_id"])
icu_final_df.printSchema()


root
 |-- subject_id: long (nullable = true)
 |-- heart_rate: double (nullable = true)
 |-- respiratory_rate: double (nullable = true)
 |-- temperature: double (nullable = true)
 |-- spo2: double (nullable = true)
 |-- paco2: double (nullable = true)
 |-- pao2: double (nullable = true)
 |-- fio2: double (nullable = true)
 |-- absolute_neutrophil_count: double (nullable = true)
 |-- absolute_lymphocytes_count: double (nullable = true)
 |-- systolic_arterial_blood_pressure: double (nullable = true)
 |-- sdiastolic_arterial_blood_pressure: double (nullable = true)
 |-- peep: double (nullable = true)
 |-- tidal_volumne: double (nullable = true)
 |-- mechanical_ventilation: double (nullable = true)
 |-- gcs_eye: double (nullable = true)
 |-- gcs_verbal: double (nullable = true)
 |-- gcs_motor: double (nullable = true)
 |-- platelet_count: double (nullable = true)
 |-- ph: double (nullable = true)
 |-- crp: double (nullable = true)
 |-- serum_creatinine: double (nullable = true)
 |-- lactate

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# -----------------------------------------------
# Extract In-Hospital Mortality (Ensuring One Row Per Patient)
# -----------------------------------------------
mortality_df = (
    admissions_df
    .select("subject_id", "hospital_expire_flag")
    .dropDuplicates(["subject_id"])  # Ensure unique subject_id before join
    .withColumnRenamed("hospital_expire_flag", "in_hospital_mortality")
    .withColumn("in_hospital_mortality", F.col("in_hospital_mortality").cast("int"))  # Convert to integer
)

# -----------------------------------------------
# Join with ICU_FINAL_DF
# -----------------------------------------------
icu_final_df = icu_final_df.join(mortality_df, on="subject_id", how="left")

# -----------------------------------------------
# Drop Any Remaining Duplicates (Ensuring One Row Per Patient)
# -----------------------------------------------
icu_final_df = icu_final_df.dropDuplicates(["subject_id"])

# -----------------------------------------------
# Show Final Dataframe Schema
# -----------------------------------------------
icu_final_df.dtypes

[('subject_id', 'bigint'),
 ('heart_rate', 'double'),
 ('respiratory_rate', 'double'),
 ('temperature', 'double'),
 ('spo2', 'double'),
 ('paco2', 'double'),
 ('pao2', 'double'),
 ('fio2', 'double'),
 ('absolute_neutrophil_count', 'double'),
 ('absolute_lymphocytes_count', 'double'),
 ('systolic_arterial_blood_pressure', 'double'),
 ('sdiastolic_arterial_blood_pressure', 'double'),
 ('peep', 'double'),
 ('tidal_volumne', 'double'),
 ('mechanical_ventilation', 'double'),
 ('gcs_eye', 'double'),
 ('gcs_verbal', 'double'),
 ('gcs_motor', 'double'),
 ('platelet_count', 'double'),
 ('ph', 'double'),
 ('crp', 'double'),
 ('serum_creatinine', 'double'),
 ('lactate', 'double'),
 ('bun', 'double'),
 ('glucose', 'double'),
 ('sodium', 'double'),
 ('wbc', 'double'),
 ('hb', 'double'),
 ('potassium', 'double'),
 ('albumin', 'double'),
 ('bilirubin', 'double'),
 ('hematocrit', 'double'),
 ('hemodialysis', 'double'),
 ('hemoglobin', 'double'),
 ('map', 'double'),
 ('ast', 'double'),
 ('alt', 'double

In [0]:
from pyspark.sql import functions as F

# -----------------------------------------------
# Define Medication Item IDs
# -----------------------------------------------
medication_item_ids = {
    "vasopressors": [221906, 221289, 222315],
    "inotropic_agents": [221653, 221662, 221289],
    "iv_fluids": [225158, 225159, 225943],
    "bicarbonate": [220995, 225165]
}

# -----------------------------------------------
# Extract Medication Administration for Each Category
# -----------------------------------------------
for feature, item_ids in medication_item_ids.items():
    feature_df = (
        inputevents_df
        .filter(F.col("itemid").isin(item_ids))  # Filter for relevant medication
        .select("subject_id")
        .distinct()
        .withColumn(feature, F.lit(1))  # Assign 1 if medication was given
    )
    
    # Merge with ICU Final DataFrame
    icu_final_df = icu_final_df.join(feature_df, on="subject_id", how="left")

# -----------------------------------------------
# Fill Nulls with 0 (Indicating No Medication Given)
# -----------------------------------------------
for feature in medication_item_ids.keys():
    icu_final_df = icu_final_df.fillna({feature: 0})

# -----------------------------------------------
# Ensure Each Subject ID Appears Only Once
# -----------------------------------------------
icu_final_df = icu_final_df.dropDuplicates(["subject_id"])

# -----------------------------------------------
# Show Final Dataframe Schema and Summary
# -----------------------------------------------
icu_final_df.printSchema()


root
 |-- subject_id: long (nullable = true)
 |-- heart_rate: double (nullable = true)
 |-- respiratory_rate: double (nullable = true)
 |-- temperature: double (nullable = true)
 |-- spo2: double (nullable = true)
 |-- paco2: double (nullable = true)
 |-- pao2: double (nullable = true)
 |-- fio2: double (nullable = true)
 |-- absolute_neutrophil_count: double (nullable = true)
 |-- absolute_lymphocytes_count: double (nullable = true)
 |-- systolic_arterial_blood_pressure: double (nullable = true)
 |-- sdiastolic_arterial_blood_pressure: double (nullable = true)
 |-- peep: double (nullable = true)
 |-- tidal_volumne: double (nullable = true)
 |-- mechanical_ventilation: double (nullable = true)
 |-- gcs_eye: double (nullable = true)
 |-- gcs_verbal: double (nullable = true)
 |-- gcs_motor: double (nullable = true)
 |-- platelet_count: double (nullable = true)
 |-- ph: double (nullable = true)
 |-- crp: double (nullable = true)
 |-- serum_creatinine: double (nullable = true)
 |-- lactate

In [0]:
from pyspark.sql import functions as F

# -----------------------------------------------
# Define ICD Codes for Comorbidities (All as Strings)
# -----------------------------------------------
comorbidity_icd_codes = { 
    "diabetes": ['24900', '24901', '24910', '24911', '24920', '24921', '24930', '24931', '24940', '24941', '24950', '24951', '24960', '24961', '24970', '24971', '24980', '24981', '24990', '24991', '25000', '25001', '25002', '25003', '25010', '25011', '25012', '25013', '25020', '25021', '25022', '25023', '25030', '25031', '25032', '25033', '25040', '25041', '25042', '25043', '25050', '25051', '25052', '25053', '25060', '25061', '25062', '25063', '25070', '25071', '25072', '25073', '25080', '25081', '25082', '25083', '25090', '25091', '25092', '25093', '2535', '3572', '5881', '64800', '64801', '64802', '64803', '64804', '7751', 'V1221', 'V180', 'V771', 'E08', 'E080', 'E0800', 'E0801', 'E081', 'E0810', 'E0811', 'E082', 'E0821', 'E0822', 'E0829', 'E083', 'E0831', 'E08311', 'E08319', 'E0832', 'E08321', 'E083211', 'E083212', 'E083213', 'E083219', 'E08329', 'E083291', 'E083292', 'E083293', 'E083299', 'E0833', 'E08331', 'E083311', 'E083312', 'E083313', 'E083319', 'E08339', 'E083391', 'E083392', 'E083393', 'E083399', 'E0834', 'E08341', 'E083411', 'E083412', 'E083413', 'E083419', 'E08349', 'E083491', 'E083492', 'E083493', 'E083499', 'E0835', 'E08351', 'E083511', 'E083512', 'E083513', 'E083519', 'E08352', 'E083521', 'E083522', 'E083523', 'E083529', 'E08353', 'E083531', 'E083532', 'E083533', 'E083539', 'E08354', 'E083541', 'E083542', 'E083543', 'E083549', 'E08355', 'E083551', 'E083552', 'E083553', 'E083559', 'E08359', 'E083591', 'E083592', 'E083593', 'E083599', 'E0836', 'E0837', 'E0837X1', 'E0837X2', 'E0837X3', 'E0837X9', 'E0839', 'E084', 'E0840', 'E0841', 'E0842', 'E0843', 'E0844', 'E0849', 'E085', 'E0851', 'E0852', 'E0859', 'E086', 'E0861', 'E08610', 'E08618', 'E0862', 'E08620', 'E08621', 'E08622', 'E08628', 'E0863', 'E08630', 'E08638', 'E0864', 'E08641', 'E08649', 'E0865', 'E0869', 'E088', 'E089', 'E09', 'E090', 'E0900', 'E0901', 'E091', 'E0910', 'E0911', 'E092', 'E0921', 'E0922', 'E0929', 'E093', 'E0931', 'E09311', 'E09319', 'E0932', 'E09321', 'E093211', 'E093212', 'E093213', 'E093219', 'E09329', 'E093291', 'E093292', 'E093293', 'E093299', 'E0933', 'E09331', 'E093311', 'E093312', 'E093313', 'E093319', 'E09339', 'E093391', 'E093392', 'E093393', 'E093399', 'E0934', 'E09341', 'E093411', 'E093412', 'E093413', 'E093419', 'E09349', 'E093491', 'E093492', 'E093493', 'E093499', 'E0935', 'E09351', 'E093511', 'E093512', 'E093513', 'E093519', 'E09352', 'E093521', 'E093522', 'E093523', 'E093529', 'E09353', 'E093531', 'E093532', 'E093533', 'E093539', 'E09354', 'E093541', 'E093542', 'E093543', 'E093549', 'E09355', 'E093551', 'E093552', 'E093553', 'E093559', 'E09359', 'E093591', 'E093592', 'E093593', 'E093599', 'E0936', 'E0937', 'E0937X1', 'E0937X2', 'E0937X3', 'E0937X9', 'E0939', 'E094', 'E0940', 'E0941', 'E0942', 'E0943', 'E0944', 'E0949', 'E095', 'E0951', 'E0952', 'E0959', 'E096', 'E0961', 'E09610', 'E09618', 'E0962', 'E09620', 'E09621', 'E09622', 'E09628', 'E0963', 'E09630', 'E09638', 'E0964', 'E09641', 'E09649', 'E0965', 'E0969', 'E098', 'E099', 'E10', 'E101', 'E1010', 'E1011', 'E102', 'E1021', 'E1022', 'E1029', 'E103', 'E1031', 'E10311', 'E10319', 'E1032', 'E10321', 'E103211', 'E103212', 'E103213', 'E103219', 'E10329', 'E103291', 'E103292', 'E103293', 'E103299', 'E1033', 'E10331', 'E103311', 'E103312', 'E103313', 'E103319', 'E10339', 'E103391', 'E103392', 'E103393', 'E103399', 'E1034', 'E10341', 'E103411', 'E103412', 'E103413', 'E103419', 'E10349', 'E103491', 'E103492', 'E103493', 'E103499', 'E1035', 'E10351', 'E103511', 'E103512', 'E103513', 'E103519', 'E10352', 'E103521', 'E103522', 'E103523', 'E103529', 'E10353', 'E103531', 'E103532', 'E103533', 'E103539', 'E10354', 'E103541', 'E103542', 'E103543', 'E103549', 'E10355', 'E103551', 'E103552', 'E103553', 'E103559', 'E10359', 'E103591', 'E103592', 'E103593', 'E103599', 'E1036', 'E1037', 'E1037X1', 'E1037X2', 'E1037X3', 'E1037X9', 'E1039', 'E104', 'E1040', 'E1041', 'E1042', 'E1043', 'E1044', 'E1049', 'E105', 'E1051', 'E1052', 'E1059', 'E106', 'E1061', 'E10610', 'E10618', 'E1062', 'E10620', 'E10621', 'E10622', 'E10628', 'E1063', 'E10630', 'E10638', 'E1064', 'E10641', 'E10649', 'E1065', 'E1069', 'E108', 'E109', 'E11', 'E110', 'E1100', 'E1101', 'E111', 'E1110', 'E1111', 'E112', 'E1121', 'E1122', 'E1129', 'E113', 'E1131', 'E11311', 'E11319', 'E1132', 'E11321', 'E113211', 'E113212', 'E113213', 'E113219', 'E11329', 'E113291', 'E113292', 'E113293', 'E113299', 'E1133', 'E11331', 'E113311', 'E113312', 'E113313', 'E113319', 'E11339', 'E113391', 'E113392', 'E113393', 'E113399', 'E1134', 'E11341', 'E113411', 'E113412', 'E113413', 'E113419', 'E11349', 'E113491', 'E113492', 'E113493', 'E113499', 'E1135', 'E11351', 'E113511', 'E113512', 'E113513', 'E113519', 'E11352', 'E113521', 'E113522', 'E113523', 'E113529', 'E11353', 'E113531', 'E113532', 'E113533', 'E113539', 'E11354', 'E113541', 'E113542', 'E113543', 'E113549', 'E11355', 'E113551', 'E113552', 'E113553', 'E113559', 'E11359', 'E113591', 'E113592', 'E113593', 'E113599', 'E1136', 'E1137', 'E1137X1', 'E1137X2', 'E1137X3', 'E1137X9', 'E1139', 'E114', 'E1140', 'E1141', 'E1142', 'E1143', 'E1144', 'E1149', 'E115', 'E1151', 'E1152', 'E1159', 'E116', 'E1161', 'E11610', 'E11618', 'E1162', 'E11620', 'E11621', 'E11622', 'E11628', 'E1163', 'E11630', 'E11638', 'E1164', 'E11641', 'E11649', 'E1165', 'E1169', 'E118', 'E119', 'E13', 'E130', 'E1300', 'E1301', 'E131', 'E1310', 'E1311', 'E132', 'E1321', 'E1322', 'E1329', 'E133', 'E1331', 'E13311', 'E13319', 'E1332', 'E13321', 'E133211', 'E133212', 'E133213', 'E133219', 'E13329', 'E133291', 'E133292', 'E133293', 'E133299', 'E1333', 'E13331', 'E133311', 'E133312', 'E133313', 'E133319', 'E13339', 'E133391', 'E133392', 'E133393', 'E133399', 'E1334', 'E13341', 'E133411', 'E133412', 'E133413', 'E133419', 'E13349', 'E133491', 'E133492', 'E133493', 'E133499', 'E1335', 'E13351', 'E133511', 'E133512', 'E133513', 'E133519', 'E13352', 'E133521', 'E133522', 'E133523', 'E133529', 'E13353', 'E133531', 'E133532', 'E133533', 'E133539', 'E13354', 'E133541', 'E133542', 'E133543', 'E133549', 'E13355', 'E133551', 'E133552', 'E133553', 'E133559', 'E13359', 'E133591', 'E133592', 'E133593', 'E133599', 'E1336', 'E1337', 'E1337X1', 'E1337X2', 'E1337X3', 'E1337X9', 'E1339', 'E134', 'E1340', 'E1341', 'E1342', 'E1343', 'E1344', 'E1349', 'E135', 'E1351', 'E1352', 'E1359', 'E136', 'E1361', 'E13610', 'E13618', 'E1362', 'E13620', 'E13621', 'E13622', 'E13628', 'E1363', 'E13630', 'E13638', 'E1364', 'E13641', 'E13649', 'E1365', 'E1369', 'E138', 'E139', 'E232', 'N251', 'O24', 'O240', 'O2401', 'O24011', 'O24012', 'O24013', 'O24019', 'O2402', 'O2403', 'O241', 'O2411', 'O24111', 'O24112', 'O24113', 'O24119', 'O2412', 'O2413', 'O243', 'O2431', 'O24311', 'O24312', 'O24313', 'O24319', 'O2432', 'O2433', 'O244', 'O2441', 'O24410', 'O24414', 'O24415', 'O24419', 'O2442', 'O24420', 'O24424', 'O24425', 'O24429', 'O2443', 'O24430', 'O24434', 'O24435', 'O24439', 'O248', 'O2481', 'O24811', 'O24812', 'O24813', 'O24819', 'O2482', 'O2483', 'O249', 'O2491', 'O24911', 'O24912', 'O24913', 'O24919', 'O2492', 'O2493', 'P700', 'P702', 'R7303', 'Z131', 'Z833', 'Z8632']
    ,  
    "chronic_obstructive_pulmonary_disease": ['J44', 'J440', 'J441', 'J449'],  
    "peripheral_vascular_disease": ['44389', '4439', 'I738', 'I73', 'I739', 'I7389'],  
    "dementia": ['2900', '29010', '29011', '29012', '29013', '29020', '29021', '2903', '29040', '29041', '29042', '29043', '2912', '29282', '29410', '29411', '29420', '29421', '33119', '33182', 'F01', 'F015', 'F0150', 'F0151', 'F01511', 'F01518', 'F0152', 'F0153', 'F0154', 'F01A', 'F01A0', 'F01A1', 'F01A11', 'F01A18', 'F01A2', 'F01A3', 'F01A4', 'F01B', 'F01B0', 'F01B1', 'F01B11', 'F01B18', 'F01B2', 'F01B3', 'F01B4', 'F01C', 'F01C0', 'F01C1', 'F01C11', 'F01C18', 'F01C2', 'F01C3', 'F01C4', 'F02', 'F028', 'F0280', 'F0281', 'F02811', 'F02818', 'F0282', 'F0283', 'F0284', 'F02A', 'F02A0', 'F02A1', 'F02A11', 'F02A18', 'F02A2', 'F02A3', 'F02A4', 'F02B', 'F02B0', 'F02B1', 'F02B11', 'F02B18', 'F02B2', 'F02B3', 'F02B4', 'F02C', 'F02C0', 'F02C1', 'F02C11', 'F02C18', 'F02C2', 'F02C3', 'F02C4', 'F03', 'F039', 'F0390', 'F0391', 'F03911', 'F03918', 'F0392', 'F0393', 'F0394', 'F03A', 'F03A0', 'F03A1', 'F03A11', 'F03A18', 'F03A2', 'F03A3', 'F03A4', 'F03B', 'F03B0', 'F03B1', 'F03B11', 'F03B18', 'F03B2', 'F03B3', 'F03B4', 'F03C', 'F03C0', 'F03C1', 'F03C11', 'F03C18', 'F03C2', 'F03C3', 'F03C4', 'F1027', 'F1097', 'F1327', 'F1397', 'F1817', 'F1827', 'F1897', 'F1917', 'F1927', 'F1997', 'G310']
    ,  
    "peptic_ulcer_disease": ['V1271', 'Z8711'],  
    "liver_disease": ['5718', '5719', '5728', 'K70', 'K709', 'K71', 'K710', 'K711', 'K7110', 'K7111', 'K712', 'K713', 'K714', 'K715', 'K7150', 'K7151', 'K716', 'K717', 'K718', 'K719', 'K75', 'K758', 'K7589', 'K759', 'K769', 'P7884'],  
    "paraplegia": ['3341', '3441', 'G041', 'G114', 'G82', 'G822', 'G8220', 'G8221', 'G8222'],  
    "renal_disease": ['40301', '40311', '40391', '40402', '40403', '40412', '40413', '40492', '40493', '5856', '64210', '64211', '64212', '64213', '64214', '64620', '64621', '64622', '64623', '64624', 'I120', 'I1311', 'I132', 'N186', 'O2683', 'O26831', 'O26832', 'O26833', 'O26839'],
    "congestive_heart_failure": ['4280'],
    "heart_disease": ['09885', '3918', '3919', '39890', '39899', '40200', '40201', '40210', '40211','40290', '40291', '41189', '4148', '4149', '4161', '4168', '4169', '42982','42989', '4299'],
    "copd": ['J44', 'J440', 'J441', 'J449']
}

# -----------------------------------------------
# Extract Comorbidities from Diagnoses Table
# -----------------------------------------------
for condition, icd_codes in comorbidity_icd_codes.items():
    condition_df = (
        diagnoses_df
        .filter(F.col("icd_code").cast("string").isin(icd_codes))  # Ensure icd_code is treated as string
        .select("subject_id")
        .distinct()
        .withColumn(condition, F.lit(1))  # Indicate condition presence
    )
    
    # Left join with ICU Final DataFrame to add the condition
    icu_final_df = icu_final_df.join(condition_df, on="subject_id", how="left")

# -----------------------------------------------
# Fill Nulls with 0 (Patients without the condition)
# -----------------------------------------------
for condition in comorbidity_icd_codes.keys():
    icu_final_df = icu_final_df.fillna({condition: 0})

# -----------------------------------------------
# Ensure Each Subject ID Appears Only Once
# -----------------------------------------------
icu_final_df = icu_final_df.dropDuplicates(["subject_id"])

# -----------------------------------------------
# Show Final DataFrame Schema and Summary
# -----------------------------------------------
icu_final_df.printSchema()


root
 |-- subject_id: long (nullable = true)
 |-- heart_rate: double (nullable = true)
 |-- respiratory_rate: double (nullable = true)
 |-- temperature: double (nullable = true)
 |-- spo2: double (nullable = true)
 |-- paco2: double (nullable = true)
 |-- pao2: double (nullable = true)
 |-- fio2: double (nullable = true)
 |-- absolute_neutrophil_count: double (nullable = true)
 |-- absolute_lymphocytes_count: double (nullable = true)
 |-- systolic_arterial_blood_pressure: double (nullable = true)
 |-- sdiastolic_arterial_blood_pressure: double (nullable = true)
 |-- peep: double (nullable = true)
 |-- tidal_volumne: double (nullable = true)
 |-- mechanical_ventilation: double (nullable = true)
 |-- gcs_eye: double (nullable = true)
 |-- gcs_verbal: double (nullable = true)
 |-- gcs_motor: double (nullable = true)
 |-- platelet_count: double (nullable = true)
 |-- ph: double (nullable = true)
 |-- crp: double (nullable = true)
 |-- serum_creatinine: double (nullable = true)
 |-- lactate

In [0]:

icu_final_df.dtypes

[('subject_id', 'bigint'),
 ('heart_rate', 'double'),
 ('respiratory_rate', 'double'),
 ('temperature', 'double'),
 ('spo2', 'double'),
 ('paco2', 'double'),
 ('pao2', 'double'),
 ('fio2', 'double'),
 ('absolute_neutrophil_count', 'double'),
 ('absolute_lymphocytes_count', 'double'),
 ('systolic_arterial_blood_pressure', 'double'),
 ('sdiastolic_arterial_blood_pressure', 'double'),
 ('peep', 'double'),
 ('tidal_volumne', 'double'),
 ('mechanical_ventilation', 'double'),
 ('gcs_eye', 'double'),
 ('gcs_verbal', 'double'),
 ('gcs_motor', 'double'),
 ('platelet_count', 'double'),
 ('ph', 'double'),
 ('crp', 'double'),
 ('serum_creatinine', 'double'),
 ('lactate', 'double'),
 ('bun', 'double'),
 ('glucose', 'double'),
 ('sodium', 'double'),
 ('wbc', 'double'),
 ('hb', 'double'),
 ('potassium', 'double'),
 ('albumin', 'double'),
 ('bilirubin', 'double'),
 ('hematocrit', 'double'),
 ('hemodialysis', 'double'),
 ('hemoglobin', 'double'),
 ('map', 'double'),
 ('ast', 'double'),
 ('alt', 'double

In [0]:
# Count the number of subject_id in filtered_new_feature_chartevents_df
subject_id_count = icu_final_df.select("subject_id").distinct().count()

# Display the count of subject_id
print(f"Number of unique subject_id in icu_final_df: {subject_id_count}")

Number of unique subject_id in icu_final_df: 4379


In [0]:
import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Convert the Spark DataFrame back to a pandas DataFrame using Arrow
patients_final_df = icu_final_df.select("*").toPandas()

In [0]:
patients_final_df.to_csv('/Workspace/ISE_534_Data/my_patients.csv',index=False)

In [0]:
patients_loaded = pd.read_csv('/Workspace/ISE_534_Data/my_patients.csv')