Load Admission List
Add timestamp

Each feature type

## Setup

In [1]:
import pandas as pd
import numpy as np
import re

from pyspark.sql.functions import *
from pyspark.sql.types import *

HOURS_AFTER_ADM = 24

Waiting for a Spark session to start...

## Admission list

In [2]:
# Load list of admissions
sample = pd.read_csv("sepsis_and_not_sepsis_admissions.csv")
sample['HADM_ID'] = sample['HADM_ID'].str.extract('([0-9]+)')
sample_admissions = np.array(sample['HADM_ID']).tolist()

admissions = spark.read.csv("s3://mimic-raw/mimic3/admissions/ADMISSIONS.csv", header=True, inferSchema=True)

# Add timestamp
admissions = admissions.\
    withColumn("ADMITTIME_UNIX", unix_timestamp("ADMITTIME")).\
    withColumn("ADMITTIME_PLUS_24H", unix_timestamp("ADMITTIME")+HOURS_AFTER_ADM*60*60)

  # Licensed to the Apache Software Foundation (ASF) under one or more


## Feature Dataframe

In [21]:
features = admissions.select("HADM_ID", "ADMITTIME_PLUS_24H").where(col("HADM_ID").isin(sample_admissions))

In [22]:
print((features.count(), len(features.columns)))
features.schema

(50605, 2)


StructType(List(StructField(HADM_ID,IntegerType,true),StructField(ADMITTIME_PLUS_24H,LongType,true)))

## Medication Features

In [6]:
N_MED_FEATURES = 100

meds = spark.read.csv("s3://mimic-raw/mimic3/prescriptions/PRESCRIPTIONS.csv", header=True, inferSchema=True)

# Filter to appropriate admissions
meds = meds.where(col("HADM_ID").isin(sample_admissions))

# Filter to first 24 hours
meds = meds.join(admissions.select("HADM_ID", "ADMITTIME_UNIX", "ADMITTIME_PLUS_24H"), on="HADM_ID").\
    withColumn("UNIX_STARTDATE", unix_timestamp(meds.STARTDATE))
meds = meds.where((meds.UNIX_STARTDATE >= meds.ADMITTIME_UNIX) & (meds.UNIX_STARTDATE <= meds.ADMITTIME_PLUS_24H))

# List of N most common meds
common_meds = meds.groupby("DRUG").count().orderBy(desc("count")).limit(N_MED_FEATURES)
common_meds_list = [drug["DRUG"] for drug in common_meds.collect()]

# Filter to most common meds
med_counts = meds.select("HADM_ID", "DRUG").groupBy("HADM_ID", "DRUG").count()
med_counts_filtered = med_counts.where(col("DRUG").isin(common_meds_list))

# pivot so that each medication is a column
# Results in N x M matrix where N is the number of admssions, and M is the number of med features
med_features = med_counts_filtered.groupBy("HADM_ID").pivot("DRUG").sum("count")

In [8]:
# Save to HDFS
med_features.write.csv("med_features.csv", header=True, mode="overwrite")

In [23]:
# Read from HDFS
med_features = spark.read.csv("med_features.csv", inferSchema=True, header=True)

In [24]:
# Join with features dataframe
features = features.join(med_features, on="HADM_ID", how="left")

In [25]:
print((features.count(), len(features.columns)))
features.printSchema()

(50605, 102)
root
 |-- HADM_ID: integer (nullable = true)
 |-- ADMITTIME_PLUS_24H: long (nullable = true)
 |-- 0.9% Sodium Chloride: integer (nullable = true)
 |-- 0.9% Sodium Chloride (Mini Bag Plus): integer (nullable = true)
 |-- 1/2 NS: integer (nullable = true)
 |-- 5% Dextrose: integer (nullable = true)
 |-- Acetaminophen: integer (nullable = true)
 |-- Albuterol 0.083% Neb Soln: integer (nullable = true)
 |-- Amiodarone: integer (nullable = true)
 |-- Aspirin: integer (nullable = true)
 |-- Aspirin EC: integer (nullable = true)
 |-- Atorvastatin: integer (nullable = true)
 |-- Bag: integer (nullable = true)
 |-- Bisacodyl: integer (nullable = true)
 |-- Calcium Gluconate: integer (nullable = true)
 |-- Captopril: integer (nullable = true)
 |-- CefazoLIN: integer (nullable = true)
 |-- CefePIME: integer (nullable = true)
 |-- Chlorhexidine Gluconate 0.12% Oral Rinse: integer (nullable = true)
 |-- D10W: integer (nullable = true)
 |-- D5 1/2NS: integer (nullable = true)
 |-- D5W: 

## Procedure Features (CPT)

In [None]:
N_CPT_FEATURES = 100

cpt = spark.read.csv("s3://mimic-raw/mimic3/cptevents/CPTEVENTS.csv", header = True, inferSchema=True)

# Filter to appropriate admissions
cpt = cpt.where(col("HADM_ID").isin(sample_admissions))

# Filter to first 3 of CPT
cpt_trimmed = cpt.select("HADM_ID", "CPT_NUMBER").withColumn("CPT", cpt.CPT_NUMBER.substr(0,3))

common_cpts = cpt_trimmed.groupby("CPT").count().orderBy(desc("count")).limit(N_CPT_FEATURES)
common_cpt_list = [cpt["CPT"] for cpt in common_cpts.collect()]

# count cpts
cpt_counts = cpt_trimmed.select("HADM_ID", "CPT").groupBy("HADM_ID", "CPT").count()

# Filter to most common cpt codes
cpt_counts_filtered = cpt_counts.where(col("CPT").isin(common_cpt_list))

# pivot so that each cpt code is a column
# Results in N x M matrix where N is the number of admssions, and M is the number of cpt features
cpt_features = cpt_counts_filtered.groupBy("HADM_ID").pivot("CPT").sum("count")

In [None]:
# save to HDFS
cpt_features.write.csv("cpt_features.csv", header=True, mode="overwrite")

In [26]:
# read from HDFS
cpt_features = spark.read.csv("cpt_features.csv", inferSchema=True, header=True)

In [27]:
# Join with features dataframe
features = features.join(cpt_features, on="HADM_ID", how="left")

In [28]:
print((features.count(), len(features.columns)))
features.printSchema()

(50605, 152)
root
 |-- HADM_ID: integer (nullable = true)
 |-- ADMITTIME_PLUS_24H: long (nullable = true)
 |-- 0.9% Sodium Chloride: integer (nullable = true)
 |-- 0.9% Sodium Chloride (Mini Bag Plus): integer (nullable = true)
 |-- 1/2 NS: integer (nullable = true)
 |-- 5% Dextrose: integer (nullable = true)
 |-- Acetaminophen: integer (nullable = true)
 |-- Albuterol 0.083% Neb Soln: integer (nullable = true)
 |-- Amiodarone: integer (nullable = true)
 |-- Aspirin: integer (nullable = true)
 |-- Aspirin EC: integer (nullable = true)
 |-- Atorvastatin: integer (nullable = true)
 |-- Bag: integer (nullable = true)
 |-- Bisacodyl: integer (nullable = true)
 |-- Calcium Gluconate: integer (nullable = true)
 |-- Captopril: integer (nullable = true)
 |-- CefazoLIN: integer (nullable = true)
 |-- CefePIME: integer (nullable = true)
 |-- Chlorhexidine Gluconate 0.12% Oral Rinse: integer (nullable = true)
 |-- D10W: integer (nullable = true)
 |-- D5 1/2NS: integer (nullable = true)
 |-- D5W: 

## Admission Characteristics

In [37]:
# create small table with relevant admission data
admissions = admissions.where(col('HADM_ID').isin(sample_admissions))
admissions = admissions.select('HADM_ID', 'ADMISSION_TYPE', 'ADMISSION_LOCATION', 'INSURANCE',
 'LANGUAGE', 'RELIGION', 'MARITAL_STATUS', 'ETHNICITY')

In [46]:
admission_type = admissions.groupBy("HADM_ID", "ADMISSION_TYPE").count().\
    groupBy("HADM_ID").pivot("ADMISSION_TYPE").sum("count")
admission_loc = admissions.groupBy("HADM_ID", "ADMISSION_LOCATION").count().\
    groupBy("HADM_ID").pivot("ADMISSION_LOCATION").sum("count")
admission_insurance = admissions.groupBy("HADM_ID", "INSURANCE").count().\
    groupBy("HADM_ID").pivot("INSURANCE").sum("count")

In [47]:
admission_features = admission_type.join(admission_loc, on="HADM_ID").join(admission_insurance, on="HADM_ID")

In [49]:
# save to HDFS
admission_features.write.csv("admission_features.csv", header=True, mode="overwrite")

In [9]:
# read from HDFS
admission_features = spark.read.csv("admission_features.csv", inferSchema=True, header=True)

In [50]:
features = features.join(admission_features, on="HADM_ID", how="left")

In [51]:
print((features.count(), len(features.columns)))
features.printSchema()

(50605, 170)
root
 |-- HADM_ID: integer (nullable = true)
 |-- ADMITTIME_PLUS_24H: long (nullable = true)
 |-- 0.9% Sodium Chloride: integer (nullable = true)
 |-- 0.9% Sodium Chloride (Mini Bag Plus): integer (nullable = true)
 |-- 1/2 NS: integer (nullable = true)
 |-- 5% Dextrose: integer (nullable = true)
 |-- Acetaminophen: integer (nullable = true)
 |-- Albuterol 0.083% Neb Soln: integer (nullable = true)
 |-- Amiodarone: integer (nullable = true)
 |-- Aspirin: integer (nullable = true)
 |-- Aspirin EC: integer (nullable = true)
 |-- Atorvastatin: integer (nullable = true)
 |-- Bag: integer (nullable = true)
 |-- Bisacodyl: integer (nullable = true)
 |-- Calcium Gluconate: integer (nullable = true)
 |-- Captopril: integer (nullable = true)
 |-- CefazoLIN: integer (nullable = true)
 |-- CefePIME: integer (nullable = true)
 |-- Chlorhexidine Gluconate 0.12% Oral Rinse: integer (nullable = true)
 |-- D10W: integer (nullable = true)
 |-- D5 1/2NS: integer (nullable = true)
 |-- D5W: 

## Laboratory Tests

In [29]:
N_LAB_FEATURES = 100

# load labevents
labs = spark.read.csv("s3://mimic-raw/mimic3/labevents/LABEVENTS.csv", header=True, inferSchema=True)

# Filter to appropriate admissions
labs = labs.where(col("HADM_ID").isin(sample_admissions))

# Filter to first 24 hours
labs = labs.join(admissions.select("HADM_ID", "ADMITTIME_UNIX", "ADMITTIME_PLUS_24H"), on="HADM_ID").\
    withColumn("UNIX_CHARTTIME", unix_timestamp(labs.CHARTTIME))
labs = labs.where((labs.UNIX_CHARTTIME >= labs.ADMITTIME_UNIX) & (labs.UNIX_CHARTTIME <= labs.ADMITTIME_PLUS_24H))

In [30]:
# Load labels for lab test names
lab_labels = spark.read.csv("s3://mimic-raw/mimic3/d_labitems/D_LABITEMS.csv", header=True, inferSchema=True)

# replace lab IDs with lab names (labels)
labs_with_label = labs.join(lab_labels, on="ITEMID").select("HADM_ID", "LABEL")

In [31]:
# List of N most common labs
common_labs = labs_with_label.groupby("LABEL").count().orderBy(desc("count")).limit(N_LAB_FEATURES)
common_labs_list = [lab["LABEL"] for lab in common_labs.collect()]

# count labs
lab_counts = labs_with_label.groupBy("HADM_ID", "LABEL").count()

# Filter to most common labs
lab_counts_filtered = lab_counts.where(col("LABEL").isin(common_labs_list))

# pivot so that each lab is a column
# Results in N x M matrix where N is the number of admssions, and M is the number of lab features
lab_features = lab_counts_filtered.groupBy("HADM_ID").pivot("LABEL").sum("count")

In [32]:
# save to HDFS
lab_features.write.csv("lab_features.csv", header=True, mode="overwrite")

In [9]:
# read from HDFS
lab_features = spark.read.csv("lab_features.csv", inferSchema=True, header=True)

In [53]:
# Join with features dataframe
features = features.join(lab_features, on="HADM_ID", how="left")

In [54]:
print((features.count(), len(features.columns)))
features.printSchema()

(50605, 270)
root
 |-- HADM_ID: integer (nullable = true)
 |-- ADMITTIME_PLUS_24H: long (nullable = true)
 |-- 0.9% Sodium Chloride: integer (nullable = true)
 |-- 0.9% Sodium Chloride (Mini Bag Plus): integer (nullable = true)
 |-- 1/2 NS: integer (nullable = true)
 |-- 5% Dextrose: integer (nullable = true)
 |-- Acetaminophen: integer (nullable = true)
 |-- Albuterol 0.083% Neb Soln: integer (nullable = true)
 |-- Amiodarone: integer (nullable = true)
 |-- Aspirin: integer (nullable = true)
 |-- Aspirin EC: integer (nullable = true)
 |-- Atorvastatin: integer (nullable = true)
 |-- Bag: integer (nullable = true)
 |-- Bisacodyl: integer (nullable = true)
 |-- Calcium Gluconate: integer (nullable = true)
 |-- Captopril: integer (nullable = true)
 |-- CefazoLIN: integer (nullable = true)
 |-- CefePIME: integer (nullable = true)
 |-- Chlorhexidine Gluconate 0.12% Oral Rinse: integer (nullable = true)
 |-- D10W: integer (nullable = true)
 |-- D5 1/2NS: integer (nullable = true)
 |-- D5W: 

## NLP Features

In [1]:
notes = spark.read.csv('s3://mimic-raw/mimic3/noteevents/updated-note.csv', header = True)

Waiting for a Spark session to start...

In [12]:
notes = notes.where(col("CHARTTIME").isNotNull())

In [13]:
notes = notes.where(col('HADM_ID').isin(sample_admissions))

In [14]:
notes = notes.join(admissions.select("HADM_ID", "ADMITTIME_UNIX", "ADMITTIME_PLUS_24H"), on="HADM_ID").\
    withColumn("UNIX_CHARTTIME", unix_timestamp(notes.CHARTTIME))

In [15]:
notes.take(3)

[Row(HADM_ID=u'179159', CHARTTIME=u'2116-02-07 14:08:00', CATEGORY=u'Nursing', TEXT=u'67M w/ h/o multiplemyeloma Dx [**2111**] neuropathy bed-bound cared for by    Dr [**Last Name (STitle) 731**] at [**Company 732**] last seen at [**Hospital1 54**] in [**2112**] a/w GIB. Pt was in    USH at nursing home when maroon stools were noted by staff members.    Patient himself was unaware of rectal bleeding. He denies GI symptoms.    He reports slight lightheadedness. He was transferred from NH to [**Hospital1 **] ED    where his HCT was 17.6 and plts 45 his NGT lavage was neg he was    given 2 units of PRBCs and 6 bags of plts.    Gastrointestinal bleed lower (Hematochezia BRBPR GI Bleed GIB)    Assessment:    Pt required as additional 2 units of blood when he arrived in the MICU    because his HCT only bumped to 19.8 after the 2 units if PRBCs that he    received in the EW and an additional 2 bags of plt for a plt count of    55.  He had an EGD [**2115-2-6**] no bleeding was noted.  He has h

In [16]:
notes.count()

1298607

In [17]:
notes = notes.where((notes.UNIX_CHARTTIME >= notes.ADMITTIME_UNIX) & (notes.UNIX_CHARTTIME <= notes.ADMITTIME_PLUS_24H))

In [18]:
notes.count()

209986

In [42]:
a = notes.select(notes['HADM_ID']).distinct()

In [43]:
a.count()

42100

In [19]:
import pyspark.sql.functions as f
t = notes.groupby("HADM_ID").agg(f.concat_ws(", ", f.collect_list(notes.TEXT)))

In [20]:
t.take(3)

[Row(HADM_ID=u'100140', concat_ws(, , collect_list(TEXT))=u'MICU/SICU NURSING ADMIT NOTE:  24 y.o female PMH: Subdural hematoma evacuated s/p head injury from MVA one year ago no residuals. PT with prolonged intubations and multiple self extubations s/p MVA. PT developed tracheal stenosis.  Trach placed [**7-15**].  Trach changed to t-tube [**12-16**].  PT from [**State **] admitted today for elective micro laryngoscopy with laser lysis of subglottis stenosis and placement of new t-tube.  PRocedure minimally successful.  WIll need repeat procedure to remove scar tissue between vocal cords.  Tx to MICU/SICU for overnight observation and monitoring. ENT beeper # [**Numeric Identifier 7123**]  Neuro:  PT alert oriented X 3 MAE. Denies pain at this time.  Pupils equal/reactive. + cough/gag.  TAlks with electrolarnyx.  CV:  Temp 99.5 HR 80-100 NSR no ectopy.  BP stable.  - edema + pulses.  # 20 IV in R AC D5LR @ 90 cc/hr infusing without difficulty.  Pt denies chest pain SOB dizziness at th

In [21]:
notes = t.withColumnRenamed("concat_ws(, , collect_list(TEXT))", "TEXT")

In [None]:
notes.write.csv("notes_raw.csv", header=True, mode="overwrite")

In [22]:
notes.take(3)

[Row(HADM_ID=u'100140', TEXT=u'MICU/SICU NURSING ADMIT NOTE:  24 y.o female PMH: Subdural hematoma evacuated s/p head injury from MVA one year ago no residuals. PT with prolonged intubations and multiple self extubations s/p MVA. PT developed tracheal stenosis.  Trach placed [**7-15**].  Trach changed to t-tube [**12-16**].  PT from [**State **] admitted today for elective micro laryngoscopy with laser lysis of subglottis stenosis and placement of new t-tube.  PRocedure minimally successful.  WIll need repeat procedure to remove scar tissue between vocal cords.  Tx to MICU/SICU for overnight observation and monitoring. ENT beeper # [**Numeric Identifier 7123**]  Neuro:  PT alert oriented X 3 MAE. Denies pain at this time.  Pupils equal/reactive. + cough/gag.  TAlks with electrolarnyx.  CV:  Temp 99.5 HR 80-100 NSR no ectopy.  BP stable.  - edema + pulses.  # 20 IV in R AC D5LR @ 90 cc/hr infusing without difficulty.  Pt denies chest pain SOB dizziness at this time.  PULM:  T-tube in pl

In [23]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

In [24]:
tokenizer = Tokenizer(inputCol="TEXT", outputCol="words")

In [25]:
wordsData = tokenizer.transform(notes)

In [26]:
wordsData.show(10)

+-------+--------------------+--------------------+
|HADM_ID|                TEXT|               words|
+-------+--------------------+--------------------+
| 100140|MICU/SICU NURSING...|[micu/sicu, nursi...|
| 100227|[**2160-12-7**] 1...|[[**2160-12-7**],...|
| 100263|[**2102-4-21**] 7...|[[**2102-4-21**],...|
| 100553|[**2146-6-16**] 2...|[[**2146-6-16**],...|
| 100704|Neonatology Atten...|[neonatology, att...|
| 100735|Neonatology Note ...|[neonatology, not...|
| 101272|[**2177-1-12**] 3...|[[**2177-1-12**],...|
| 102113|Nursing Admission...|[nursing, admissi...|
| 102684|ADMIT NOTE   Pt i...|[admit, note, , ,...|
| 103432|Admission Note  P...|[admission, note,...|
+-------+--------------------+--------------------+
only showing top 10 rows



In [27]:
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=100)
featurizedData = hashingTF.transform(wordsData)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("features").show(10,False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# write code to merge other feature df with rescaledData on HADM_ID

In [28]:
rescaledData

DataFrame[HADM_ID: string, TEXT: string, words: array<string>, rawFeatures: vector, features: vector]

In [32]:
rescaledData.select("HADM_ID", "features").write("nlp_features.csv", header=True, mode="overwrite")

Name: org.apache.toree.interpreter.broker.BrokerException
Message: Traceback (most recent call last):
  File "/tmp/kernel-PySpark-f89d6f48-edc4-4ab2-87f2-91825fca761c/pyspark_runner.py", line 194, in <module>
    eval(compiled_code)
  File "<string>", line 1, in <module>
TypeError: 'DataFrameWriter' object is not callable

StackTrace: org.apache.toree.interpreter.broker.BrokerState$$anonfun$markFailure$1.apply(BrokerState.scala:163)
org.apache.toree.interpreter.broker.BrokerState$$anonfun$markFailure$1.apply(BrokerState.scala:163)
scala.Option.foreach(Option.scala:257)
org.apache.toree.interpreter.broker.BrokerState.markFailure(BrokerState.scala:162)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.re

## Save feature df to hdfs 

In [55]:
features.write.csv("features_combined.csv", header=True, mode="overwrite")