####Creating DataFrames and typecasting columns according to the documentation
>admissions: https://mimic.mit.edu/docs/iv/modules/core/admissions/  
>d_labitens: https://mimic.mit.edu/docs/iv/modules/hosp/d_labitems/  
>edstays: https://mimic.mit.edu/docs/iv/modules/ed/edstays/  
>labevents: https://mimic.mit.edu/docs/iv/modules/hosp/labevents/

In [0]:
admissions_schema = """
subject_id INTEGER, 
hadm_id INTEGER, 
admittime TIMESTAMP, 
dischtime TIMESTAMP, 
deathtime TIMESTAMP, 
admission_type STRING, 
admission_location STRING, 
discharge_location STRING,
insurance STRING,
language STRING,
marital_status STRING,
ethnicity STRING,
edregtime TIMESTAMP,
edouttime TIMESTAMP,
hospital_expire_flag INTEGER"""

d_labitems_schema = """
itemid INTEGER,
label STRING,
fluid STRING,
category STRING,
loinc_code STRING"""

edstays_schema = """
subject_id INTEGER, 
hadm_id INTEGER, 
stay_id INTEGER, 
intime TIMESTAMP, 
outtime TIMESTAMP"""

labevents_schema = """
labevent_id	INTEGER,
subject_id INTEGER,
hadm_id INTEGER,
specimen_id	INTEGER,
itemid INTEGER,
charttime TIMESTAMP,
storetime TIMESTAMP,
value STRING,
valuenum DOUBLE,
valueuom STRING,
ref_range_lower DOUBLE,
ref_range_upper	DOUBLE,
flag STRING,
priority STRING,
comments STRING
"""

admissionsDf = spark.read.options(header=True).schema(admissions_schema).csv("dbfs:/FileStore/tables/admissions.csv")
d_labitemsDf = spark.read.options(header=True).schema(d_labitems_schema).csv("dbfs:/FileStore/tables/d_labitems.csv")
edstaysDf = spark.read.options(header=True).schema(edstays_schema).csv("dbfs:/FileStore/tables/edstays.csv")
labeventsDf = spark.read.options(header=True).schema(labevents_schema).csv("dbfs:/FileStore/tables/labevents.csv")

In [0]:
admissionsDf.printSchema()

root
 |-- subject_id: integer (nullable = true)
 |-- hadm_id: integer (nullable = true)
 |-- admittime: timestamp (nullable = true)
 |-- dischtime: timestamp (nullable = true)
 |-- deathtime: timestamp (nullable = true)
 |-- admission_type: string (nullable = true)
 |-- admission_location: string (nullable = true)
 |-- discharge_location: string (nullable = true)
 |-- insurance: string (nullable = true)
 |-- language: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- ethnicity: string (nullable = true)
 |-- edregtime: timestamp (nullable = true)
 |-- edouttime: timestamp (nullable = true)
 |-- hospital_expire_flag: integer (nullable = true)



In [0]:
d_labitemsDf.printSchema()

root
 |-- itemid: integer (nullable = true)
 |-- label: string (nullable = true)
 |-- fluid: string (nullable = true)
 |-- category: string (nullable = true)
 |-- loinc_code: string (nullable = true)



In [0]:
edstaysDf.printSchema()

root
 |-- subject_id: integer (nullable = true)
 |-- hadm_id: integer (nullable = true)
 |-- stay_id: integer (nullable = true)
 |-- intime: timestamp (nullable = true)
 |-- outtime: timestamp (nullable = true)



In [0]:
labeventsDf.printSchema()

root
 |-- labevent_id: integer (nullable = true)
 |-- subject_id: integer (nullable = true)
 |-- hadm_id: integer (nullable = true)
 |-- specimen_id: integer (nullable = true)
 |-- itemid: integer (nullable = true)
 |-- charttime: timestamp (nullable = true)
 |-- storetime: timestamp (nullable = true)
 |-- value: string (nullable = true)
 |-- valuenum: double (nullable = true)
 |-- valueuom: string (nullable = true)
 |-- ref_range_lower: double (nullable = true)
 |-- ref_range_upper: double (nullable = true)
 |-- flag: string (nullable = true)
 |-- priority: string (nullable = true)
 |-- comments: string (nullable = true)



In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

####What is the percentage of patients admitted to the hospital that have never been to the ED?
>The edstays table gives us the information about patients admited to ED (https://mimic.mit.edu/docs/iv/modules/ed/edstays/)
>

In [0]:
patientsEDDf = admissionsDf.join(edstaysDf,["subject_id","hadm_id"],"left") \
.groupBy("subject_id").agg(sum("stay_id").alias("sum_stay_id"))

patientsEDDf.createOrReplaceTempView("patientsED")

display(
    spark.sql("""
        SELECT
            COUNT(DISTINCT(subject_id)) AS count_total_patients,
            ((COUNT(DISTINCT(subject_id)) FILTER(WHERE sum_stay_id IS NOT NULL))/COUNT(DISTINCT(subject_id)))*100 AS percentage_patients_ed,
            ((COUNT(DISTINCT(subject_id)) FILTER(WHERE sum_stay_id IS NULL))/COUNT(DISTINCT(subject_id)))*100 AS percentage_patients_not_ed            
        FROM patientsED
    """)
)

count_total_patients,percentage_patients_ed,percentage_patients_not_ed
100,62.0,38.0


####For each patient, generate a json with the hospital admission data.
>Admission data is in table admissions (https://mimic.mit.edu/docs/iv/modules/core/admissions/)

In [0]:
subject_list = admissionsDf.select("subject_id").distinct().collect() # Getting distinct subject_id as python list of Rows
output_basepath = "dbfs:/FileStore/tables/subject"
for row in subject_list:
    subject = row.subject_id
    df = admissionsDf.filter(f"subject_id = {subject}") # Creating a DataFrame for each subject_id
    df.write.mode("overwrite").json(f"{output_basepath}/subject_{subject}") # Writing as Json in output basepath with naming convention: subject_[subject_id]
dbutils.fs.ls("dbfs:/FileStore/tables/subject")

Out[69]: [FileInfo(path='dbfs:/FileStore/tables/subject/subject_10000032/', name='subject_10000032/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/subject/subject_10001217/', name='subject_10001217/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/subject/subject_10001725/', name='subject_10001725/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/subject/subject_10002428/', name='subject_10002428/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/subject/subject_10002495/', name='subject_10002495/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/subject/subject_10002930/', name='subject_10002930/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/subject/subject_10003046/', name='subject_10003046/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/subject/subject_10003400/', name='subject_10003400/', size=0, modificationTime=0),
 FileInfo(path=

####Check the performance of albumin in predicting in-hospital mortality.  
Use only each patient’s first admission to the hospital; and only the Albumin results from the first 24h from hospital admission.
>Column deathtime from admissions indicates if patient died in the hospital, if Null the patient didn't die (https://mimic.mit.edu/docs/iv/modules/core/admissions/)  
>All 3 patients treated with Albumin within 24h from admission to the hospital survived

In [0]:
patientsAlbuminDf = admissionsDf.join(labeventsDf,["subject_id","hadm_id"],"left") \
.join(d_labitemsDf,["itemid"],"left") \
.where("lower(label) like '%albumin%'") \
.withColumn("row_number",row_number().over(Window.partitionBy("subject_id").orderBy("admittime"))).filter(col("row_number")==1) \
.withColumn("timedelta", (col("dischtime").cast("long") - col("admittime").cast("long"))/3600) \
.filter("timedelta <= 24.0") \
.drop("row_number")

display(patientsAlbuminDf)

itemid,subject_id,hadm_id,admittime,dischtime,deathtime,admission_type,admission_location,discharge_location,insurance,language,marital_status,ethnicity,edregtime,edouttime,hospital_expire_flag,labevent_id,specimen_id,charttime,storetime,value,valuenum,valueuom,ref_range_lower,ref_range_upper,flag,priority,comments,label,fluid,category,loinc_code,timedelta
50835,10000032,22595853,2180-05-06T22:23:00.000+0000,2180-05-07T17:15:00.000+0000,,URGENT,TRANSFER FROM HOSPITAL,HOME,Other,ENGLISH,WIDOWED,WHITE,2180-05-06T19:17:00.000+0000,2180-05-06T23:30:00.000+0000,0,188,10941085,2180-05-07T10:11:00.000+0000,2180-05-07T12:50:00.000+0000,,,g/dL,,,,STAT,LESS THAN 1.0.,"Albumin, Ascites",Ascites,Chemistry,,18.866666666666667
50862,10004457,25559382,2148-09-14T14:19:00.000+0000,2148-09-15T12:45:00.000+0000,,DIRECT OBSERVATION,PHYSICIAN REFERRAL,,Medicare,ENGLISH,DIVORCED,WHITE,,,0,65622,93293781,2148-09-15T05:34:00.000+0000,2148-09-15T07:09:00.000+0000,3.9,3.9,g/dL,3.5,5.2,,ROUTINE,,Albumin,Blood,Chemistry,1751-7,22.433333333333334
50862,10014354,26722126,2146-11-09T01:53:00.000+0000,2146-11-09T13:13:00.000+0000,,EU OBSERVATION,EMERGENCY ROOM,,Other,ENGLISH,MARRIED,WHITE,2146-11-08T21:24:00.000+0000,2146-11-09T13:13:00.000+0000,0,180273,46872598,2146-11-08T23:36:00.000+0000,2146-11-09T00:30:00.000+0000,3.6,3.6,g/dL,3.5,5.2,,STAT,,Albumin,Blood,Chemistry,1751-7,11.333333333333334
