# Data Preparation


In [1]:
import os, tempfile, zipfile, StringIO
from urllib import urlretrieve
import pandas as pd
import numpy as np

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType, StringType

try:
    from azure.storage.blob import BlobService
except ImportError:
    try:
        from azure.storage.blob import BlockBlobService as BlobService
    except ImportError:
        raise Exception('Please ensure that the azure-storage package is installed')

# Fill in your Azure storage account information here
# account_name is interpolated into the wasb:// URLs used below to read and write
# data; both values are passed to BlobService when uploading files.
# NOTE(review): do not commit or share this notebook with a real key filled in.
account_name = ''
account_key = ''

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
7,application_1493038940826_0003,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


## Obtain the input dataset

This tutorial uses a [diabetes dataset](https://archive.ics.uci.edu/ml/datasets/Diabetes) originally produced for the 1994 AAAI Spring Symposium on Artificial Intelligence in Medicine, now generously shared by Dr. Michael Kahn on the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/).

To obtain this dataset and copy it to blob storage, run the code cell below:

In [2]:
blob_service = BlobService(account_name, account_key)
blob_service.create_container('preprocess')

# Download the archive into a temporary file and extract the single CSV we need.
# The temporary file is deleted when the outer `with` block exits.
with tempfile.NamedTemporaryFile() as f:
    urlretrieve('https://archive.ics.uci.edu/ml/machine-learning-databases/00296/dataset_diabetes.zip',
                f.name)
    # Open the archive as a context manager so its file handle is released
    # before the temporary file is removed.
    with zipfile.ZipFile(f.name) as my_file:
        csv_contents = my_file.read('dataset_diabetes/diabetic_data.csv')

# The upload method is named differently depending on which class was imported
# as BlobService, so fall back to the second name if the first is missing.
try:
    blob_service.put_block_blob_from_text('preprocess',
                                          'diabetic_data.csv',
                                          csv_contents,
                                          x_ms_blob_content_type='text')
except AttributeError:
    blob_service.create_blob_from_text('preprocess',
                                       'diabetic_data.csv',
                                       csv_contents)

## Load data and randomly generate glucose readings

Reload the data from blob storage as a Spark dataframe.

In [3]:
# Address the CSV uploaded to the 'preprocess' container through the wasb:// scheme.
input_filename = 'wasb://preprocess@{}.blob.core.windows.net/diabetic_data.csv'.format(
    account_name)

# The first row holds the column names; column types are inferred from the data.
csv_reader = sqlContext.read
df = csv_reader.csv(input_filename, sep=',', header=True, inferSchema=True)

Generate some synthetic glucose readings by adding random noise to fixed baseline values (so, unfortunately, they are not predictive of anything):

In [4]:
def add_noise(x):
    """Return `x` plus a random offset between 0 and 0.5, rounded to 2 places.

    The offset is drawn from numpy's uniform distribution. The result is
    coerced to a built-in float because this function is wrapped in a UDF
    declared as DoubleType, and a numpy scalar is not a plain Python float.
    """
    return float(x + round(np.random.uniform(0, 0.5), 2))

# Every row receives the same literal discharge date.
df = df.withColumn('discharge_date', F.lit('2015-01-01'))

# Baseline value for each synthetic glucose statistic, in column order.
glucose_baselines = [('glucose_min', 0),
                     ('glucose_max', 15),
                     ('glucose_mean', 5),
                     ('glucose_var', 9)]

# First add each statistic as a constant column ...
for glucose_col, baseline in glucose_baselines:
    df = df.withColumn(glucose_col, F.lit(baseline))

# ... then replace each constant with a per-row noisy version of itself.
udf_add_noise = F.udf(add_noise, DoubleType())
for glucose_col, baseline in glucose_baselines:
    df = df.withColumn(glucose_col, udf_add_noise(df[glucose_col]))

## Handle missing values

In [5]:
# Keep a value only when its string form is not "?"; everything else becomes null.
cleaned_columns = []
for name in df.columns:
    keep_value = F.when(df[name].cast('string') != "?", F.col(name)).otherwise(None)
    cleaned_columns.append(keep_value.alias(name))
df = df.select(cleaned_columns)

Add indicator columns for numeric and categorical missing values. (Retain the id columns for merging with other dataframes.)

In [6]:
# Define which variables are in which categories
id_vars = ['encounter_id', 'patient_nbr', 'discharge_date']
label_var = ['readmitted']
num_vars = [
    'time_in_hospital', 'num_lab_procedures', 'num_procedures',
    'num_medications', 'number_outpatient', 'number_emergency',
    'number_inpatient', 'diag_1', 'diag_2', 'diag_3', 'number_diagnoses',
    'glucose_min', 'glucose_max', 'glucose_mean', 'glucose_var',
]
cat_vars = [
    'race', 'gender', 'age', 'weight', 'admission_type_id',
    'discharge_disposition_id', 'admission_source_id',
    'payer_code', 'medical_specialty',
    'max_glu_serum', 'A1Cresult', 'metformin', 'repaglinide', 'nateglinide',
    'chlorpropamide', 'glimepiride', 'acetohexamide', 'glipizide',
    'glyburide', 'tolbutamide', 'pioglitazone', 'rosiglitazone', 'acarbose',
    'miglitol', 'troglitazone', 'tolazamide', 'examide', 'citoglipton',
    'insulin', 'glyburide-metformin', 'glipizide-metformin',
    'glimepiride-pioglitazone', 'metformin-rosiglitazone',
    'metformin-pioglitazone', 'change', 'diabetesMed',
]

# One 'y'/'n' indicator column per feature, named '<feature>_missing',
# carried alongside the id columns.
missing_flags = []
for feature in num_vars + cat_vars:
    flag = F.when(df[feature].isNull(), 'y').otherwise('n')
    missing_flags.append(flag.alias(feature + '_missing'))
df_mvi = df.select(id_vars + missing_flags)

Replace missing numeric values with the column means:

In [7]:
# Cast every numeric feature to double, keeping the id columns.
numeric_as_double = [df[c].cast('double') for c in num_vars]
df_num = df.select(id_vars + numeric_as_double)

# Compute all column means in a single select and key them by column name.
mean_exprs = [F.mean(df_num[c]).alias(c + '_mean') for c in num_vars]
mean_values = df_num.select(mean_exprs).rdd.flatMap(lambda row: row).collect()
num_var_means = dict(zip(num_vars, mean_values))

# Substitute the column mean wherever a value is null.
imputed_columns = []
for c in num_vars:
    filled = F.when(df_num[c].isNull(), num_var_means[c]).otherwise(df_num[c])
    imputed_columns.append(filled.alias(c))
df_num = df_num.select(id_vars + imputed_columns)

Indicate missing values in categorical columns. Merge with other missing value indicators.

In [8]:
# Null categorical values become the explicit level 'NA_'; all others are cast to string.
filled_categoricals = []
for c in cat_vars:
    as_level = F.when(df[c].isNull(), 'NA_').otherwise(df[c].cast('string'))
    filled_categoricals.append(as_level.alias(c))
df_cat = df.select(id_vars + filled_categoricals)

# Attach the '<feature>_missing' indicators by joining on the id columns.
df_cat = df_cat.join(df_mvi, on=id_vars, how='inner')

# From here on cat_vars is every non-id column of df_cat, indicators included.
cat_vars = [name for name in df_cat.columns if name not in id_vars]

## Create the string indexing pipeline (takes a while)

In [9]:
# One StringIndexer per categorical column, writing to '<column>__indexed__'.
s_indexers = []
for cat_col in cat_vars:
    s_indexers.append(StringIndexer(inputCol=cat_col,
                                    outputCol=cat_col + '__indexed__'))

# Fit all indexers as a single pipeline, then apply it to the same data.
si_pipe = Pipeline(stages=s_indexers)
si_pipe_model = si_pipe.fit(df_cat)
df_cat = si_pipe_model.transform(df_cat)

Save the pipeline:

In [25]:
# Persist the fitted indexing pipeline to the 'model' container, replacing any earlier copy.
si_pipe_model_filename = 'wasb://model@{}.blob.core.windows.net/si_pipe_model'.format(
    account_name)
si_pipe_writer = si_pipe_model.write().overwrite()
si_pipe_writer.save(si_pipe_model_filename)

Remove from consideration any categorical variables that have only one level:

In [11]:
# Variance of each indexed column, collected in the same order as cat_vars.
indexed_columns = [cv + '__indexed__' for cv in cat_vars]
variance_exprs = [F.variance(df_cat[c]).alias(c + '_sd') for c in indexed_columns]
cat_col_var = df_cat.select(variance_exprs).rdd.flatMap(lambda row: row).collect()

# Keep only the variables whose indexed column has non-zero variance.
cat_vars = [name for name, variance in zip(cat_vars, cat_col_var) if variance != 0]

## Perform one-hot encoding

In [12]:
# One encoder per remaining categorical variable: '<var>__indexed__' -> '<var>__encoded__'.
oh_encoders = []
for cat_col in cat_vars:
    oh_encoders.append(OneHotEncoder(inputCol=cat_col + '__indexed__',
                                     outputCol=cat_col + '__encoded__'))

# Narrow df_cat to the id columns plus the indexed columns that will be encoded.
indexed_columns = [cat_col + '__indexed__' for cat_col in cat_vars]
df_cat = df_cat.select(id_vars + indexed_columns)

oh_pipe = Pipeline(stages=oh_encoders)
oh_pipe_model = oh_pipe.fit(df_cat)
df_cat = oh_pipe_model.transform(df_cat)

Save the pipeline:

In [26]:
# Persist the fitted encoding pipeline to the 'model' container, replacing any earlier copy.
oh_pipe_model_filename = 'wasb://model@{}.blob.core.windows.net/oh_pipe_model'.format(
    account_name)
oh_pipe_writer = oh_pipe_model.write().overwrite()
oh_pipe_writer.save(oh_pipe_model_filename)

## Assemble categorical features into one vector

In [14]:
# Keep the id and encoded columns, stripping the '__encoded__' suffix so the
# encoded columns are named exactly as in cat_vars.
encoded_columns = [x + '__encoded__' for x in cat_vars]
renamed_columns = [df_cat[c].alias(c.replace('__encoded__', ''))
                   for c in id_vars + encoded_columns]
df_cat = df_cat.select(renamed_columns)

# Concatenate all encoded categorical columns into one 'cat_features' vector.
va = VectorAssembler(inputCols=cat_vars, outputCol='cat_features')
assembled_cat = va.transform(df_cat)
df_cat = assembled_cat.select(id_vars + ['cat_features'])

## Map ambiguous labels appropriately

In [15]:
# Binary target: 'NO' and '>30' share class 0; only '<30' is class 1.
label_map = {'NO': 0, '>30': 0, '<30': 1}
def map_label(label):
    """Map a raw `readmitted` value to its binary class, returned as a string.

    The result is a string ('0' or '1') so that it matches the StringType
    return type declared when this function is wrapped in a UDF.

    Raises:
        KeyError: if `label` is not a key of `label_map`.
    """
    return str(label_map[label])

# Wrap the label mapper as a Spark UDF, then overwrite 'readmitted' with the mapped class.
udf_map_label = F.udf(map_label, StringType())
ids_and_label = df.select(id_vars + label_var)
df_label = ids_and_label.withColumn('readmitted',
                                    udf_map_label(ids_and_label['readmitted']))

Create string indexer:

In [16]:
# Fit an indexer that turns the mapped 'readmitted' column into 'label'.
label_indexer = StringIndexer(inputCol='readmitted', outputCol='label')
m_si_label = label_indexer.fit(df_label)

# Apply it and discard the source column, which is no longer needed.
df_label = m_si_label.transform(df_label).drop('readmitted')

Save string indexer:

In [27]:
# Persist the fitted label indexer to the 'model' container, replacing any earlier copy.
si_label_filename = 'wasb://model@{}.blob.core.windows.net/si_label'.format(
    account_name)
si_label_writer = m_si_label.write().overwrite()
si_label_writer.save(si_label_filename)

## Merge dataframes back together

In [18]:
# Recombine labels, numeric features and categorical features on the id columns.
df_label_num = df_label.join(df_num, on=id_vars, how='inner')
df = df_label_num.join(df_cat, on=id_vars, how='inner')

In [19]:
# Combine the numeric columns and the categorical vector into a single 'features' vector.
feature_inputs = num_vars + ['cat_features']
va = VectorAssembler(inputCols=feature_inputs, outputCol='features')
assembled = va.transform(df)

# Only the label and the assembled vector are kept.
df = assembled.select('label', 'features')

## Save the preprocessed data for model training

In [30]:
data_filename = 'wasb://model@{}.blob.core.windows.net/trainingdata'.format(account_name)
# Overwrite output from any previous run so the notebook can be re-executed;
# without an explicit mode the write fails when the path already exists.
df.write.mode('overwrite').parquet(data_filename)

# Generate imaginary patients for the simulator

We will train and evaluate the model using the historical patient data preprocessed above. However, we will generate new patients and streaming glucose level readings to demonstrate how our model can be applied to incoming patient data. Here, we generate the imaginary patient profiles:

In [31]:
# number of patients to simulate
num_patients = 100

# get the distributions of numerical and categorical features in the real data
# (re-read the raw CSV: `df` is rebound here to the unprocessed records)
df = sqlContext.read.csv(input_filename, header=True, sep=',', inferSchema=True)

num_vars = ['time_in_hospital', 'num_lab_procedures', 'num_procedures',
            'num_medications', 'number_outpatient', 'number_emergency',
            'number_inpatient', 'diag_1', 'diag_2', 'diag_3', 'number_diagnoses']
cat_vars = ['race', 'gender', 'age', 'weight', 'admission_type_id',
            'discharge_disposition_id', 'admission_source_id',
            'payer_code', 'medical_specialty',
            'max_glu_serum', 'A1Cresult', 'metformin', 'repaglinide', 'nateglinide',
            'chlorpropamide', 'glimepiride', 'acetohexamide', 'glipizide',
            'glyburide', 'tolbutamide', 'pioglitazone', 'rosiglitazone', 'acarbose',
            'miglitol', 'troglitazone', 'tolazamide', 'examide', 'citoglipton',
            'insulin', 'glyburide-metformin', 'glipizide-metformin',
            'glimepiride-pioglitazone', 'metformin-rosiglitazone',
            'metformin-pioglitazone', 'change', 'diabetesMed']

# distrib_dict maps each column name to a description of its distribution:
#   categorical column -> {level: number of rows with that level}
#   numeric column     -> (mean, standard deviation)
distrib_dict = {}
for column in cat_vars:
    level_counts = df.groupBy(column).count().collect()
    distrib_dict[column] = {row[0]: row[1] for row in level_counts}

# Compute every mean and standard deviation in one aggregation instead of
# launching two separate Spark jobs per numeric column. The result row holds
# all the means first, followed by all the standard deviations.
num_stat_exprs = ([F.mean(F.col(column)) for column in num_vars] +
                  [F.stddev(F.col(column)) for column in num_vars])
num_stats = df.agg(*num_stat_exprs).collect()[0]
for position, column in enumerate(num_vars):
    distrib_dict[column] = (num_stats[position],
                            num_stats[position + len(num_vars)])

# remove values that indicate missingness
keys_to_remove = ['?', 'Unknown/Invalid', 'Other', 'PhysicianNotFound', 'None']
for entry in distrib_dict.values():
    # numeric columns are stored as (mean, stddev) tuples and have no levels to drop
    if not isinstance(entry, dict):
        continue
    for key_to_remove in keys_to_remove:
        entry.pop(key_to_remove, None)

# Random patient numbers form the first column of the simulated records.
random_values = pd.DataFrame(np.random.randint(500000000, size=num_patients).tolist(),
                             columns=['patient_nbr'])

# Sample each categorical column from its observed level frequencies.
for column in cat_vars:
    my_dict = distrib_dict[column]
    possible_values = list(my_dict.keys())
    if not possible_values:
        raise ValueError('No usable levels remain for column {!r}'.format(column))
    # Total is computed once per column rather than once per level.
    total_count = float(sum(my_dict[key] for key in possible_values))
    likelihoods = [my_dict[key] / total_count for key in possible_values]
    random_values[column] = np.random.choice(possible_values, num_patients, p=likelihoods)

# Sample each numeric column from a normal distribution with the observed moments.
for column in num_vars:
    my_mean, my_stddev = distrib_dict[column]
    random_values[column] = np.random.normal(my_mean, my_stddev, num_patients)

# Put the simulated columns in the raw file's column order, minus two columns:
#   'readmitted'   - we include no label for these patients
#   'encounter_id' - info in the patient record will be consistent across encounters
df_columns = [name for name in df.columns if name not in ('readmitted', 'encounter_id')]
random_values = random_values[df_columns]

Upload the newly-generated patient records:

In [32]:
blob_service.create_container('patientrecords')

# Serialise the simulated patients to CSV in memory, then separate the header
# row from the data rows (the final split element, after the last newline, is dropped).
csv_buffer = StringIO.StringIO()
random_values.to_csv(csv_buffer, index=False)
csv_lines = csv_buffer.getvalue().split('\n')[:-1]
header = csv_lines.pop(0)

# Upload one blob per patient: the header plus that patient's row, named after
# the first CSV field of the row.
for patient_line in csv_lines:
    nbr = patient_line.split(',')[0]
    blob_name = '{}.csv'.format(nbr)
    csv_contents = '\n'.join([header, patient_line])
    # The upload method is named differently depending on which class was
    # imported as BlobService, so fall back if the first name is missing.
    try:
        blob_service.put_block_blob_from_text('patientrecords',
                                              blob_name,
                                              csv_contents,
                                              x_ms_blob_content_type='text')
    except AttributeError:
        blob_service.create_blob_from_text('patientrecords',
                                           blob_name,
                                           csv_contents)

About 5-10 minutes after your patient records have been copied to blob storage, you should see simulated glucose levels begin to appear in your storage account's `glucoselevelsaggs` container.