In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import os
import re
import pandas as pd
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_text as text  # Imports TF ops for preprocessing.
from sklearn.model_selection import GroupShuffleSplit
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras import backend as k
# text preprocesssing function
# removes redudented tokens and maps datetimes to <time_stamp>
def _clean_text_gen(x):
    pattern = r'.[*]+[0-9]{4}[-][0-9]+[-][0-9]+[*]+.\s+[0-9]+[:][0-9]+\s+[A|P]M'
    x = re.sub(pattern, '<timestamp>', x)
    x = re.sub('[\n]', ' ', x)
    tokens =[token for token in x.split(' ') if len(token)>0]
    for token in tokens:
        yield token
            
def clean_text(x):
    results = []
    gen =  _clean_text_gen(x)
    try:
        while True:
            new_token = next(gen)
            try:
                if results[-1] != new_token:
                    results.append(new_token)
                else:
                    pass
            except IndexError:
                results.append(new_token)
    except StopIteration:
        pass
    return ' '.join(results)
MAX_MEMORY = "12g"
data_dir = os.getenv('PHYSIO_HOME')

In [2]:
# creates the spark context 
spark = SparkSession.builder \
    .appName("dp") \
    .config("spark.executor.memory", MAX_MEMORY) \
    .config("spark.driver.memory", MAX_MEMORY) \
    .getOrCreate()


text_preprocessing = udf(lambda x :clean_text(x),StringType())


## read in admissions data and using Hospital Expired Flag as an Outcome 
path = data_dir + '/' + 'ADMISSIONS.csv'
admissions = spark.read.csv(path, header=True, inferSchema=True)
admissions = admissions.withColumn('HOSPITAL_EXPIRE_FLAG', col('HOSPITAL_EXPIRE_FLAG').cast(IntegerType()))
admissions = admissions.withColumn('HADM_ID', col('HADM_ID').cast(IntegerType()))

path = data_dir + '/' + 'NOTEEVENTS.csv'

# defines schema for notes
schema = StructType([
    StructField("ROW_ID",StringType(), False),
    StructField("SUBJECT_ID",StringType(), False),
    StructField("HADM_ID",IntegerType(), True),
    StructField("CHARTDATE",DateType(), True),
    StructField("CHARTTIME",DateType(), True),
    StructField("STORETIME",DateType(), True),
    StructField("CATEGORY",StringType(), True),
    StructField("DESCRIPTION",StringType(), True),
    StructField("CGID",StringType(), True),
    StructField("ISERROR",StringType(), True),
    StructField("TEXT",StringType(), True)])

# reads notes into a spark data frame
notes = spark.read.csv(path, 
                       schema=schema, 
                       sep=',', 
                       header=True,
                       multiLine=True,
                        quote = '"',
                        escape ='"')\
.filter('CATEGORY like ("%hysic%")')

notes = notes.withColumn("TEXT", text_preprocessing("TEXT") )
                         
## Join Note to Outcomes
notes = notes.join(admissions.select(['HADM_ID', col('HOSPITAL_EXPIRE_FLAG').alias('label')]), 
      on='HADM_ID', 
      how='inner')

## Print Summary Statistics
print('Data Before DownSampeling:')
notes.groupby("CATEGORY")\
.agg(countDistinct('ROW_ID'),  
     countDistinct("HADM_ID"), 
     countDistinct("SUBJECT_ID"),
     mean(length("TEXT")),
     mean(('label'))
    )\
.sort("count(ROW_ID)", ascending =False).show()

## creates a balanced sample of positive and negative phyisicans notes
# notes where mortality happend in the hopsital 
positive = notes\
.select('HADM_ID', "TEXT", "label")\
.filter('label == 1').toPandas()

# notes where mortality did not in the hopsital, sampled in equal protortions as the the positive notes
negative = notes\
.select('HADM_ID', "TEXT", "label")\
.filter('label == 0').limit(positive.shape[0]).toPandas()

# combines negative and positive samples to create a balanced subsample
df_balanced = pd.concat([positive, negative], axis=0).set_index('HADM_ID').sample( frac=1)

print('DownSampeled Data:')
print(df_balanced.shape)                
spark.stop()

Data Before DownSampeling:
+----------+-------------+--------------+-----------------+------------------+------------------+
|  CATEGORY|count(ROW_ID)|count(HADM_ID)|count(SUBJECT_ID)| avg(length(TEXT))|        avg(label)|
+----------+-------------+--------------+-----------------+------------------+------------------+
|Physician |       140100|          8983|             7566|5255.5330692362595|0.1745895788722341|
+----------+-------------+--------------+-----------------+------------------+------------------+

DownSampeled Data:
(48920, 2)


In [3]:
group_kfold = GroupShuffleSplit(n_splits=10)
gen = group_kfold.split(df_balanced, groups = list(df_balanced.index))
train_index, test_index = next(gen)
assert set( df_balanced.iloc[train_index, :].index).intersection(set( df_balanced.iloc[test_index, :].index)) == set()

X_train = df_balanced.iloc[train_index, :]['TEXT'].values  
y_train = df_balanced.iloc[train_index, :]['label'].values 
X_test = df_balanced.iloc[test_index, :]['TEXT'].values  
y_test = df_balanced.iloc[test_index, :]['label'].values 
y = df_balanced.loc[:,'label'].values 
print(X_train.shape, X_test.shape)

# setup tensorflow datasets for training
DATASET_SIZE = X_train.shape[0]
train_size = int(0.7 * DATASET_SIZE)

## Second Split for into train and eval using tensorflow pipes
full_train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
full_train_dataset = full_train_dataset.shuffle(1)
train_dataset = full_train_dataset.take(train_size)
eval_dataset = full_train_dataset.skip(train_size)

test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test))

(39325,) (9595,)


In [4]:
sequence_lengths_train = list(map(lambda x: len(x.split(' ')), X_train))
max_len = np.max(sequence_lengths_train)
max_padding_len =  int(np.mean(sequence_lengths_train) + np.std(sequence_lengths_train) * 2)
print(F'max_len: {max_len}')
print(F'max_padding_len: {max_padding_len}')


max_len: 3442
max_padding_len: 1555


In [5]:
from  tensorflow.keras.layers.experimental.preprocessing import TextVectorization
vocab_size = 20000
print(F'vocab_size {vocab_size}')
X_train_ds = tf.data.Dataset.from_tensor_slices(X_train)
preprocessor = TextVectorization(max_tokens=vocab_size,
                                 pad_to_max_tokens=max_padding_len,
                                 output_sequence_length=max_padding_len,
                                output_mode='int')
preprocessor.adapt(X_train_ds.batch(64))

vocab_size 20000


In [6]:
from tensorflow.keras.layers import *

In [7]:
n_embedding_dims = 200
batch_size = 64
def build_classifier_model():
    text_input = Input(shape=(1,), batch_size=batch_size, dtype=tf.string, name='text')
    preprocessed = preprocessor( text_input)
    embedding  = Embedding(input_dim=vocab_size+1, 
                           output_dim=n_embedding_dims,  
                           input_length=max_padding_len)(preprocessed)
    pool = GlobalAveragePooling1D()(embedding)
    drop = Dropout(0.1)(pool)
    outputs = Dense(1, activation='sigmoid', name='classifier')(drop)
    
    return tf.keras.Model(text_input,outputs, name='MortalityClassifier')

model = build_classifier_model()

metric = tf.metrics.BinaryAccuracy(name='acc')
loss = loss=tf.keras.losses.BinaryCrossentropy( name='loss')
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(initial_learning_rate=0.0002,
                                                             decay_steps=10000,
                                                             decay_rate=0.9)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule, name='Adam')
val_metric_name = 'val_acc'
weights_path = 'best_weights.h5'

# stops training whenmodel fails to improve
esm =  EarlyStopping(patience=1, monitor=val_metric_name ,mode='max')

# save only the best weights
# checkpoint = ModelCheckpoint(weights_path ,
#                              mode='max',
#                              monitor=val_metric_name , 
#                              verbose=1, 
#                              save_best_only=True,
#                             save_format='tf')
# complies the model 
model.compile(loss=loss, optimizer=optimizer, metrics=[metric])
model.summary()

Model: "MortalityClassifier"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
text (InputLayer)            [(64, 1)]                 0         
_________________________________________________________________
text_vectorization (TextVect (None, 1555)              0         
_________________________________________________________________
embedding (Embedding)        (None, 1555, 200)         4000200   
_________________________________________________________________
global_average_pooling1d (Gl (None, 200)               0         
_________________________________________________________________
dropout (Dropout)            (None, 200)               0         
_________________________________________________________________
classifier (Dense)           (None, 1)                 201       
Total params: 4,000,401
Trainable params: 4,000,401
Non-trainable params: 0
_____________________________________

In [8]:
# train model
n_epochs = 20
history = model.fit(train_dataset.batch(batch_size), 
                      validation_data=eval_dataset.batch(batch_size), 
                      shuffle=True,
                       callbacks=[esm],
                      epochs=n_epochs  
                   ) 

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [9]:
threshold = 0.5
# predicts test set
test_preds = model.predict(test_dataset.batch(batch_size)).flatten()
test_pred_labels = np.array([1 if v > threshold else 0 for v in test_preds])

# predicts training set
train_preds = model.predict(full_train_dataset.batch(batch_size)).flatten()
train_pred_labels = np.array([1 if v > threshold else 0 for v in train_preds])


In [10]:
from sklearn.metrics import *
# calcuates metrics on test data
test_f1 = f1_score(y_test, test_pred_labels)
test_acc_balanced = balanced_accuracy_score(y_test, test_pred_labels)
test_acc = accuracy_score(y_test, test_pred_labels)
test_precision = precision_score(y_test, test_pred_labels)
test_recall = recall_score(y_test, test_pred_labels)
test_auc_score = roc_auc_score(y[test_index], test_preds)
print(F'roc_auc_score: {test_auc_score } on test')

# calculates metrics on training data 
train_f1 = f1_score(y_train, train_pred_labels)
train_acc_balanced = balanced_accuracy_score(y_train, train_pred_labels)
train_acc = accuracy_score(y_train, train_pred_labels)
train_precision = precision_score(y_train, train_pred_labels)
train_recall = recall_score(y_train, train_pred_labels)
train_auc_score = roc_auc_score(y_train, train_preds)
print(F'roc_auc_score: {train_auc_score} on train')

# gets params Artifacts for logging mlflow model
n_cases = np.sum(y == 1)
n_controls = np.sum(y == 0)
n_train_obs = X_train.shape[0]
n_test_obs = X_test.shape[0]

train_label_prob = y_train.mean()
test_label_prob = y_test.mean()
desc = str(model.to_json())
model_type = type(model)



roc_auc_score: 0.869221294512772 on test
roc_auc_score: 0.984839800754114 on train


In [11]:
model.summary()

Model: "MortalityClassifier"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
text (InputLayer)            [(64, 1)]                 0         
_________________________________________________________________
text_vectorization (TextVect (None, 1555)              0         
_________________________________________________________________
embedding (Embedding)        (None, 1555, 200)         4000200   
_________________________________________________________________
global_average_pooling1d (Gl (None, 200)               0         
_________________________________________________________________
dropout (Dropout)            (None, 200)               0         
_________________________________________________________________
classifier (Dense)           (None, 1)                 201       
Total params: 4,000,401
Trainable params: 4,000,401
Non-trainable params: 0
_____________________________________

In [12]:
from mlflow.models.signature import infer_signature
input_example = X_train[1:10]
signature = infer_signature(input_example, model.predict(input_example))

In [13]:
import mlflow
artifact_path = 'Model'
data_grain = 'NOTE_ID'
label_name = 'HOSPITAL_EXPIRE_FLAG'
data_source = 'PhysioMimicIII'
run_name = 'CNN_Embedding_wo_initialization'
experiment_id = 1
tracking_uri = "http://localhost:5000"
mlflow.set_tracking_uri(tracking_uri)
experiment_id=1
with mlflow.start_run(run_name=run_name, experiment_id=experiment_id) as run:
    
    tracking_uri = mlflow.get_tracking_uri()
    artifact_uri = mlflow.get_artifact_uri()
    
    print("Tracking uri: {}".format(tracking_uri))
    print("Artifact uri: {}".format(artifact_uri))
    mlflow.log_metric('n_epochs', len(history.history['loss']))
    for key in list(history.history.keys()):   
        for i, j in enumerate(history.history[key]):
            mlflow.log_metric(key, j, step=i)
    mlflow.log_param('data_source', data_source)
    mlflow.log_param('label_name', label_name)
    mlflow.log_param('data_grain', data_grain)
    mlflow.log_param('n_cases', n_cases)
    mlflow.log_param('n_controls', n_controls)
    mlflow.log_param('n_train_obs', n_train_obs)
    mlflow.log_param('n_test_obs', n_test_obs)
    mlflow.log_param('train_label_prob', train_label_prob)
    mlflow.log_param('test_label_prob', test_label_prob)
    mlflow.log_param('desc', desc)
    mlflow.log_param('model_type',model_type)
    mlflow.log_metric('train_f1', train_f1)
    mlflow.log_metric('train_acc_balanced', train_acc_balanced)
    mlflow.log_metric('train_acc', train_acc)
    mlflow.log_metric('train_precision', train_precision)
    mlflow.log_metric('train_recall', train_recall)
    mlflow.log_metric('train_auc_score', train_auc_score)
    mlflow.log_metric('test_f1', test_f1)
    mlflow.log_metric('test_acc_balanced', test_acc_balanced)
    mlflow.log_metric('test_acc', test_acc)
    mlflow.log_metric('test_precision', test_precision)
    mlflow.log_metric('test_recall', test_recall)
    mlflow.log_metric('test_auc_score', test_auc_score)
    run_id = run.info.run_id
    experiment_id = run.info.experiment_id 
    mlflow.end_run()
    print(F'logging experiment_id: "{experiment_id}" run_id :"{run_id}" completed')

Tracking uri: http://localhost:5000
Artifact uri: ./mlruns/1/18a7c18146f04d908fb0d9435a56b7c9/artifacts
logging experiment_id: "1" run_id :"18a7c18146f04d908fb0d9435a56b7c9" completed
