In [1]:
!pip install PyAthena
from pyathena import connect
from pyathena.pandas.util import as_pandas


# Import libraries
import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import boto3
from botocore.client import ClientError
from IPython.display import display, HTML
import pickle
%matplotlib inline


# AWS handles: S3 resource, STS client (used to learn the account id) and the
# region configured for the default boto3 session.
s3 = boto3.resource('s3')
client = boto3.client("sts")
account_id = client.get_caller_identity()["Account"]
my_session = boto3.session.Session()
region = my_session.region_name
# Per-account, per-region bucket that receives Athena query results.
athena_query_results_bucket = 'aws-athena-query-results-'+account_id+'-'+region

try:
    s3.meta.client.head_bucket(Bucket=athena_query_results_bucket)
except ClientError:
    # S3 requires a LocationConstraint for every region except us-east-1
    # (where passing one is rejected), so only add it outside us-east-1.
    create_kwargs = {'Bucket': athena_query_results_bucket}
    if region != 'us-east-1':
        create_kwargs['CreateBucketConfiguration'] = {'LocationConstraint': region}
    bucket = s3.create_bucket(**create_kwargs)
    print('Creating bucket '+athena_query_results_bucket)
# Athena cursor in the same region as the results bucket.
cursor = connect(s3_staging_dir='s3://'+athena_query_results_bucket+'/athena/temp',
                 region_name=region).cursor()



## Diabetic patients cohort

In [None]:
# Cohort selection: name -> (Athena cohort table, patient-id column, prefix of
# the pickle files written below). Change COHORT instead of (un)commenting lines.
COHORTS = {
    "full": ("default.diabetic_patients_cohort", "subject_id", ""),
    "balanced_train": ("default.train_cohort2", "new_subject_id", "balanced_train_"),
    "balanced_test": ("default.test_cohort", "subject_id", "balanced_test_"),
}
COHORT = "balanced_test"
cohort_table, subject_id_col, prefix = COHORTS[COHORT]

# Patient table. NOTE(review): this always reads the FULL diabetic cohort,
# whatever COHORT is selected; confirm that is intended.
query='select * from default.diabetic_patients_cohort order by subject_id'
cursor.execute(query)
patients = as_pandas(cursor)

# Distinct patients per mortality class in the selected cohort.
query='select mortality_flag, count (distinct ' + subject_id_col + ') from ' + cohort_table +' group by mortality_flag'
cursor.execute(query)
mf = as_pandas(cursor)
mf.head()

## Selected MIMIC-III item IDs (stored in the `icd9code` column of `featurescodes`)

In [None]:
# MIMIC-III item ids per clinical feature, one list per source table.
# Each entry is [feature name, itemid, itemid, ...]. create_codes_table() writes
# them to default.featurescodes, and the event joins match them against
# mimiciii.chartevents.itemid / mimiciii.labevents.itemid. The column is named
# icd9code, but the values are item ids, not ICD-9 diagnosis codes.
# NOTE(review): "Glascow" is a misspelling of "Glasgow", but the name is stored
# verbatim in featurescodes.feature, so renaming it changes table contents.
chartevents_codes = [
    ['Capillary refill rate', 3348, 224308, 223951, 8377, 115],
    ['Diastolic blood pressure', 8364, 225310, 228151, 8555, 8368, 220051, 8502, 8503, 8504, 8505, 8506, 8507, 8508, 153, 8440, 224643, 227242, 8441, 220180, 8444, 8445, 8446, 8448, 220060],
    ['Fraction inspired oxygen', 7146, 226767, 227035, 228192, 228193, 228232],
    ['Glascow coma scale eye opening', 184, 220739],
    ['Glascow coma scale motor response', 223901, 226757],
    ['Glascow coma scale total', 198],
    ['Glascow coma scale verbal response', 223900, 226758],
    ['Glucose', 3744, 3745, 1310, 807, 1529, 811, 220621, 226537, 3447, 225664],
    ['Heart Rate', 211, 220045],
    ['Height', 226730],
    ['Mean blood pressure', 225312, 52, 6702, 220052, 6927, 3312, 3314, 3316, 7618, 3318, 3320, 3322, 7620, 7622, 3324, 5702, 443, 456, 220181],
    ['Oxygen saturation', 0],  # NOTE(review): 0 looks like a placeholder rather than a real item id; confirm
    ['Respiratory rate', 220210, 618, 224688, 224690, 224689, 619],
    ['Systolic blood pressure', 51, 225309, 220050, 3313, 3315, 3317, 3319, 3321, 3323, 3325, 442, 224167, 227243, 455, 220179, 480, 482, 484 ],
    ['Temperature', 224027, 645, 8537, 676, 677, 223762, 678, 679, 223761],
    ['Weight', 581],
    ['pH', 1126, 780, 223830, 220274, 220734, 4753, 4202, 1365, 7717, 3839]
]

# Same layout for lab events (matched against mimiciii.labevents.itemid).
labevents_codes = [
    ['Oxygen Saturation', 50817],
    ['Temperature', 50825],
    ['pH', 50820],
    ['% Hemoglobin A1c', 50852, 50854],
    ['Blood Glucose', 50931, 51529],
    ['Serum Creatinine', 50912]    
]

In [None]:
def create_codes_table(location=None):
    """Create and populate ``default.featurescodes`` unless it already exists.

    One row is written per (source table, feature, item id) listed in
    ``labevents_codes`` and then ``chartevents_codes``. ``code`` is a running
    integer assigned in that order, ``mimiciiitable`` is ``'labevents'`` or
    ``'chartevents'`` and ``icd9code`` holds the MIMIC-III item id.

    Parameters
    ----------
    location : str, optional
        S3 location of the table data. Defaults to the ``featurescodes``
        prefix of this account's query-results bucket
        (``athena_query_results_bucket``) instead of a hard-coded account.

    Athena errors are printed rather than raised (best-effort setup step).
    """
    cursor.execute("SHOW TABLES LIKE 'featurescodes'")
    if cursor.fetchone() is not None:
        print ("featurescodes table already exists.")
        return

    if location is None:
        location = 's3://' + athena_query_results_bucket + '/featurescodes'

    # Collect every row first so the table is filled by a single multi-row
    # INSERT (one Athena query) rather than one query per item id.
    rows = []
    for source, feature_codes in (('labevents', labevents_codes),
                                  ('chartevents', chartevents_codes)):
        for feature, *item_ids in feature_codes:
            # Double single quotes so the name is a valid SQL string literal.
            feature_sql = feature.replace("'", "''")
            for item_id in item_ids:
                rows.append("(" + str(len(rows)) + ",'" + source + "','"
                            + feature_sql + "'," + str(item_id) + ")")

    try:
        cursor.execute("create external table default.featurescodes  (code int, mimiciiitable string, feature string, icd9code int) stored as PARQUET location '" + location + "'")
        cursor.execute("insert into featurescodes values " + ", ".join(rows))
        print ("featurescodes table created!")
    except Exception as e:
        print (e)
            

In [None]:
# Create and populate default.featurescodes (prints a message and does nothing
# if the table already exists).
create_codes_table()

Sanity check: every item code must belong to exactly one feature across the chart-event and lab-event lists.

In [None]:
# Self-join on the item id: any pair of rows sharing an id but carrying two
# different feature names is a conflict.
duplicate_sql = "select count(*) from featurescodes f, featurescodes g where g.icd9code=f.icd9code and g.feature <> f.feature"
cursor.execute(duplicate_sql)
conflicting_pairs = cursor.fetchone()[0]
assert conflicting_pairs == 0, "Different features have same code in featurescodes table!!!"

## Create joint events table

In [None]:
def create_joint_events(table_name="events", cohort=None, id_col=None):
    """Create an Athena table joining the cohort with its selected lab/chart events.

    Each cohort row is LEFT OUTER JOINed with the labevents/chartevents rows
    whose itemid is listed in ``default.featurescodes`` for that source table
    and whose ``charttime`` is more than 48 hours before ``discharge_time``.
    Patients without such events are kept, with NULL event columns.
    ``UNION`` (not ``UNION ALL``) drops exact duplicate
    (subject_id, itemid, charttime, valuenum) rows.

    Parameters
    ----------
    table_name : str
        Table to create (default ``"events"``, the name later cells read).
    cohort : str, optional
        Cohort table; defaults to the module-level ``cohort_table`` at call time.
    id_col : str, optional
        Cohort column emitted as ``subject_id``; defaults to the module-level
        ``subject_id_col``. Events are still matched on the cohort's
        ``subject_id`` column.
    """
    # Resolve defaults when called so edits to the config cell are honoured.
    cohort = cohort_table if cohort is None else cohort
    id_col = subject_id_col if id_col is None else id_col
    cursor.execute("""CREATE TABLE {table_name} AS
                    SELECT p.{id_col} as subject_id, e.itemid,
                             e.charttime,
                             e.valuenum,
                             p.admit_time,
                             p.discharge_time,
                             p.mortality_flag
                    FROM {cohort} p
                    LEFT OUTER JOIN
                    (SELECT subject_id,
                             itemid,
                             charttime,
                             valuenum
                    FROM mimiciii.labevents l
                    INNER JOIN default.featurescodes f
                        ON f.icd9code = l.itemid
                    WHERE f.mimiciiitable='labevents'
                    UNION
                    SELECT subject_id,
                             itemid,
                             charttime,
                             valuenum
                    FROM mimiciii.chartevents c
                    INNER JOIN default.featurescodes f
                        ON f.icd9code = c.itemid
                    WHERE f.mimiciiitable='chartevents') e ON e.subject_id=p.subject_id
                    AND e.charttime < p.discharge_time - INTERVAL '48' HOUR
                    ORDER BY  p.{id_col}, e.itemid, e.charttime
    """.format(table_name=table_name, id_col=id_col, cohort=cohort))
    

In [None]:
# Build the joint events table only if it is not there yet.
# NOTE(review): the table name does not depend on cohort_table, so an `events`
# table left over from a run with another cohort is reused as-is; drop it
# (and the tables derived from it) when switching cohorts.
cursor.execute("SHOW TABLES LIKE 'events'")
events_missing = cursor.fetchone() is None
if events_missing:
    create_joint_events()

In [None]:
def create_events_daystodischarge(source_table="events", table_name="events_daystodischarge"):
    """Create a table giving each event's distance to discharge in whole days.

    ``daystodischarge`` is ``-date_diff('day', discharge_time, charttime)``,
    which is positive for events charted before discharge. Rows of
    ``source_table`` without an event (NULL ``charttime``) get NULL.

    Parameters
    ----------
    source_table : str
        Joint events table built by ``create_joint_events`` (default ``"events"``).
    table_name : str
        Table to create (default ``"events_daystodischarge"``).
    """
    cursor.execute("""CREATE TABLE {table_name} AS SELECT subject_id,
                             -date_diff('day', discharge_time, charttime) daystodischarge, itemid, valuenum, mortality_flag
                    FROM {source_table} e
                    ORDER BY  subject_id, daystodischarge desc, itemid""".format(
        table_name=table_name, source_table=source_table))

In [None]:
# Derive the days-to-discharge table only if it is missing.
# NOTE(review): not cohort-specific; drop it when switching cohorts.
cursor.execute("SHOW TABLES LIKE 'events_daystodischarge'")
daystodischarge_missing = cursor.fetchone() is None
if daystodischarge_missing:
    create_events_daystodischarge()

#### Average `valuenum` per patient, item code and day

In [None]:
def create_events_features(source_table="events_daystodischarge", table_name="events_features"):
    """Create a table with the daily mean value of each item code per patient.

    Events are mapped to ``featurescodes.code`` through their ``itemid``.
    Codes are assigned per item id, so one feature may have several codes.
    ``valuenum`` is then averaged per (subject_id, daystodischarge, code,
    mortality_flag). The LEFT JOIN keeps rows without a matching item
    (e.g. patients with no events), whose ``code`` is NULL.

    Parameters
    ----------
    source_table : str
        Table built by ``create_events_daystodischarge``
        (default ``"events_daystodischarge"``).
    table_name : str
        Table to create (default ``"events_features"``).
    """
    cursor.execute("""CREATE TABLE {table_name}
                    AS SELECT subject_id,
                             daystodischarge,
                             f.code,
                             avg(valuenum) value,
                             mortality_flag
                    FROM {source_table} e
                    LEFT JOIN featurescodes f ON e.itemid=f.icd9code
                    GROUP BY  subject_id, daystodischarge, f.code, mortality_flag
                    ORDER BY  subject_id, daystodischarge desc, code""".format(
        table_name=table_name, source_table=source_table))


In [None]:
# Aggregate daily features only if the table is missing.
# NOTE(review): not cohort-specific; drop it when switching cohorts.
cursor.execute("SHOW TABLES LIKE 'events_features'")
features_missing = cursor.fetchone() is None
if features_missing:
    create_events_features()

## Sanity check

In [None]:
# Per-feature summary of the daily values, sorted by standard deviation.
# The (0, 1000) window presumably excludes sentinel/outlier values.
feature_stats_sql = """SELECT feature,
                         avg(value) avg,
                         stddev(value) dev,
                         min(value) min,
                         max(value) max,
                         count(value) cnt
                FROM events_features AS e
                INNER JOIN featurescodes AS f
                    ON e.code=f.code
                WHERE value>0 and value<1000
                GROUP BY  feature
                ORDER BY  dev DESC """
cursor.execute(feature_stats_sql)
df = as_pandas(cursor)
df

In [None]:
# Range and spread of days-to-discharge over all feature rows.
days_stats_sql = """
                SELECT min(daystodischarge) minDays, max(daystodischarge) maxDays, avg(daystodischarge) avgDays, stddev(daystodischarge) stddevDays FROM events_features"""
cursor.execute(days_stats_sql)
df = as_pandas(cursor)
df

In [None]:
# Number of distinct days with data per patient (summarised in the next cell).
days_per_patient_sql = """
                SELECT subject_id, count(distinct daystodischarge) cntDays FROM events_features GROUP BY subject_id"""
cursor.execute(days_per_patient_sql)
df = as_pandas(cursor)

In [None]:
# Distribution of distinct days per patient, overall and for long stays.
day_counts = df['cntDays']
long_stays = df.loc[day_counts > 180, ['cntDays']]
print ("Statistics of days to discharge\n", day_counts.describe())
print ("\n\nMore than 180 days\n", long_stays.describe())

In [None]:
# Distinct patients per mortality class in the selected cohort table.
cohort_mortality_sql = """
                SELECT mortality_flag, count(distinct {}) cnt FROM {} group by mortality_flag""".format(subject_id_col, cohort_table)
cursor.execute(cohort_mortality_sql)
df = as_pandas(cursor)
df

In [None]:
# Feature rows (not patients) per mortality class.
features_mortality_sql = """
                SELECT mortality_flag, count(*) cnt FROM events_features group by mortality_flag"""
cursor.execute(features_mortality_sql)
df = as_pandas(cursor)
df

## Construction of Events dataframe

In [None]:
# Pull every daily feature row, ordered by patient, day (earliest first) and code.
events_sql = "select * from events_features order by subject_id, daystodischarge desc, code"
cursor.execute(events_sql)
print ("Query finished")
events = as_pandas(cursor)

In [None]:
# Distinct patients in the events frame; used to validate the groupings below.
unique_subjects = events['subject_id'].unique()
num_pat = len(unique_subjects)

In [None]:
# Rows from the LEFT JOINs (patients without events) carry NULL code, day and
# value; map them to 0 so the integer columns can be cast.
for int_col in ('code', 'daystodischarge'):
    events[int_col] = events[int_col].fillna(0).astype(int)
events['value'] = events['value'].fillna(0)

In [None]:
# One row per (patient, day) with the list of codes and the matching list of values.
by_patient_day = events.groupby(['subject_id', 'daystodischarge'])
events_item = by_patient_day['code'].apply(list).reset_index(name='codes')
events_values = by_patient_day['value'].apply(list).reset_index(name='values')
assert events_item['subject_id'].unique().shape[0] == num_pat, 'Wrong number of patients in events_item'
assert events_values['subject_id'].unique().shape[0] == num_pat, 'Wrong number of patients in events_values'

In [None]:
# Persist the per-day code/value lists and the patient table, prefixed by
# cohort. `with` closes (and flushes) each file before the reload check below.
for obj, filename in ((events_item, "events_item.p"),
                      (events_values, "events_value.p"),
                      (patients, "patients.p")):
    with open(prefix + filename, "wb") as fh:
        pickle.dump(obj, fh)

In [None]:
# Round-trip check: reload each pickle and compare it with the in-memory frame.
with open(prefix + "events_item.p", "rb") as fh:
    ei = pickle.load(fh)
assert len(ei[ei['subject_id']==13]['codes']) == len(events_item[events_item['subject_id']==13]['codes']), "Wrong serialization!!"
assert len(ei) == len(events_item), "Wrong serialization!!"

with open(prefix + "events_value.p", "rb") as fh:
    ei = pickle.load(fh)
# Compare against events_values (the frame that was pickled), including contents.
assert len(ei) == len(events_values), "Wrong serialization!!"
assert ei['values'].tolist() == events_values['values'].tolist(), "Wrong serialization!!"

with open(prefix + 'patients.p', 'rb') as fh:
    ei = pickle.load(fh)
# .iloc[0] instead of int(Series): int() on a one-element Series is deprecated
# in recent pandas.
assert (int(ei.loc[ei['subject_id']==2511, 'mortality_flag'].iloc[0])
        == int(patients.loc[patients['subject_id']==2511, 'mortality_flag'].iloc[0])), "Wrong serialization!!"

In [None]:
# Largest code in featurescodes (presumably used downstream to size code vocabularies).
cursor.execute('select max(code) from featurescodes')
(raw_max_code,) = cursor.fetchone()
max_code = int(raw_max_code)

In [None]:
# Not prefixed: max_code comes from featurescodes, which all cohorts share.
with open("events_maxcode.p", "wb") as fh:
    pickle.dump(max_code, fh)

#### Maximum days per patient

In [None]:
# Largest days-to-discharge (earliest day with data) per patient.
# Select the column explicitly: GroupBy.max's first positional parameter is
# `numeric_only`, so `.max('daystodischarge')` only worked because a non-empty
# string is truthy (which also dropped the list-valued `codes` column).
dtod = (events_item.groupby('subject_id')['daystodischarge']
        .max()
        .to_frame()
        .sort_values(['daystodischarge']))
dtod

## Patients without data (no events more than 48 hours before discharge, so `daystodischarge` was filled with 0)

In [None]:
# Patients whose only day value is the 0 filled in for missing events.
without_data_mask = dtod['daystodischarge'] == 0
dtod[without_data_mask].sort_values(['subject_id'])

In [None]:
# Count of patients with no usable events.
n_without_data = int((dtod['daystodischarge'] == 0).sum())
print ("## Number of patients without data: ", n_without_data)