# Machine Learning at Scale
Dr. James G. Shanahan and Marguerite Oneto, with assistance from Kasane Utsumi   
&copy; copyright July 2016


# The Heritage Health Prize Challenge Using Spark SQL

### Improve Healthcare, Win \$3,000,000.  Identify patients who will be admitted to a hospital within the next year using historical claims data.

This notebook uses [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html), [DataFrames](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame), and [Pipelines](https://spark.apache.org/docs/latest/ml-guide.html) to build a basic pipeline for modeling outcomes using the Heritage Health Prize dataset.

The notebook is organized as follows:  

1.  **Introduction**
2.  **Set Up Spark**
3.  **ETL**
4.  **Feature Engineering**
5.  **Modeling, Evaluation, and Tuning**
6.  **Results**
7.  **Next Steps**
8.  **Resources**
9.  **Appendix**

## Table of Contents <a name="TOC"></a> 

1.  [Introduction](#1)   
2.  [Set Up Spark](#2)
    1.  [Local Set-Up](#2.A)
    2.  [Docker Set-Up](#2.B)
    3.  [Cluster Set-Up](#2.C)
3.  [ETL](#3)
    1.  [Extract](#3.A)
    2.  [EDA-0](#3.B)
    3.  [Transform and Load](#3.C)
        1.  [Target Variables](#3.C.a)
        2.  [Claims Data](#3.C.b)
        3.  [Drug Data](#3.C.c)
        4.  [Lab Data](#3.C.d)
        5.  [Members Data](#3.C.e)
    4.  [EDA-1](#3.D)
4.  [Feature Engineering](#4)
    1.  [Aggregation](#4.A)
        1.  [Claims Data](#4.A.a)
        2.  [Drug Data](#4.A.b)
        3.  [Lab Data](#4.A.c)
    2.  [One Hot Encoding: Members Data](#4.B)
    3.  [Create Final DataFrame For Modeling](#4.C)
        1.  [Merge Data](#4.C.a)
        2.  [Handle Missing Data](#4.C.b)
        3.  [Drop Unnecessary Variables](#4.C.c)
        4.  [External Storage Read/Write of Final DataFrame](#4.C.d)
        5.  [EDA-2](#4.C.e)
5.  [Modeling, Evaluation, and Tuning](#5)
    1.  [Create Training, Validation, and Test DataFrames](#5.A)
    2.  [Define Evaluation Metric](#5.B)
    3.  [Build and Evaluate Baseline Model](#5.C)
    4.  [Feature Selection: Principal Component Analysis (PCA)](#5.D)
    5.  [Hyperparameter Tuning](#5.E)
6.  [Results](#6)
7.  [Next Steps](#7)
8.  [Resources](#8)
9.  [Appendix](#9)
    1.  [Runtimes](#9.A)
    2.  [Example of Training a Non-Linear Model Using Spark Pipelines](#9.B)
    3.  [User-Defined Functions in Spark Pipelines](#9.C)

## 1.  Introduction <a name="1"></a>
[Back to Table of Contents](#TOC)

[The Heritage Health Prize (HHP)](https://www.heritagehealthprize.com/c/hhp) was a data science challenge sponsored by [The Heritage Provider Network](http://www.heritageprovidernetwork.com).  It took place from April 4, 2011 to April 4, 2013.  For information on the winning entries, please see [here](http://www.heritagehealthprize.com/c/hhp/details/milestone-winners).

In this notebook, we follow a traditional data science project process to address the HHP challenge of predicting future hospital stays using past patient treatment information.  We extract, transform, and load the data (**ETL**).We conduct **feature engineering**.  Along the way, we do some exploratory data analysis (**EDA**).  We then **create models, evaluate their performance, and fine tune their parameters**.  We write up our **results**.  We explain what additional research we would like to do in a list of **next steps**.  We list our **resources**.  And finally, we create an **appendix** for tangential information related to our project.

These are the Python modules needed to run this notebook:

1.  pyspark
2.  pandas
3.  os
4.  sys
5.  math

## 2.  Set Up Spark <a name="2"></a>
[Back to Table of Contents](#TOC)

### 2.A Local Set-Up <a name="2.A"></a>
[Back to Table of Contents](#TOC)

Use this code to set up a spark context to run on your local computer.  Set the SPARK_HOME environment variable to the Spark directory being used on your local computer.

In [2]:
# # Replace the path below with your particular local Spark directory
# SPARK_HOME='/usr/local/Cellar/apache-spark/spark-1.6.2-bin-hadoop2.6/'

# import os
# import sys

# spark_home = os.environ['SPARK_HOME'] = SPARK_HOME
# if not spark_home:
#     raise ValueError('SPARK_HOME enviroment variable is not set')
# sys.path.insert(0, os.path.join(spark_home,'python'))
# sys.path.insert(0, os.path.join(spark_home,'python/lib/py4j-0.9-src.zip'))
# execfile(os.path.join(spark_home,'python/pyspark/shell.py'))

# from pyspark.sql import SQLContext
# sql_context = SQLContext(sc)

In [3]:
# sc.stop()

### 2.B Docker Set-Up <a name="2.B"></a>
[Back to Table of Contents](#TOC)

Use this code to set up a Spark Context and a SQL Context to run on a Docker virtual machine on your local computer.

In [4]:
# import os
# import sys

# import pyspark
# from pyspark.sql import SQLContext

# # We can give a name to our app (to find it in Spark WebUI) and configure execution mode
# # In this case, it is local multicore execution with "local[*]"
# app_name = "hhp"
# master = "local[*]"
# conf = pyspark.SparkConf().setAppName(app_name).setMaster(master)
# sc = pyspark.SparkContext(conf=conf)
# sql_context = SQLContext(sc)
# print sc
# print sql_context

### 2.C Cluster Set-Up <a name="2.C"></a>
[Back to Table of Contents](#TOC)

Use this code to set up a SQL Context when the cluster already has a Spark Context running.

In [1]:
sql_context = SQLContext(sc)
print sql_context

<pyspark.sql.context.SQLContext object at 0x7effae3df790>


## 3.  ETL <a name="3"></a>
[Back to Table of Contents](#TOC)

 A copy of the HHP dataset can be downloaded [here](https://www.dropbox.com/sh/upt0j2q44ncrn1m/AADp4D50rGM61hpaSThZnqF3a/HHP_release3?dl=0).  It consists of the following files:
1.  [Claims.csv](https://www.dropbox.com/sh/upt0j2q44ncrn1m/AADAkd0WulfgFthMibmIBtJoa/HHP_release3/Claims.csv?dl=0)  (Claims Data - features)
2.  [DaysInHospital_Y2.csv](https://www.dropbox.com/sh/upt0j2q44ncrn1m/AAD0_gwrKFo657caVjV3mjZVa/HHP_release3/DaysInHospital_Y2.csv?dl=0)  (Target Data for Year 2 - labels)
3.  [DaysInHospital_Y3.csv](https://www.dropbox.com/sh/upt0j2q44ncrn1m/AACmJkf7txiQiPZ_jd5cqsf3a/HHP_release3/DaysInHospital_Y3.csv?dl=0)  (Target Data for Year 3 - labels)
4.  [DrugCount.csv](https://www.dropbox.com/sh/upt0j2q44ncrn1m/AAA_9IxBOsoBSqyhlu7x8z2qa/HHP_release3/DrugCount.csv?dl=0)  (Drug Count Data - features)
5.  [LabCount.csv](https://www.dropbox.com/sh/upt0j2q44ncrn1m/AABxVuWLtBLqdYr6pyJCPsfga/HHP_release3/LabCount.csv?dl=0)  (Lab Count Data - features)
6.  [Members.csv](https://www.dropbox.com/sh/upt0j2q44ncrn1m/AACgVHEh2yUxzavQlurF8hfla/HHP_release3/Members.csv?dl=0)  (Members Data -features)
7.  [Target.csv](https://www.dropbox.com/sh/upt0j2q44ncrn1m/AAD7M6yWTsRtToS7uKmbWrA7a/HHP_release3/Target.csv?dl=0)  (Target Data for Year 4 - labels)

The Extract-Transform-Load process will have three steps:  
1.  Extract  
2.  EDA-0  
3.  Transform and Load  

**NOTE: The data dictionary for the HHP data can be found [here](https://www.heritagehealthprize.com/c/hhp/data).**

<img src="https://dl.dropbox.com/s/d5u2et0xahnt2ct/HHP_DataDictionary.png" width="500" height="500" />

First, set the directory where the data can be found:

In [2]:
import os

# Set the directory containing the raw data files
DATA_DIR = os.path.join('data/HHP_release3')

### 3.A  Extract <a name="3.A"></a>
[Back to Table of Contents](#TOC)

Spark 1.6 does not have native support to load csv files directly into SQL DataFrames. We tried [Databrick's CSV Data Source package](https://github.com/databricks/spark-csv), but that did not work. Thus, we implemented a work-around by first reading the csv file into a pandas dataframe and then reading the pandas dataframe into a Spark SQL DataFrame.

In [3]:
import pandas as pd
from pyspark.sql.types import *

def load_csv_file(filename):
    # HACK - Loading csv directly into a dataframe was supposed to work by using a Databricks
    # package (https://github.com/databricks/spark-csv), but it didn't.
    # So in this section, we are going the Pandas -> DataFrame route.  Could also go RDD -> DataFrame route.
    
    # Set our file path
    input_path = os.path.join(DATA_DIR, filename)
    
    # Read into Pandas dataframe.  We assume the file contains a header row.
    # Note:  We read in all fields as string.
    df_pandas = pd.read_csv(input_path, dtype='str')  
    
    # Read Pandas dataframe into SQL DataFrame.
    # We must define the schema so that datatype is not automatically inferred and everything remains string.
    schema_string = df_pandas.columns.values.tolist()
    fields = [StructField(field_name, StringType(), True) for field_name in schema_string]
    schema = StructType(fields)
    
    return sql_context.createDataFrame(df_pandas, schema)

# Read in Year 2 Target Variables
df_target_Y2 = load_csv_file('DaysInHospital_Y2.csv').cache()
# Check the data
print "Schema for target_Y2 DataFrame with %d rows:" %(df_target_Y2.count())
df_target_Y2.printSchema()
df_target_Y2.show(1)

# Read in Year 3 Target Variables
df_target_Y3 = load_csv_file('DaysInHospital_Y3.csv').cache()
# Check the data
print "Schema for target_Y3 DataFrame with %d rows:" %(df_target_Y3.count())
df_target_Y3.printSchema()
df_target_Y3.show(1)

# Read in Year 4 Target Variables
df_target_Y4 = load_csv_file('Target.csv').cache()
# Check the data
print "Schema for target_Y4 DataFrame with %d rows:" %(df_target_Y4.count())
df_target_Y4.printSchema()
df_target_Y4.show(1)

# Read in Claims Data
df_claims = load_csv_file('Claims.csv').cache()
# Check the data
print "Schema for claims DataFrame with %d rows:" %(df_claims.count())
df_claims.printSchema()
df_claims.show(1)

# Read in Drug Data
df_drug_count = load_csv_file('DrugCount.csv').cache()
# Check the data
print "Schema for drug_count DataFrame with %d rows:" %(df_drug_count.count())
df_drug_count.printSchema()
df_drug_count.show(1)

# Read in Lab Data
df_lab_count = load_csv_file('LabCount.csv').cache()
# Check the data
print "Schema for lab_count DataFrame with %d rows:" %(df_lab_count.count())
df_lab_count.printSchema()
df_lab_count.show(1)

# Read in Members Data
df_members = load_csv_file('Members.csv').cache()
# Check the data
print "Schema for members DataFrame with %d rows:" %(df_members.count())
df_members.printSchema()
df_members.show(1)

Schema for target_Y2 DataFrame with 76038 rows:
root
 |-- MemberID: string (nullable = true)
 |-- ClaimsTruncated: string (nullable = true)
 |-- DaysInHospital: string (nullable = true)

+--------+---------------+--------------+
|MemberID|ClaimsTruncated|DaysInHospital|
+--------+---------------+--------------+
|24027423|              0|             0|
+--------+---------------+--------------+
only showing top 1 row

Schema for target_Y3 DataFrame with 71435 rows:
root
 |-- MemberID: string (nullable = true)
 |-- ClaimsTruncated: string (nullable = true)
 |-- DaysInHospital: string (nullable = true)

+--------+---------------+--------------+
|MemberID|ClaimsTruncated|DaysInHospital|
+--------+---------------+--------------+
|90963501|              0|             0|
+--------+---------------+--------------+
only showing top 1 row

Schema for target_Y4 DataFrame with 70942 rows:
root
 |-- MemberID: string (nullable = true)
 |-- ClaimsTruncated: string (nullable = true)
 |-- DaysInHospita

### 3.B  EDA-0 <a name="3.B"></a>
[Back to Table of Contents](#TOC)

It is a best practice to do some EDA on each table to see what transformations we need to complete to get the data in shape for feature engineering.  Given Spark DataFrames, there are two ways to explore the data:   

1.  Run SQL queries on temporary tables created from the DataFrames using 'sql_context.sql'  OR  
2.  Run queries directly on the DataFrames (for a list of DataFrame operations, see [here](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame)).

Below are examples of each method.  Also see [Section 7: Next Steps](#7.).

In [4]:
# Method 1: SQL queries on temporary tables
# In order to run SQL queries on DataFrames, we must first register them as temporary tables

# df_target_Y2.registerTempTable("target_Y2_tt")
# df_target_Y3.registerTempTable("target_Y3_tt")
# df_target_Y4.registerTempTable("target_Y4_tt")
df_claims.registerTempTable("claims_tt")
# df_drug_count.registerTempTable("drug_count_tt")
# df_lab_count.registerTempTable("lab_count_tt")
# df_members.registerTempTable("members_tt")

In [5]:
# Then we can run some queries on the tt tables
# Example: Count the number of samples in the Claims Data where the LengthOfStay is suppressed
df_sup_los_1 = sql_context.sql("SELECT COUNT(MemberID) FROM claims_tt WHERE SupLOS = 1")
print 'Number of samples in the Claims Data where the LengthOfStay is suppressed:'
df_sup_los_1.show(1)
df_sup_los_1.unpersist()
# Example:  Get all possible values of the variable PrimaryConditionGroup
df_pcg = sql_context.sql("SELECT DISTINCT(PrimaryConditionGroup) FROM claims_tt")
print 'Possible Values for PrimaryConditionGroup:'
df_pcg.show(df_pcg.count())

# Free up memory
sql_context.dropTempTable("claims_tt")

Number of samples in the Claims Data where the LengthOfStay is suppressed:
+-----+
|  _c0|
+-----+
|11332|
+-----+

Possible Values for PrimaryConditionGroup:
+---------------------+
|PrimaryConditionGroup|
+---------------------+
|                  NaN|
|              NEUMENT|
|               RESPR4|
|               INFEC4|
|               SEPSIS|
|               STROKE|
|              PERVALV|
|               CANCRA|
|               CANCRB|
|              LIVERDZ|
|              SEIZURE|
|               CANCRM|
|               PRGNCY|
|               SKNAUT|
|               HEART2|
|               HEART4|
|                  CHF|
|               RENAL1|
|               RENAL2|
|               RENAL3|
|                ROAMI|
|               MISCL1|
|               MISCL5|
|             ARTHSPIN|
|               PNCRDZ|
|              PERINTL|
|                  AMI|
|              APPCHOL|
|               CATAST|
|               TRAUMA|
|                HIPFX|
|                PNEUM|
|

In [6]:
# TO DO:  EDA on each temp table above.

In [7]:
# Method 2: Direct queries of DataFrames
# DataFrame Operations:  https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame

# Example: Get counts of values in each column of Claims data
print df_claims.columns
columns_for_analysis = ['Year', 'Specialty', 'PlaceSvc', 'LengthOfStay', 'DSFS', 
                        'PrimaryConditionGroup', 'CharlsonIndex', 'ProcedureGroup', 'SupLOS']
for column in columns_for_analysis:
    df_claims.groupBy(column).count().show()

['MemberID', 'ProviderID', 'Vendor', 'PCP', 'Year', 'Specialty', 'PlaceSvc', 'PayDelay', 'LengthOfStay', 'DSFS', 'PrimaryConditionGroup', 'CharlsonIndex', 'ProcedureGroup', 'SupLOS']
+----+------+
|Year| count|
+----+------+
|  Y1|865689|
|  Y2|898872|
|  Y3|904429|
+----+------+

+--------------------+------+
|           Specialty| count|
+--------------------+------+
|                 NaN|  8405|
|Obstetrics and Gy...| 36594|
|               Other| 92687|
|  Diagnostic Imaging|207297|
|          Pediatrics| 84862|
|      Rehabilitation| 57554|
|            Internal|672059|
|           Emergency|126130|
|             Surgery|208217|
|          Laboratory|653188|
|      Anesthesiology| 33435|
|    General Practice|473655|
|           Pathology| 14907|
+--------------------+------+

+-------------------+-------+
|           PlaceSvc|  count|
+-------------------+-------+
|                NaN|   7632|
|        Urgent Care| 199528|
|              Other|  11700|
|    Independent Lab| 65775

In [8]:
# TO DO:  EDA on each DataFrame above.

### 3.C  Transform and Load <a name="3.C"></a>
[Back to Table of Contents](#TOC)

We use **Spark SQL Pipelines** to transform the data based on our findings in EDA-0.  Mostly, we are converting strings to numeric values.  We follow these steps:  

1.  Create the SQL statements to transform the data.  
2.  Create a SQL transformer for each statement.  
3.  Define a pipeline that runs all the SQL statements in order.  
4.  Fit/define a pipeline model.  
5.  Run the pipeline model to transform the data.  

The SQL statements used in this section are based on this [blog](http://anotherdataminingblog.blogspot.com/2011/10/code-for-respectable-hhp-model.html).

**NOTE: The SQLTransformer in pyspark.ml cannot execute all SQL commands.  This is because DataFrames are based on RDDs, and RDDs are immutable data structures.  Updating elements in place is not possible.  For example, instead of modifying an existing column with "UPDATE ... SET" as in traditional SQL, we must create a new column.  Refer to this [StackOverflow entry](http://stackoverflow.com/questions/29109916/updating-a-dataframe-column-in-spark).**

In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer

#### 3.C.a  Target Variables <a name="3.C.a"></a>
[Back to Table of Contents](#TOC)

**NOTE: When we build our models below using pipelines, Spark expects that our target variable will have the name 'label'.  We creat 'label' here in the transformation step as LN(DaysInHospital + 1.0).  This is in line with the log root-mean-squared-error (log RMSE) loss function used by HHP to measure model performance.  See [here](https://www.heritagehealthprize.com/c/hhp/details/evaluation) for the original HHP explanation of their evaluation metric and [here](#5.B) in this notebook for a summary.**

In [10]:
# Step 1:  Create the SQL statements
# Convert strings to numbers and add Year
sql_0001_text = "SELECT MemberID, \
                 CAST(ClaimsTruncated AS INT) AS ClaimsTruncated, \
                 CAST(DaysInHospital AS DOUBLE) AS DaysInHospital, \
                 LN(DaysInHospital + 1.0) AS label, \
                 'Y1' AS Year \
                 FROM __THIS__"
sql_0002_text = "SELECT MemberID, \
                 CAST(ClaimsTruncated AS INT) AS ClaimsTruncated, \
                 CAST(DaysInHospital AS DOUBLE) AS DaysInHospital, \
                 LN(DaysInHospital + 1.0) AS label, \
                 'Y2' AS Year \
                 FROM __THIS__"
sql_0003_text = "SELECT MemberID, \
                 CAST(ClaimsTruncated AS INT) AS ClaimsTruncated, \
                 CAST(DaysInHospital AS DOUBLE) AS DaysInHospital, \
                 LN(DaysInHospital + 1.0) AS label, \
                 'Y3' AS Year \
                 FROM __THIS__"

# Step 2: Create the SQL transformers
sql_0001 = SQLTransformer(statement=(sql_0001_text))
sql_0002 = SQLTransformer(statement=(sql_0002_text))
sql_0003 = SQLTransformer(statement=(sql_0003_text))

# Step 3: Define the pipelines
transform_target_Y2_pipeline = Pipeline(stages=[sql_0001])
transform_target_Y3_pipeline = Pipeline(stages=[sql_0002])
transform_target_Y4_pipeline = Pipeline(stages=[sql_0003])

# Step 4: Fit the pipeline models
transform_target_Y2_pipeline_model = transform_target_Y2_pipeline.fit(df_target_Y2)
transform_target_Y3_pipeline_model = transform_target_Y3_pipeline.fit(df_target_Y3)
transform_target_Y4_pipeline_model = transform_target_Y4_pipeline.fit(df_target_Y4)

# Step 5: Run the pipeline model
df_target_Y2_transformed = transform_target_Y2_pipeline_model.transform(df_target_Y2).cache()
df_target_Y3_transformed = transform_target_Y3_pipeline_model.transform(df_target_Y3).cache()
df_target_Y4_transformed = transform_target_Y4_pipeline_model.transform(df_target_Y4).cache()

# Check the transformed data
print 'df_target_Y2_transformed has %d rows:' %(df_target_Y2_transformed.count())
print df_target_Y2_transformed.show(2)
print 'df_target_Y3_transformed has %d rows:' %(df_target_Y3_transformed.count())
print df_target_Y3_transformed.show(2)
print 'df_target_Y4_transformed has %d rows:' %(df_target_Y4_transformed.count())
print df_target_Y4_transformed.show(2)

# Free up memory
df_target_Y2.unpersist()
df_target_Y3.unpersist()
df_target_Y4.unpersist()

df_target_Y2_transformed has 76038 rows:
+--------+---------------+--------------+-----+----+
|MemberID|ClaimsTruncated|DaysInHospital|label|Year|
+--------+---------------+--------------+-----+----+
|24027423|              0|           0.0|  0.0|  Y1|
|98324177|              0|           0.0|  0.0|  Y1|
+--------+---------------+--------------+-----+----+
only showing top 2 rows

None
df_target_Y3_transformed has 71435 rows:
+--------+---------------+--------------+-----+----+
|MemberID|ClaimsTruncated|DaysInHospital|label|Year|
+--------+---------------+--------------+-----+----+
|90963501|              0|           0.0|  0.0|  Y2|
|85160905|              0|           0.0|  0.0|  Y2|
+--------+---------------+--------------+-----+----+
only showing top 2 rows

None
df_target_Y4_transformed has 70942 rows:
+--------+---------------+--------------+-----+----+
|MemberID|ClaimsTruncated|DaysInHospital|label|Year|
+--------+---------------+--------------+-----+----+
|20820036|            

DataFrame[MemberID: string, ClaimsTruncated: string, DaysInHospital: string]

#### 3.C.b  Claims Data <a name="3.C.b"></a>
[Back to Table of Contents](#TOC)

In [11]:
# Step 1:  Create the SQL statements
# Convert strings to numbers
sql_0001_text = "SELECT *, \
                 CASE WHEN PayDelay = '162+' THEN 163 ELSE (CAST(PayDelay AS INT)) END AS PayDelayI \
                 FROM __THIS__"
sql_0002_text = "SELECT *, \
                 CASE \
                 WHEN DSFS = '0- 1 month' THEN 1 \
                 WHEN DSFS = '1- 2 months' THEN 2 \
                 WHEN DSFS = '2- 3 months' THEN 3 \
                 WHEN DSFS = '3- 4 months' THEN 4 \
                 WHEN DSFS = '4- 5 months' THEN 5 \
                 WHEN DSFS = '5- 6 months' THEN 6 \
                 WHEN DSFS = '6- 7 months' THEN 7 \
                 WHEN DSFS = '7- 8 months' THEN 8 \
                 WHEN DSFS = '8- 9 months' THEN 9 \
                 WHEN DSFS = '9-10 months' THEN 10 \
                 WHEN DSFS = '10-11 months' THEN 11 \
                 WHEN DSFS = '11-12 months' THEN 12 \
                 WHEN DSFS IS NULL THEN NULL \
                 END \
                 AS DSFSI \
                 FROM __THIS__"
sql_0003_text = "SELECT *, \
                 CASE \
                 WHEN CharlsonIndex = '0' THEN 0 \
                 WHEN CharlsonIndex = '1-2' THEN 2 \
                 WHEN CharlsonIndex = '3-4' THEN 4 \
                 WHEN CharlsonIndex = '5+' THEN 6 \
                 END \
                 AS CharlsonIndexI \
                 FROM __THIS__"
sql_0004_text = "SELECT *, \
                 CASE \
                 WHEN LengthOfStay = '1 day' THEN 1 \
                 WHEN LengthOfStay = '2 days' THEN 2 \
                 WHEN LengthOfStay = '3 days' THEN 3 \
                 WHEN LengthOfStay = '4 days' THEN 4 \
                 WHEN LengthOfStay = '5 days' THEN 5 \
                 WHEN LengthOfStay = '6 days' THEN 6 \
                 WHEN LengthOfStay = '1- 2 weeks' THEN 11 \
                 WHEN LengthOfStay = '2- 4 weeks' THEN 21 \
                 WHEN LengthOfStay = '4- 8 weeks' THEN 42 \
                 WHEN LengthOfStay = '26+ weeks' THEN 180 \
                 WHEN LengthOfStay IS NULL THEN null \
                 END \
                 AS LengthOfStayI \
                 FROM __THIS__"

# Step 2: Create the SQL transformers
sql_0001 = SQLTransformer(statement=(sql_0001_text))
sql_0002 = SQLTransformer(statement=(sql_0002_text))
sql_0003 = SQLTransformer(statement=(sql_0003_text))
sql_0004 = SQLTransformer(statement=(sql_0004_text))

# Step 3: Define the pipeline
transform_claims_pipeline = Pipeline(stages=[sql_0001, sql_0002, sql_0003, sql_0004])

# Step 4: Fit the pipeline model
transform_claims_pipeline_model = transform_claims_pipeline.fit(df_claims)

# Step 5: Run the pipeline model
df_claims_transformed = transform_claims_pipeline_model.transform(df_claims).cache()

# Check the transformed data
print 'df_claims_transformed has %d rows:' %(df_claims_transformed.count())
print df_claims_transformed.show(2)

# Free up memory
df_claims.unpersist()

df_claims_transformed has 2668990 rows:
+--------+----------+------+-----+----+---------+--------+--------+------------+-----------+---------------------+-------------+--------------+------+---------+-----+--------------+-------------+
|MemberID|ProviderID|Vendor|  PCP|Year|Specialty|PlaceSvc|PayDelay|LengthOfStay|       DSFS|PrimaryConditionGroup|CharlsonIndex|ProcedureGroup|SupLOS|PayDelayI|DSFSI|CharlsonIndexI|LengthOfStayI|
+--------+----------+------+-----+----+---------+--------+--------+------------+-----------+---------------------+-------------+--------------+------+---------+-----+--------------+-------------+
|42286978|   8013252|172193|37796|  Y1|  Surgery|  Office|      28|         NaN|8- 9 months|              NEUMENT|            0|           MED|     0|       28|    9|             0|         null|
|97903248|   3316066|726296| 5300|  Y3| Internal|  Office|      50|         NaN|7- 8 months|              NEUMENT|          1-2|            EM|     0|       50|    8|          

DataFrame[MemberID: string, ProviderID: string, Vendor: string, PCP: string, Year: string, Specialty: string, PlaceSvc: string, PayDelay: string, LengthOfStay: string, DSFS: string, PrimaryConditionGroup: string, CharlsonIndex: string, ProcedureGroup: string, SupLOS: string]

#### 3.C.c  Drug Data <a name="3.C.c"></a>
[Back to Table of Contents](#TOC)

In [12]:
# Step 1:  Create the SQL statements
# Convert strings to numbers
sql_0001_text = "SELECT *, \
                 CASE WHEN DrugCount = '7+' THEN 7 ELSE (CAST(DrugCount AS INT)) END AS DrugCountI \
                 FROM __THIS__"

# Step 2: Create the SQL transformers
sql_0001 = SQLTransformer(statement=(sql_0001_text))

# Step 3: Define the pipeline
transform_drugs_pipeline = Pipeline(stages=[sql_0001])

# Step 4: Fit the pipeline model
transform_drugs_pipeline_model = transform_drugs_pipeline.fit(df_drug_count)

# Step 5: Run the pipeline model
df_drug_count_transformed = transform_drugs_pipeline_model.transform(df_drug_count).cache()

# Check the transformed data
print 'df_drug_count_transformed has %d rows:' %(df_drug_count_transformed.count())
print df_drug_count_transformed.show(2)

# Free up memory
df_drug_count.unpersist()

df_drug_count_transformed has 818241 rows:
+--------+----+-----------+---------+----------+
|MemberID|Year|       DSFS|DrugCount|DrugCountI|
+--------+----+-----------+---------+----------+
|48925661|  Y2|9-10 months|       7+|         7|
|90764620|  Y3|8- 9 months|        3|         3|
+--------+----+-----------+---------+----------+
only showing top 2 rows

None


DataFrame[MemberID: string, Year: string, DSFS: string, DrugCount: string]

#### 3.C.d  Lab Data <a name="3.C.d"></a>
[Back to Table of Contents](#TOC)

In [13]:
# Step 1:  Create the SQL statements
# Convert strings to numbers
sql_0001_text = "SELECT *, \
                 CASE WHEN LabCount = '10+' THEN 10 ELSE (CAST(LabCount AS INT)) END AS LabCountI \
                 FROM __THIS__"

# Step 2: Create the SQL transformers
sql_0001 = SQLTransformer(statement=(sql_0001_text))

# Step 3: Define the pipeline
transform_labs_pipeline = Pipeline(stages=[sql_0001])

# Step 4: Fit the pipeline model
transform_labs_pipeline_model = transform_labs_pipeline.fit(df_lab_count)

# Step 5: Run the pipeline model
df_lab_count_transformed = transform_labs_pipeline_model.transform(df_lab_count).cache()

# Check the transformed data
print 'df_lab_count_transformed has %d rows:' %(df_lab_count_transformed.count())
print df_lab_count_transformed.show(2)

# Free up memory
df_lab_count.unpersist()

df_lab_count_transformed has 361484 rows:
+--------+----+-----------+--------+---------+
|MemberID|Year|       DSFS|LabCount|LabCountI|
+--------+----+-----------+--------+---------+
|69258001|  Y3|2- 3 months|       1|        1|
|10143167|  Y1| 0- 1 month|       2|        2|
+--------+----+-----------+--------+---------+
only showing top 2 rows

None


DataFrame[MemberID: string, Year: string, DSFS: string, LabCount: string]

#### 3.C.e  Members Data <a name="3.C.e"></a>
[Back to Table of Contents](#TOC)

In [14]:
# There are no transformations needed on the members data.  It will be one hot encoded in Section 4.B.

### 3.D  EDA-1 <a name="3.D"></a>
[Back to Table of Contents](#TOC)

It is a best practice to do some EDA on each table to get a feel for the categorical variables, to see the distributions of the numeric variables, etc.  See [here](http://itl.nist.gov/div898/handbook/eda/eda.htm) for a comprehensive list of EDA techniques and the graphs [here](http://nbviewer.jupyter.org/urls/dl.dropbox.com/s/kmbgrkhh73931lo/Titanic-EDA-LogisticRegression.ipynb) for inspiration from the [Titanic Kaggle competition](https://www.kaggle.com/c/titanic).  Also see [Section 7: Next Steps](#7.).

In [15]:
# TO DO: EDA on df_target_Y2_transformed, df_target_Y3_transformed, df_target_Y4_transformed, df_claims_transformed,
#        df_drug_count_transformed, df_lab_count_transformed, df_members

## 4.  Feature Engineering <a name="4"></a>
[Back to Table of Contents](#TOC)

We have three steps to our feature engineering for HHP:

4.A  [Aggregation](#4.A)  
4.B  [One Hot Encoding](#4.B)  
4.C  [Create Final DataFrame for Modeling](#4.C)

### 4.A  Aggregation <a name="4.A"></a>
[Back to Table of Contents](#TOC)

In this section, we create one line of data per MemberID per year.

In the Claims, DrugCount, and LabCount data, there may be multiple lines of data for each member in a particular year.  We must aggregate the variables for each member in each year.

For integer variables, we aggregate them by simple statistics, as in the example for the DrugCount below.

<img src="https://dl.dropbox.com/s/zug5jr2pntcygbg/Aggregate.png" width="800" height="800"/>

For categorical variables, we [one hot encode](https://www.quora.com/What-is-one-hot-encoding-and-when-is-it-used-in-data-science) them (i.e. create [dummy/indicator variables](https://en.wikipedia.org/wiki/Dummy_variable_(statistics)).  We create a new variable for each value of each categorical variable and put a 1 if the category of that variable occurs in that record and a 0 otherwise.  We then sum up how many times that category occurs for each patient each year, as in the example for the categorical variable Place of Service (PlaceSvc) below.

<img src="https://dl.dropbox.com/s/j0jvegfbj84li59/OHE.png" width="900" height="900"/>



#### 4.A.a  Claims Data <a name="4.A.a"></a>
[Back to Table of Contents](#TOC)

In aggregating the Claims data, we count the total number of claims for a patient within a given year, we count how many distinct values each categorical variable takes on, we aggregate the numeric variables (min, max, average, range, etc.), and we one hot encode the  categorical variables.  We also indicate whether the LengthOfStay variable was suppressed (supLOS).

**NOTE: This takes a while to run.**

In [16]:
import time
from datetime import datetime

start_time_agg = time.time()
cpu_start_time_agg = time.clock()
print 'Start time: %s' %(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

# Step 1:  Create the SQL statements
# Aggregate variables across Year and MemberID
sql_0001_text = "SELECT MemberID AS MemberID_c, Year AS Year_c, \
                 \
                 COUNT(*) AS num_Claims, \
                 COUNT(DISTINCT ProviderID) AS num_Providers, \
                 COUNT(DISTINCT Vendor) AS num_Vendors, \
                 COUNT(DISTINCT PCP) AS num_PCPs, \
                 COUNT(DISTINCT PlaceSvc) AS num_PlaceSvcs, \
                 COUNT(DISTINCT Specialty) AS num_Specialities, \
                 COUNT(DISTINCT PrimaryConditionGroup) AS num_PrimaryConditionGroups, \
                 COUNT(DISTINCT ProcedureGroup) AS num_ProcedureGroups, \
                 \
                 MAX(PayDelayI) AS max_PayDelay, \
                 MIN(PayDelayI) AS min_PayDelay, \
                 AVG(PayDelayI) AS avg_PayDelay, \
                 \
                 MAX(LengthOfStayI) AS max_LOS, \
                 MIN(LengthOfStayI) AS min_LOS, \
                 AVG(LengthOfStayI) AS avg_LOS, \
                 \
                 SUM(CASE WHEN LengthOfStay IS NULL AND SupLOS = 0 THEN 1 ELSE 0 END) AS LOS_TOT_UNKNOWN, \
                 SUM(CASE WHEN LengthOfStay IS NULL AND SupLOS = 1 THEN 1 ELSE 0 END) AS LOS_TOT_SUPRESSED, \
                 SUM(CASE WHEN LengthOfStay IS NOT NULL THEN 1 ELSE 0 END) AS LOS_TOT_KNOWN, \
                 \
                 MAX(DSFSI) AS max_dsfs, \
                 MIN(DSFSI) AS min_dsfs, \
                 MAX(DSFSI) - MIN(DSFSI) AS range_dsfs, \
                 AVG(DSFSI) AS avg_dsfs, \
                 \
                 MAX(CharlsonIndexI) AS max_CharlsonIndexI, \
                 MIN(CharlsonIndexI) AS min_CharlsonIndexI, \
                 AVG(CharlsonIndexI) AS avg_CharlsonIndexI, \
                 MAX(CharlsonIndexI) - MIN(CharlsonIndexI) AS range_CharlsonIndexI, \
                 \
                 SUM(CASE WHEN PrimaryConditionGroup = 'MSC2a3' THEN 1 ELSE 0 END) AS pcg1, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'METAB3' THEN 1 ELSE 0 END) AS pcg2, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'ARTHSPIN' THEN 1 ELSE 0 END) AS pcg3, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'NEUMENT' THEN 1 ELSE 0 END) AS pcg4, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'RESPR4' THEN 1 ELSE 0 END) AS pcg5, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'MISCHRT' THEN 1 ELSE 0 END) AS pcg6, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'SKNAUT' THEN 1 ELSE 0 END) AS pcg7, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'GIBLEED' THEN 1 ELSE 0 END) AS pcg8, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'INFEC4' THEN 1 ELSE 0 END) AS pcg9, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'TRAUMA' THEN 1 ELSE 0 END) AS pcg10, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'HEART2' THEN 1 ELSE 0 END) AS pcg11, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'RENAL3' THEN 1 ELSE 0 END) AS pcg12, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'ROAMI' THEN 1 ELSE 0 END) AS pcg13, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'MISCL5' THEN 1 ELSE 0 END) AS pcg14, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'ODaBNCA' THEN 1 ELSE 0 END) AS pcg15, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'UTI' THEN 1 ELSE 0 END) AS pcg16, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'COPD' THEN 1 ELSE 0 END) AS pcg17, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'GYNEC1' THEN 1 ELSE 0 END) AS pcg18, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'CANCRB' THEN 1 ELSE 0 END) AS pcg19, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'FXDISLC' THEN 1 ELSE 0 END) AS pcg20, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'AMI' THEN 1 ELSE 0 END) AS pcg21, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'PRGNCY' THEN 1 ELSE 0 END) AS pcg22, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'HEMTOL' THEN 1 ELSE 0 END) AS pcg23, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'HEART4' THEN 1 ELSE 0 END) AS pcg24, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'SEIZURE' THEN 1 ELSE 0 END) AS pcg25, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'APPCHOL' THEN 1 ELSE 0 END) AS pcg26, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'CHF' THEN 1 ELSE 0 END) AS pcg27, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'GYNECA' THEN 1 ELSE 0 END) AS pcg28, \
                 SUM(CASE WHEN PrimaryConditionGroup IS NULL THEN 1 ELSE 0 END) AS pcg29, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'PNEUM' THEN 1 ELSE 0 END) AS pcg30, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'RENAL2' THEN 1 ELSE 0 END) AS pcg31, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'GIOBSENT' THEN 1 ELSE 0 END) AS pcg32, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'STROKE' THEN 1 ELSE 0 END) AS pcg33, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'CANCRA' THEN 1 ELSE 0 END) AS pcg34, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'FLaELEC' THEN 1 ELSE 0 END) AS pcg35, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'MISCL1' THEN 1 ELSE 0 END) AS pcg36, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'HIPFX' THEN 1 ELSE 0 END) AS pcg37, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'METAB1' THEN 1 ELSE 0 END) AS pcg38, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'PERVALV' THEN 1 ELSE 0 END) AS pcg39, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'LIVERDZ' THEN 1 ELSE 0 END) AS pcg40, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'CATAST' THEN 1 ELSE 0 END) AS pcg41, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'CANCRM' THEN 1 ELSE 0 END) AS pcg42, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'PERINTL' THEN 1 ELSE 0 END) AS pcg43, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'PNCRDZ' THEN 1 ELSE 0 END) AS pcg44, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'RENAL1' THEN 1 ELSE 0 END) AS pcg45, \
                 SUM(CASE WHEN PrimaryConditionGroup = 'SEPSIS' THEN 1 ELSE 0 END) AS pcg46, \
                 \
                 SUM(CASE WHEN Specialty = 'Internal' THEN 1 ELSE 0 END) AS sp1, \
                 SUM(CASE WHEN Specialty = 'Laboratory' THEN 1 ELSE 0 END) AS sp2, \
                 SUM(CASE WHEN Specialty = 'General Practice' THEN 1 ELSE 0 END) AS sp3, \
                 SUM(CASE WHEN Specialty = 'Surgery' THEN 1 ELSE 0 END) AS sp4, \
                 SUM(CASE WHEN Specialty = 'Diagnostic Imaging' THEN 1 ELSE 0 END) AS sp5, \
                 SUM(CASE WHEN Specialty = 'Emergency' THEN 1 ELSE 0 END) AS sp6, \
                 SUM(CASE WHEN Specialty = 'Other' THEN 1 ELSE 0 END) AS sp7, \
                 SUM(CASE WHEN Specialty = 'Pediatrics' THEN 1 ELSE 0 END) AS sp8, \
                 SUM(CASE WHEN Specialty = 'Rehabilitation' THEN 1 ELSE 0 END) AS sp9, \
                 SUM(CASE WHEN Specialty = 'Obstetrics and Gynecology' THEN 1 ELSE 0 END) AS sp10, \
                 SUM(CASE WHEN Specialty = 'Anesthesiology' THEN 1 ELSE 0 END) AS sp11, \
                 SUM(CASE WHEN Specialty = 'Pathology' THEN 1 ELSE 0 END) AS sp12, \
                 SUM(CASE WHEN Specialty IS NULL THEN 1 ELSE 0 END) AS sp13, \
                 \
                 SUM(CASE WHEN ProcedureGroup = 'EM' THEN 1 ELSE 0 END ) AS pg1, \
                 SUM(CASE WHEN ProcedureGroup = 'PL' THEN 1 ELSE 0 END ) AS pg2, \
                 SUM(CASE WHEN ProcedureGroup = 'MED' THEN 1 ELSE 0 END ) AS pg3, \
                 SUM(CASE WHEN ProcedureGroup = 'SCS' THEN 1 ELSE 0 END ) AS pg4, \
                 SUM(CASE WHEN ProcedureGroup = 'RAD' THEN 1 ELSE 0 END ) AS pg5, \
                 SUM(CASE WHEN ProcedureGroup = 'SDS' THEN 1 ELSE 0 END ) AS pg6, \
                 SUM(CASE WHEN ProcedureGroup = 'SIS' THEN 1 ELSE 0 END ) AS pg7, \
                 SUM(CASE WHEN ProcedureGroup = 'SMS' THEN 1 ELSE 0 END ) AS pg8, \
                 SUM(CASE WHEN ProcedureGroup = 'ANES' THEN 1 ELSE 0 END ) AS pg9, \
                 SUM(CASE WHEN ProcedureGroup = 'SGS' THEN 1 ELSE 0 END ) AS pg10, \
                 SUM(CASE WHEN ProcedureGroup = 'SEOA' THEN 1 ELSE 0 END ) AS pg11, \
                 SUM(CASE WHEN ProcedureGroup = 'SRS' THEN 1 ELSE 0 END ) AS pg12, \
                 SUM(CASE WHEN ProcedureGroup = 'SNS' THEN 1 ELSE 0 END ) AS pg13, \
                 SUM(CASE WHEN ProcedureGroup = 'SAS' THEN 1 ELSE 0 END ) AS pg14, \
                 SUM(CASE WHEN ProcedureGroup = 'SUS' THEN 1 ELSE 0 END ) AS pg15, \
                 SUM(CASE WHEN ProcedureGroup IS NULL THEN 1 ELSE 0 END ) AS pg16, \
                 SUM(CASE WHEN ProcedureGroup = 'SMCD' THEN 1 ELSE 0 END ) AS pg17, \
                 SUM(CASE WHEN ProcedureGroup = 'SO' THEN 1 ELSE 0 END ) AS pg18, \
                 \
                 SUM(CASE WHEN PlaceSvc = 'Office' THEN 1 ELSE 0 END) AS ps1, \
                 SUM(CASE WHEN PlaceSvc = 'Independent Lab' THEN 1 ELSE 0 END) AS ps2, \
                 SUM(CASE WHEN PlaceSvc = 'Urgent Care' THEN 1 ELSE 0 END) AS ps3, \
                 SUM(CASE WHEN PlaceSvc = 'Outpatient Hospital' THEN 1 ELSE 0 END) AS ps4, \
                 SUM(CASE WHEN PlaceSvc = 'Inpatient Hospital' THEN 1 ELSE 0 END) AS ps5, \
                 SUM(CASE WHEN PlaceSvc = 'Ambulance' THEN 1 ELSE 0 END) AS ps6, \
                 SUM(CASE WHEN PlaceSvc = 'Other' THEN 1 ELSE 0 END) AS ps7, \
                 SUM(CASE WHEN PlaceSvc = 'Home' THEN 1 ELSE 0 END) AS ps8, \
                 SUM(CASE WHEN PlaceSvc IS NULL THEN 1 ELSE 0 END) AS ps9 \
                 \
                 FROM __THIS__ \
                 GROUP BY Year, MemberID"
sql_0002_text = "SELECT *, \
                 \
                 CASE WHEN max_LOS IS NULL THEN 0 ELSE max_LOS END AS new_max_LOS, \
                 CASE WHEN min_LOS IS NULL THEN 0 ELSE min_LOS END AS new_min_LOS, \
                 CASE WHEN avg_LOS IS NULL THEN 0 ELSE avg_LOS END AS new_avg_LOS, \
                 \
                 CASE WHEN max_dsfs IS NULL THEN 0 ELSE max_dsfs END AS new_max_dsfs, \
                 CASE WHEN min_dsfs IS NULL THEN 0 ELSE min_dsfs END AS new_min_dsfs, \
                 CASE WHEN range_dsfs IS NULL THEN -1 ELSE range_dsfs END AS new_range_dsfs, \
                 CASE WHEN avg_dsfs IS NULL THEN 0 ELSE avg_dsfs END AS new_avg_dsfs, \
                 \
                 CASE WHEN range_CharlsonIndexI IS NULL THEN -1 ELSE range_CharlsonIndexI END AS new_range_CharlsonIndexI \
                 \
                 FROM __THIS__"

# Step 2: Create the SQL transformers
sql_0001 = SQLTransformer(statement=(sql_0001_text))
sql_0002 = SQLTransformer(statement=(sql_0002_text))

# Step 3: Define the pipeline
aggregate_claims_pipeline = Pipeline(stages=[sql_0001, sql_0002])

# Step 4: Fit the pipeline model
aggregate_claims_pipeline_model = aggregate_claims_pipeline.fit(df_claims_transformed)

# Step 5: Run the pipeline model
df_claims_aggregated = aggregate_claims_pipeline_model.transform(df_claims_transformed).cache()

# Check the aggregated data
print 'df_claims_aggregated has %d rows:' %(df_claims_aggregated.count())
# print df_claims_aggregated.printSchema()
# print df_claims_aggregated.show(1)

# Free up memory
df_claims_transformed.unpersist()

print 'End time: %s' %(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
print 'Elapsed Time: %.2f minutes' %((time.time() - start_time_agg)/60)
print 'Elapsed CPU Time: %.2f minutes' %((time.clock() - cpu_start_time_agg)/60)

Start time: 2016-08-04 19:13:11
df_claims_aggregated has 218415 rows:
End time: 2016-08-04 19:21:12
Elapsed Time: 8.01 minutes
Elapsed CPU Time: 0.00 minutes


#### 4.A.b  Drug Data <a name="4.A.b"></a>
[Back to Table of Contents](#TOC)

In aggregating the Drug data, we calculate the maximum, minimum, and average of DrugCount, and we also count the number of drug records for each patient in a given year.

In [17]:
# Step 1:  Create the SQL statements
# Aggregate variables across Year and MemberID
sql_0001_text = "SELECT MemberID AS MemberID_dc, Year AS Year_dc, \
                 \
                 MAX(DrugCountI) AS max_DrugCount, \
                 MIN(DrugCountI) AS min_DrugCount, \
                 AVG(DrugCountI * 1.0) AS avg_DrugCount, \
                 COUNT(*) AS months_DrugCount \
                 \
                 FROM __THIS__ \
                 GROUP BY Year, MemberID"


# Step 2: Create the SQL transformers
sql_0001 = SQLTransformer(statement=(sql_0001_text))

# Step 3: Define the pipeline
aggregate_drugs_pipeline = Pipeline(stages=[sql_0001])

# Step 4: Fit the pipeline model
aggregate_drugs_pipeline_model = aggregate_drugs_pipeline.fit(df_drug_count_transformed)

# Step 5: Run the pipeline model
df_drug_count_aggregated = aggregate_drugs_pipeline_model.transform(df_drug_count_transformed).cache()

# Check the aggregated data
print 'df_drug_count_aggregated has %d rows:' %(df_drug_count_aggregated.count())
print df_drug_count_aggregated.printSchema()
print df_drug_count_aggregated.show(2)

# Free up memory
df_drug_count_transformed.unpersist()

df_drug_count_aggregated has 141571 rows:
root
 |-- MemberID_dc: string (nullable = true)
 |-- Year_dc: string (nullable = true)
 |-- max_DrugCount: integer (nullable = true)
 |-- min_DrugCount: integer (nullable = true)
 |-- avg_DrugCount: decimal(17,5) (nullable = true)
 |-- months_DrugCount: long (nullable = false)

None
+-----------+-------+-------------+-------------+-------------+----------------+
|MemberID_dc|Year_dc|max_DrugCount|min_DrugCount|avg_DrugCount|months_DrugCount|
+-----------+-------+-------------+-------------+-------------+----------------+
|   88457086|     Y2|            2|            1|      1.42857|               7|
|   88916957|     Y1|            3|            1|      1.33333|               6|
+-----------+-------+-------------+-------------+-------------+----------------+
only showing top 2 rows

None


DataFrame[MemberID: string, Year: string, DSFS: string, DrugCount: string, DrugCountI: int]

#### 4.A.c  Lab Data <a name="4.A.c"></a>
[Back to Table of Contents](#TOC)

In aggregating the Lab data, we calculate the maximum, minimum, and average of LabCount, and we also count the number of lab records for each patient in a given year.

In [18]:
# Step 1:  Create the SQL statements
# Aggregate variables across Year and MemberID
sql_0001_text = "SELECT MemberID AS MemberID_lc, Year AS Year_lc, \
                 \
                 MAX(LabCountI) AS max_LabCount, \
                 MIN(LabCountI) AS min_LabCount, \
                 AVG(LabCountI * 1.0) AS avg_LabCount, \
                 COUNT(*) AS months_LabCount \
                 \
                 FROM __THIS__ \
                 GROUP BY Year, MemberID"


# Step 2: Create the SQL transformers
sql_0001 = SQLTransformer(statement=(sql_0001_text))

# Step 3: Define the pipeline
aggregate_labs_pipeline = Pipeline(stages=[sql_0001])

# Step 4: Fit the pipeline model
aggregate_labs_pipeline_model = aggregate_labs_pipeline.fit(df_lab_count_transformed)

# Step 5: Run the pipeline model
df_lab_count_aggregated = aggregate_labs_pipeline_model.transform(df_lab_count_transformed).cache()

# Check the aggregated data
print 'df_lab_count_aggregated has %d rows:' %(df_lab_count_aggregated.count())
print df_lab_count_aggregated.printSchema()
print df_lab_count_aggregated.show(2)

# Free up memory
df_lab_count_transformed.unpersist()

df_lab_count_aggregated has 154934 rows:
root
 |-- MemberID_lc: string (nullable = true)
 |-- Year_lc: string (nullable = true)
 |-- max_LabCount: integer (nullable = true)
 |-- min_LabCount: integer (nullable = true)
 |-- avg_LabCount: decimal(17,5) (nullable = true)
 |-- months_LabCount: long (nullable = false)

None
+-----------+-------+------------+------------+------------+---------------+
|MemberID_lc|Year_lc|max_LabCount|min_LabCount|avg_LabCount|months_LabCount|
+-----------+-------+------------+------------+------------+---------------+
|    8236453|     Y3|          10|           3|     6.14286|              7|
|   51031139|     Y1|          10|           5|     8.33333|              3|
+-----------+-------+------------+------------+------------+---------------+
only showing top 2 rows

None


DataFrame[MemberID: string, Year: string, DSFS: string, LabCount: string, LabCountI: int]

### 4.B One Hot Encoding: Members Data <a name="4.B"></a>
[Back to Table of Contents](#TOC)

In this section, we one hot encode the age and sex variables in the Members Data DataFrame.

In [19]:
# Step 1:  Create the SQL statements
# Create indicator variables for age and sex
sql_0001_text = "SELECT *, \
                 CASE WHEN AgeAtFirstClaim  = '0-9' THEN 1 ELSE 0 END AS age_05,  \
                 CASE WHEN AgeAtFirstClaim = '10-19' THEN 1 ELSE 0 END AS age_15, \
                 CASE WHEN AgeAtFirstClaim = '20-29' THEN 1 ELSE 0 END AS age_25, \
                 CASE WHEN AgeAtFirstClaim = '30-39' THEN 1 ELSE 0 END AS age_35, \
                 CASE WHEN AgeAtFirstClaim = '40-49' THEN 1 ELSE 0 END AS age_45, \
                 CASE WHEN AgeAtFirstClaim = '50-59' THEN 1 ELSE 0 END AS age_55, \
                 CASE WHEN AgeAtFirstClaim = '60-69' THEN 1 ELSE 0 END AS age_65, \
                 CASE WHEN AgeAtFirstClaim = '70-79' THEN 1 ELSE 0 END AS age_75, \
                 CASE WHEN AgeAtFirstClaim = '80+' THEN 1 ELSE 0 END AS age_85,   \
                 CASE WHEN AgeAtFirstClaim IS NULL THEN 1 ELSE 0 END AS age_MISS  \
                 FROM __THIS__"
sql_0002_text = "SELECT *, \
                 CASE WHEN Sex = 'M' THEN 1 ELSE 0 END AS sex_M,  \
                 CASE WHEN Sex = 'F' THEN 1 ELSE 0 END AS sex_F   \
                 FROM __THIS__"

# Step 2: Create the SQL transformers
sql_0001 = SQLTransformer(statement=(sql_0001_text))
sql_0002 = SQLTransformer(statement=(sql_0002_text))

# Step 3: Define the pipeline
transform_members_pipeline = Pipeline(stages=[sql_0001, sql_0002])

# Step 4: Fit the pipeline model
transform_members_pipeline_model = transform_members_pipeline.fit(df_members)

# Step 5: Run the pipeline model
df_members_transformed = transform_members_pipeline_model.transform(df_members).cache()

# Check the transformed data
print 'df_members_transformed has %d rows:' %(df_members_transformed.count())
print df_members_transformed.printSchema()
print df_members_transformed.show(2)

# Free up memory
df_members.unpersist()

df_members_transformed has 113000 rows:
root
 |-- MemberID: string (nullable = true)
 |-- AgeAtFirstClaim: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- age_05: integer (nullable = false)
 |-- age_15: integer (nullable = false)
 |-- age_25: integer (nullable = false)
 |-- age_35: integer (nullable = false)
 |-- age_45: integer (nullable = false)
 |-- age_55: integer (nullable = false)
 |-- age_65: integer (nullable = false)
 |-- age_75: integer (nullable = false)
 |-- age_85: integer (nullable = false)
 |-- age_MISS: integer (nullable = false)
 |-- sex_M: integer (nullable = false)
 |-- sex_F: integer (nullable = false)

None
+--------+---------------+---+------+------+------+------+------+------+------+------+------+--------+-----+-----+
|MemberID|AgeAtFirstClaim|Sex|age_05|age_15|age_25|age_35|age_45|age_55|age_65|age_75|age_85|age_MISS|sex_M|sex_F|
+--------+---------------+---+------+------+------+------+------+------+------+------+------+--------+-----+-----+
|1

DataFrame[MemberID: string, AgeAtFirstClaim: string, Sex: string]

### 4.C  Create Final DataFrame For Modeling <a name="4.C"></a>
[Back to Table of Contents](#TOC)

We turn from using SQL to [operating directly on the DataFrames](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame) in order to join all of our data and create the final DataFrame that will be used for modeling.

####  4.C.a  Merge Data <a name="4.C.a"></a>
[Back to Table of Contents](#TOC)

We merge all of our DataFrames together in the following steps:

1.  Union Target DataFrames Across Years  
2.  Join/Merge in Members Data  
3.  Join/Merge in Aggregated Claims Data  
4.  Join/Merge in Aggregated Drug Data  
5.  Join/Merge in Aggregated Lab Data 

In [20]:
# 1. Union Target DataFrames Across Years
df_merged = df_target_Y2_transformed.unionAll(df_target_Y3_transformed).cache()
print 'Number of training samples is %d.' %(df_merged.count())
# print 'Schema for df_merged DataFrame:'
# print df_merged.printSchema()
# print df_merged.show(1)

Number of training samples is 147473.


In [21]:
# 2. Join/Merge in Members Data
df_merged = df_merged.join(df_members_transformed, 'MemberID', 'left_outer').cache()
# print df_merged.printSchema()
# print df_merged.show(1)
print 'Number of rows with missing Members Data: %d' %(df_merged.where('AgeAtFirstClaim IS NULL').count())

Number of rows with missing Members Data: 0


In [22]:
# 3. Join/Merge in Aggregated Claims Data
condition = [df_merged.MemberID == df_claims_aggregated.MemberID_c, df_merged.Year == df_claims_aggregated.Year_c]
df_merged = df_merged.join(df_claims_aggregated, condition, 'left_outer') \
                     .drop('MemberID_c') \
                     .drop('Year_c') \
                     .cache()
# print df_merged.printSchema()
# print df_merged.show(1)
print 'Number of rows with missing Claims Data: %d' %(df_merged.where('num_Claims IS NULL').count())

Number of rows with missing Claims Data: 0


In [23]:
# 4. Join/Merge in Aggregated Drug Data
condition = [df_merged.MemberID == df_drug_count_aggregated.MemberID_dc, df_merged.Year == df_drug_count_aggregated.Year_dc]
df_merged = df_merged.join(df_drug_count_aggregated, condition, 'left_outer') \
                     .drop('MemberID_dc') \
                     .drop('Year_dc') \
                     .cache()
# print df_merged.printSchema()
# print df_merged.show(1)
print 'Number of rows with missing Drug Data: %d' %(df_merged.where('max_DrugCount IS NULL').count())

Number of rows with missing Drug Data: 51423


In [24]:
# 5. Join/Merge in Aggregated Lab Data
condition = [df_merged.MemberID == df_lab_count_aggregated.MemberID_lc, df_merged.Year == df_lab_count_aggregated.Year_lc]
df_merged = df_merged.join(df_lab_count_aggregated, condition, 'left_outer') \
                     .drop('MemberID_lc') \
                     .drop('Year_lc') \
                     .cache()
# print df_merged.printSchema()
# print df_merged.show(1)
print 'Number of rows with missing Lab Data: %d' %(df_merged.where('max_LabCount IS NULL').count())

Number of rows with missing Lab Data: 42978


In [25]:
# Free up memory
df_target_Y2_transformed.unpersist()
df_target_Y3_transformed.unpersist()
df_members_transformed.unpersist()
df_claims_aggregated.unpersist()
df_drug_count_aggregated.unpersist()
df_lab_count_aggregated.unpersist()

DataFrame[MemberID_lc: string, Year_lc: string, max_LabCount: int, min_LabCount: int, avg_LabCount: decimal(17,5), months_LabCount: bigint]

#### 4.C.b  Handle Missing Data <a name="4.C.b"></a>
[Back to Table of Contents](#TOC)

We need to replace null values for those members who are missing drug and/or lab data.  We also put in indicator variables to let us know whether a member is missing this information.

In [26]:
# Step 1:  Create the SQL statements
# Create indicator variables for missing DrugCount and LabCount data, and replace null values with 0's.
sql_0001_text = "SELECT *, \
                 CASE WHEN max_DrugCount IS NULL THEN 1 ELSE 0 END AS null_DrugCount,  \
                 CASE WHEN max_LabCount IS NULL THEN 1 ELSE 0 END AS null_LabCount, \
                 \
                 CASE WHEN max_DrugCount IS NULL THEN 0 ELSE max_DrugCount END AS new_max_DrugCount, \
                 CASE WHEN min_DrugCount IS NULL THEN 0 ELSE min_DrugCount END AS new_min_DrugCount, \
                 CASE WHEN avg_DrugCount IS NULL THEN 0 ELSE avg_DrugCount END AS new_avg_DrugCount, \
                 CASE WHEN months_DrugCount IS NULL THEN 0 ELSE months_DrugCount END AS new_months_DrugCount, \
                 \
                 CASE WHEN max_LabCount IS NULL THEN 0 ELSE max_LabCount END AS new_max_LabCount, \
                 CASE WHEN min_LabCount IS NULL THEN 0 ELSE min_LabCount END AS new_min_LabCount, \
                 CASE WHEN avg_LabCount IS NULL THEN 0 ELSE avg_LabCount END AS new_avg_LabCount, \
                 CASE WHEN months_LabCount IS NULL THEN 0 ELSE months_LabCount END AS new_months_LabCount \
                 \
                 FROM __THIS__"

# Step 2: Create the SQL transformers
sql_0001 = SQLTransformer(statement=(sql_0001_text))

# Step 3: Define the pipeline
missing_data_pipeline = Pipeline(stages=[sql_0001])

# Step 4: Fit the pipeline model
missing_data_pipeline_model = missing_data_pipeline.fit(df_merged)

# Step 5: Run the pipeline model
df_complete = missing_data_pipeline_model.transform(df_merged).cache()

# Check the complete data
print 'df_complete has %d rows:' %(df_complete.count())
print 'Double-Check number of rows with missing Drug Data: %d' %(df_complete.where('null_DrugCount = 1').count())
print 'Double-Check number of rows with missing Lab Data: %d' %(df_complete.where('null_LabCount = 1').count())
# print df_complete.printSchema()
# print df_complete.show(1)

# Free up memory
# df_merged.unpersist()

df_complete has 147473 rows:
Double-Check number of rows with missing Drug Data: 51423
Double-Check number of rows with missing Lab Data: 42978


#### 4.C.c  Drop Unnecessary Variables <a name="4.C.c"></a>
[Back to Table of Contents](#TOC)

We drop all the variables not needed for modeling and drop any rows with null values.

In [27]:
df_final = df_complete.drop('MemberID') \
                      .drop('Year') \
                      .drop('DaysInHospital') \
                      .drop('AgeAtFirstClaim') \
                      .drop('Sex') \
                      .drop('max_LOS') \
                      .drop('min_LOS') \
                      .drop('avg_LOS') \
                      .drop('max_dsfs') \
                      .drop('min_dsfs') \
                      .drop('range_dsfs') \
                      .drop('avg_dsfs') \
                      .drop('range_CharlsonIndexI') \
                      .drop('max_DrugCount') \
                      .drop('min_DrugCount') \
                      .drop('avg_DrugCount') \
                      .drop('months_DrugCount') \
                      .drop('max_LabCount') \
                      .drop('min_LabCount') \
                      .drop('avg_LabCount') \
                      .drop('months_LabCount') \
                      .dropna() \
                      .cache()
                
# Check the final data
print 'df_final has %d rows and %d columns.' %(df_final.count(), len(df_final.columns))
# print df_final.printSchema()
# print df_final.show(1)

# Free up memory
df_complete.unpersist()

df_final has 147473 rows and 135 columns.


DataFrame[MemberID: string, ClaimsTruncated: int, DaysInHospital: double, label: double, Year: string, AgeAtFirstClaim: string, Sex: string, age_05: int, age_15: int, age_25: int, age_35: int, age_45: int, age_55: int, age_65: int, age_75: int, age_85: int, age_MISS: int, sex_M: int, sex_F: int, num_Claims: bigint, num_Providers: bigint, num_Vendors: bigint, num_PCPs: bigint, num_PlaceSvcs: bigint, num_Specialities: bigint, num_PrimaryConditionGroups: bigint, num_ProcedureGroups: bigint, max_PayDelay: int, min_PayDelay: int, avg_PayDelay: double, max_LOS: int, min_LOS: int, avg_LOS: double, LOS_TOT_UNKNOWN: bigint, LOS_TOT_SUPRESSED: bigint, LOS_TOT_KNOWN: bigint, max_dsfs: int, min_dsfs: int, range_dsfs: int, avg_dsfs: double, max_CharlsonIndexI: int, min_CharlsonIndexI: int, avg_CharlsonIndexI: double, range_CharlsonIndexI: int, pcg1: bigint, pcg2: bigint, pcg3: bigint, pcg4: bigint, pcg5: bigint, pcg6: bigint, pcg7: bigint, pcg8: bigint, pcg9: bigint, pcg10: bigint, pcg11: bigint, p

#### 4.C.d   External Storage Read/Write of Final DataFrame <a name="4.C.d"></a>
[Back to Table of Contents](#TOC)

In case your kernel dies, and you do not want to rerun all of the above cells, you may want to store the final DataFrame.  Here is code for writing to and reading from external storage.

In [28]:
# Write to/Read from external storage

# Write DataFrame to external storage
df_final.write.save('df_final', mode='overwrite')
print 'DataFrame df_final written to storage.'

# Read DataFrame from external storage
# df_final = sql_context.read.load('df_final')
# print 'DataFrame df_final read from storage.'

DataFrame df_final written to storage.


#### 4.C.e   EDA-2 <a name="4.C.e"></a>
[Back to Table of Contents](#TOC)

It is a best practice to do more EDA after feature engineering to see how the features relate to the target variable.  See [Section 7: Next Steps](#7.).

In [29]:
# TO DO: EDA on df_final.  Explore correlations of features and target variable, etc.  
#        See if a feature set can be found that beats the performance of PCA in Section 5.D.

## 5.  Modeling and Evaluation <a name="5"></a>
[Back to Table of Contents](#TOC)

### 5.A  Create Training, Validation, and Test DataFrames <a name="5.A"></a>
[Back to Table of Contents](#TOC)

We randomly split the data in two steps, with 70% for training, 15% for validation, and 15% for final testing.

In [30]:
df_train, df_other = df_final.randomSplit([0.70, 0.30], seed=24)
df_val, df_test = df_other.randomSplit([0.50, 0.50], seed=24)

df_train.cache()
df_val.cache()
df_test.cache()

print 'Number of samples in training dataset: %d' %(df_train.count())
print 'Number of samples in validation dataset: %d' %(df_val.count())
print 'Number of samples in test dataset: %d' %(df_test.count())


Number of samples in training dataset: 103229
Number of samples in validation dataset: 21980
Number of samples in test dataset: 22264


In [31]:
# We create a function to get the column names of a given DataFrame, excluding the label (DaysInHospital)

def get_column_names(dataframe):
    column_names = dataframe.columns
    final_columns = []
    for column in column_names:
        if column != 'label':
            final_columns.append(column)
    return final_columns
    

### 5.B   Define Evaluation Metric <a name="5.B"></a>
[Back to Table of Contents](#TOC)

The [HHP prediction accuracy measure](https://www.heritagehealthprize.com/c/hhp/details/evaluation) is the log root-mean-squared-error (log RMSE) loss function

\begin{equation}
\epsilon = \sqrt{\frac{1}{n}\displaystyle\sum_{i=1}^{n}{[log(p_i + 1) - log(a_i + 1)]\,^2}}
\end{equation}

where 
1.  $i$ is a member;  
2.  $n$ is the total number of members;  
3.  $p_i$ is the predicted number of days spent in the hospital for member $i$ in the test period;  
4.  $a_i$ is the actual number of days spent in the hospital for member $i$ in the test period.  

We will calculate this loss for the models we will train below.  

To help in this calculation, we create a function to calculate the squared difference of the predictions and actuals.

In [32]:
def calc_squared_error(line):
    actual = line[0]
    prediction = line[1]
    return (prediction - actual)**2

### 5.C   Build and Evaluate Baseline Model <a name="5.C"></a>
[Back to Table of Contents](#TOC)

We use pipelines to build a baseline linear regression model with elastic net regularization (a combination of L1 and L2 regularization).  See [here](http://spark.apache.org/docs/latest/mllib-linear-methods.html#regularizers) for more information on regularization options for linear regression in Spark.

The baseline model includes all 135 features.  In Section 5.D below, we will do some feature selection to reduce the dimensionality of the feature space.

In [33]:
import math
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler

In [34]:
# Get all of our features together into one array called "features".  Do not include the label!
feature_assembler = VectorAssembler(inputCols=get_column_names(df_train), outputCol="features")

# Define our model
lr = LinearRegression(maxIter=100, elasticNetParam=0.80, labelCol="label", featuresCol="features", 
                      predictionCol = "prediction")

# Define our pipeline
pipeline_baseline = Pipeline(stages=[feature_assembler, lr])

# Train our model using the training data
model_baseline = pipeline_baseline.fit(df_train)

# Use our trained model to make predictions using the validation data
output_baseline = model_baseline.transform(df_val)  #.select("features", "label", "prediction", "coefficients")
predictions_baseline = output_baseline.select("label", "prediction")

# Measure the performance of our trained model on the validation data
loss_baseline =  math.sqrt(predictions_baseline.map(calc_squared_error) \
                                               .reduce(lambda x,y: x+y)/float(predictions_baseline.count()))
print "The log RMSE loss for the baseline model using linear regression is %.6f" %(loss_baseline)



The log RMSE loss for the baseline model using linear regression is 0.462934


In [35]:
# Get the parameters of the baseline model
print 'Baseline Model intercept:'
print model_baseline.stages[-1].intercept
print 'Baseline Model coefficients:'
print model_baseline.stages[-1].coefficients


# See Section 7: Next Steps #12
# Get linear regression summary, such as coefficient standard errors, R-squared, etc.
# This has only become available with Spark 2.0.0.
# See the documentation on 'class pyspark.ml.regression.LinearRegressionSummary' at 
# https://spark.apache.org/docs/2.0.0/api/python/pyspark.ml.html?highlight=crossvalidator#pyspark.ml.regression.LinearRegressionSummary

Baseline Model intercept:
0.291329776374
Baseline Model coefficients:
[0.144059618566,-0.151405835641,-0.159943410084,-0.0945226337502,-0.126005534019,-0.160065302438,-0.159586868412,-0.123135310773,-0.0646469532678,0.00756374170444,0.0,-0.1446056407,-0.130053570429,0.000955626283541,0.000112108451644,0.0229743737326,-0.0133904405207,-0.00642017373504,-0.0161047437375,0.00552072364695,-0.0114542279277,0.000143528842829,0.00048435234629,-0.000524532175696,0.0,0.0,0.000955626283541,0.00584091589735,0.00608151809875,0.00401875751127,0.000222640563173,-0.00271415204716,0.00270136696357,-0.000327433605455,-0.00366323329242,-0.00353123606677,-0.00726889232987,0.0052131416062,-0.00759023625725,-0.00713348791836,-0.000146878174716,0.000693943085698,0.00414833327466,-0.00366063008065,-0.0100261858525,-0.00201529447708,0.0015321798347,0.0126907275677,-7.5556850662e-05,-0.00551560242023,0.00487105020992,0.0329217174065,0.00755463041043,-0.000701717749044,0.00135268189973,-0.00602011756255,0.00374

### 5.D   Feature Selection: Principal Component Analysis (PCA) <a name="5.D"></a>
[Back to Table of Contents](#TOC)

Here are some references for doing feature selection:  

-  For Categorical Features:  http://spark.apache.org/docs/latest/mllib-feature-extraction.html 
-  For Singular Value Decomposition and Principal Component Analysis:  http://spark.apache.org/docs/latest/mllib-dimensionality-reduction.html   

We will use PCA to reduce our 135 variables down to just 10 components, and then fit another linear regression model using the 10 components.

In [37]:
from pyspark.ml.feature import PCA

# Pipeline for generating 10 PCA components

# Get all of our features together into one array called "features".  Do not include the label!
feature_assembler = VectorAssembler(inputCols=get_column_names(df_train), outputCol="features")

# Define our model
pca = PCA(k=10, inputCol="features", outputCol="pca_features")

# Define our pipeline
pipeline_pca = Pipeline(stages=[feature_assembler, pca])

# Train our model using the training data
model_pca = pipeline_pca.fit(df_train)
print 'PCA training complete.'

# Use our trained model to output new PCA features
df_train_pca = model_pca.transform(df_train)
df_val_pca = model_pca.transform(df_val)
df_test_pca = model_pca.transform(df_test)

df_train_pca.cache()
df_val_pca.cache()
df_test_pca.cache()

print 'Example PCA components:'
print df_train_pca.select('pca_features').take(1)

PCA training complete.
Example PCA components:
[Row(pca_features=DenseVector([-101.8709, -7.232, 3.351, 1.5132, -3.182, 7.2285, -4.2851, -2.1116, -7.564, 3.5573]))]


In [38]:
# Pipeline for building a linear regression model with the 10 components calculated above.
# Note: We do not need to use the VectorAssembler here because the 10 PCA components are already
#       compiled in a vector stored in the variable "pca_features."

# Define our model
lr_pca = LinearRegression(maxIter=100, elasticNetParam=0.80, labelCol="label", featuresCol="pca_features", 
                      predictionCol = "prediction")

# Define our pipeline
pipeline_pca = Pipeline(stages=[lr_pca])

# Train our model using the training data
model_pca = pipeline_pca.fit(df_train_pca)

# Use our trained model to make predictions using the validation data
output_pca = model_pca.transform(df_val_pca)
predictions_pca = output_pca.select("label", "prediction")

# Measure the performance of our trained model on the validation data
loss_pca =  math.sqrt(predictions_pca.map(calc_squared_error) \
                                     .reduce(lambda x,y: x+y)/float(predictions_pca.count()))
print "The log RMSE loss for the PCA model using linear regression is %.6f" %(loss_pca)

The log RMSE loss for the PCA model using linear regression is 0.475768


In [39]:
# Get the parameters of the PCA model
print 'PCA Model Intercept:'
print model_pca.stages[-1].intercept
print 'PCA Model Coefficients:'
print model_pca.stages[-1].coefficients

PCA Model Intercept:
0.0479622137003
PCA Model Coefficients:
[-0.000987254186672,0.00211178198125,-0.00443428036693,-0.000759002880767,0.0012825897954,-0.00126693653089,-0.00395773531123,-0.00888382580741,-0.00183520669693,-0.00558761965335]


### 5.E   Hyperparameter Tuning <a name="5.E"></a>
[Back to Table of Contents](#TOC)

Hyperparameter tuning can be done using spark.ml pipelines.  See [here](http://spark.apache.org/docs/latest/ml-guide.html#example-model-selection-via-cross-validation) for examples of using **cross validation** to find the best hyperparameters.  See [here](http://spark.apache.org/docs/latest/ml-guide.html#example-model-selection-via-train-validation-split) for examples of using **train-validation split** to find the best hyperparameters.

In this section, we will use **cross validation** to find the best hyperparameters for a **linear regression model** using our **PCA features** from Section [5.D](#5.D).

The steps we will follow for using the [CrossValidator](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.tuning.CrossValidator) from [spark.ml.tuning](http://spark.apache.org/docs/latest/ml-tuning.html#ml-tuning-model-selection-and-hyperparameter-tuning) are:

1.  Set lists of the parameters to try
2.  Build the parameter grid that will be searched
3.  Create a CrossValidator instance
4.  Run the cross validation to find the best set of parameters
5.  Validate the final best model

In [40]:
import time
from datetime import datetime
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Step 1: Set lists of the parameters to try
elastic_net_params = [0.20, 0.50, 0.80]
print 'Step 1 complete.'

# Step 2: Build the parameter grid that will be searched
param_grid_lr_pca = ParamGridBuilder().addGrid(lr_pca.elasticNetParam, elastic_net_params) \
                                      .build()
print 'Step 2 complete.'

# Step 3: Create a CrossValidator instance
cv_lr_pca = CrossValidator().setEstimator(pipeline_pca) \
                            .setEvaluator(RegressionEvaluator(metricName="rmse")) \
                            .setEstimatorParamMaps(param_grid_lr_pca) \
                            .setNumFolds(2)  # Use 3+ in practice
print 'Step 3 complete.'

# Step 4: Run the cross validation to find the best set of parameters
print 'Start time: %s' %(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
model_cv_lr_pca = cv_lr_pca.fit(df_train_pca)
print 'Step 4 complete.  End Time: %s' %(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

# Step 5: Validate the final best model
output_cv_lr_pca = model_cv_lr_pca.transform(df_val_pca).select("features", "label", "prediction")
predictions_cv_lr_pca = output_cv_lr_pca.select("label", "prediction")
loss_cv_lr_pca =  math.sqrt(predictions_cv_lr_pca.map(calc_squared_error) \
                                                 .reduce(lambda x,y: x+y)/float(predictions_cv_lr_pca.count()))
print 'Step 5 complete.'
print "The RMSE loss for the best linear regression model found using cross validation is %.6f" %(loss_cv_lr_pca)

Step 1 complete.
Step 2 complete.
Step 3 complete.
Start time: 2016-08-04 19:28:30
Step 4 complete.  End Time: 2016-08-04 19:31:20
Step 5 complete.
The RMSE loss for the best linear regression model found using cross validation is 0.475768


In [41]:
# Double-check our calculate of the log RMSE above
print 'Double-check Log RMSE:'
print RegressionEvaluator(metricName="rmse").evaluate(model_cv_lr_pca.transform(df_val_pca))

# Get the parameters of the best model
print 'Best Model Intercept:'
print model_cv_lr_pca.bestModel.stages[-1].intercept
print 'Best Model Coefficients:'
print model_cv_lr_pca.bestModel.stages[-1].coefficients

Double-check Log RMSE:
0.4757684172
Best Model Intercept:
0.0479622137003
Best Model Coefficients:
[-0.000987254186672,0.00211178198125,-0.00443428036693,-0.000759002880767,0.0012825897954,-0.00126693653089,-0.00395773531123,-0.00888382580741,-0.00183520669693,-0.00558761965335]


## 6.  Results <a name="6"></a>
[Back to Table of Contents](#TOC)

Our **best result (log RMSE = 0.462934)** was achieved by the baseline linear regression model run on all 135 features.  The PCA linear regression model using 10 principal components had **log RMSE = 0.475768**, even after hyperparameter tuning.

<table align="left">
<caption>Model Performance:  Loss</caption>
<tr><td align="center">Baseline Linear Regression</td><td align="center">PCA Linear Regression</td><td align="center">Hyperparameter Tuning on PCA</td></tr>
<tr><td align="center">0.462934</td><td align="center">0.475768</td><td align="center">0.475768</td></tr>
</table>   

## 7.  Next Steps <a name="7"></a>
[Back to Table of Contents](#TOC)

1.  Complete [Section 3.B: EDA-0](#3.B).  Gather information to see what transformations may need to be done on the data.  Answer questions about each raw DataFrame.  In general, is the data in good shape?  For example, in each of the Target DataFrames (df_target_Y1, df_target_Y2, df_target_Y3), what values does DaysInHospital take on?  Are they all integers?  What values does ClaimsTruncated take on?  Are they all integers?  In the Claims DataFrame (df_claims), how many different ProviderIDs are there?  How many different PrimaryConditionGroups are there?  What are their values?  What values can the CharlesonIndex take on?  Are they integers?  In the Drug Count DataFrame (df_drug_count), what values can DrugCount take on?  Are they all integers?  Given this information, what transformations are needed?

2.  Complete [Section 3.D: EDA-1](#3.D).  Create tables and graphs to display information about the transformed DataFrames.  For inspiration, see the [Titanic notebook](http://nbviewer.jupyter.org/urls/dl.dropbox.com/s/kmbgrkhh73931lo/Titanic-EDA-LogisticRegression.ipynb).  Answer questions about each DataFrame.  For example, in each of the Target DataFrames (df_target_Y1, df_target_Y2, df_target_Y3), what is the minimum, maximum, mean, and standard deviation of DaysInHospital?  In the Claims DataFrame, group by MemberID and Year and count the number of records.  What is the minimum, maximum, mean, and standard deviation of the count?  Do the same for the Drug Count and Lab Count DataFrames, etc.

3.  In [Section 4.A.a: Claims Data](#4.A.a), where we aggregate by MemberID and Year, automate the creation of the SQL string to one hot encode the categorical variables by selecting distinct values of the variables PrimaryConditionGroup, Specialty, ProcedureGroup, and PlaceSvc.

4.  Complete [Section 4.C.e: EDA-2](#4.C.e).  Create tables and graphs to display information relating the different feature variables to the target variable.  Get a feel for which features might be predictive of DaysInHospital.

5.  In [Section 5.C: Build and Evaluate Baseline Model](#5.C), we build a linear regression model.  What if the model predicts negative values?  Transform the predictions so that any negative values are recoded to 0 and remeasure model performance.

6.  In [Section 5.D: Feature Selection: Principal Component Analysis (PCA)](#5.D), try doing some hyperparameter tuning (i.e. try different numbers of components).  Print the model performance results out in a table.

7.  In [Section 5.D: Feature Selection: Principal Component Analysis (PCA)](#5.D), build a model by selecting features by hand based on the findings in [Section 4.C.e: EDA-2](#4.C.e).  Can a model be built that outperforms the PCA model?

8.  In Section [5.E: Hyperparameter Tuning](#5.E), try using the [train-validation split](http://spark.apache.org/docs/latest/ml-guide.html#example-model-selection-via-train-validation-split).  Does it outperform cross-validation?  Does it yield different optimal hyperparameters?

9.  At the end of [Section 5.E: Hyperparameter Tuning](#5.E), figure out how to get the "best model" hyperparameters out of PipelineModels (we could not find how to do this in the Spark documentation).  Start looking [here](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html?highlight=pipeline#pyspark.ml.PipelineModel).  Although, it may not be possible.  See [here](http://stackoverflow.com/questions/36697304/how-to-extract-model-hyper-parameters-from-spark-ml-in-pyspark).  A user-defined function may have to be created.  See example [here](#9.C).

10.  Could particular providers (ProviderID), vendors (Vendor), or primary care physicians (PCP) work with sicker patients who are more likely to go into the hospital?  If so, these variables might be highly correlated with the target variable (DaysInHospital).  There are thousands of different ProviderIDs, Vendors, and PCPs in the Claims data.  If we were to one hot encode them, this would create thousands of new features.  One way to reduce the dimensionality would be to hash the new features.  For an example of hashing, see the ["3 Idiots' Approach"](http://www.csie.ntu.edu.tw/~r01922136/kaggle-2014-criteo.pdf) to the [Kaggle Criteo competition](https://www.kaggle.com/c/criteo-display-ad-challenge).  Also see their code [here](https://github.com/guestwalk/kaggle-2014-criteo).

11.  Try additional models.  See possibilities [here](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.regression) (e.g. Decision Tree Regressor, Gradient-Boosted Trees Regressor, Random Forest Regressor).  See an example [here](#9.B).  Tune their hyperparameters.  Try different feature selections.

12.  This notebook was built using Spark 1.6.  Try running the notebook on Spark 2.0 ([download here](http://spark.apache.org/downloads.html)).  What happens?  What changes are needed?  See [here](https://spark.apache.org/docs/latest/ml-guide.html) for the Spark documentation on the differences in Version 2.0.  One noteworthy difference is that we can now get a [summary of a linear regression model's statistics](https://spark.apache.org/docs/2.0.0/api/python/pyspark.ml.html?highlight=crossvalidator#pyspark.ml.regression.LinearRegressionSummary), such as the p-values, standard errors, and t-statistics of the coefficients and intercept.

## 8.  Resources <a name="8"></a>
[Back to Table of Contents](#TOC)

- CSV Data Source [(link to Databricks' csv file upload module)](https://github.com/databricks/spark-csv)
- Data Dictionary [(link to Kaggle competition)](https://www.heritagehealthprize.com/c/hhp/data)
- DataFrame Operations [(link to Spark documentation)](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame)
- DataFrames [(link to Spark documentation)](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame)
- EDA Techniques [(link to website)](http://itl.nist.gov/div898/handbook/eda/eda.htm)
- Feature Extraction [(link to Spark documentation)](http://spark.apache.org/docs/latest/ml-features.html)
- Feature Selection [(link to Spark documentation)](http://spark.apache.org/docs/latest/mllib-feature-extraction.html), [(link to more Spark documenation)](http://spark.apache.org/docs/latest/mllib-dimensionality-reduction.html)
- Heritage Health Prize Summary [(link to Kaggle competition)](https://www.heritagehealthprize.com/c/hhp)
- Hyperparameter Tuning: Cross Validation [(link to Spark documentation)](http://spark.apache.org/docs/latest/ml-guide.html#example-model-selection-via-cross-validation)
- Hyperparameter Tuning: Train-Validation Split [(link to Spark documentation)](http://spark.apache.org/docs/latest/ml-guide.html#example-model-selection-via-train-validation-split)
- Loss Function [(link to Kaggle competition)](https://www.heritagehealthprize.com/c/hhp/details/evaluation)
- One Hot Encoding [(link to Quora)](https://www.quora.com/What-is-one-hot-encoding-and-when-is-it-used-in-data-science), [(link to Wikipedia)](https://en.wikipedia.org/wiki/Dummy_variable_(statistics)
- Pipelines [(link to Spark documentation)](https://spark.apache.org/docs/latest/ml-guide.html)
- Regularization for Linear Regression [(link to Spark documentation)](http://spark.apache.org/docs/latest/mllib-linear-methods.html#regularizers)
- Spark SQL [(link to Spark documentation)](https://spark.apache.org/docs/latest/sql-programming-guide.html)
- SQL Statements Blog Source [(link to blog)](http://anotherdataminingblog.blogspot.com/2011/10/code-for-respectable-hhp-model.html)
- Tuning Models in Spark [(link to Spark documentation)](http://spark.apache.org/docs/latest/ml-tuning.html#ml-tuning-model-selection-and-hyperparameter-tuning)
- Titanic Kaggle Competition [(link to notebook)](http://nbviewer.jupyter.org/urls/dl.dropbox.com/s/kmbgrkhh73931lo/Titanic-EDA-LogisticRegression.ipynb), [(link to Kaggle competition)](https://www.kaggle.com/c/titanic)
- User-Defined Functions in Spark Pipelines [(link to blog post)](http://blog.insightdatalabs.com/spark-pipelines-elegant-yet-powerful/)
- Winners of the HHP Challenge [(link to milestone papers)](http://www.heritagehealthprize.com/c/hhp/details/milestone-winners)

## 9.  Appendix <a name="8"></a>
[Back to Table of Contents](#TOC)

### 9.A  Runtimes <a name="9.A"></a>
[Back to Table of Contents](#TOC)

We had a single Spark server on a 64GB RAM and 8 core Softlayer Centos VM. We originally set up the ipython environment to be able to run multiple jobs at the same time by splitting RAM and cores using the command below while launching the ipython notebook server:    

**IPYTHON_OPTS="notebook --ip=myIP --port=80" $SPARK_HOME/bin/pyspark --master spark://myIP:7077 --executor-memory 30G --conf spark.cores.max=4**  
(this would allow running two similtaneous jobs, since total RAM is 64GB and total cores are eight) 

However, in the end we configured the server to run only one job at a time since some of the code would not have run otherwise. That said, we know that most of the code should run fine on your local setup (we used a Mac with 16GB RAM and 4 cores) except:

1. Feature Selection Using Principal Component Analysis (PCA)
2. Hyperparameter Tuning

One possible strategy is to run most of the code on your local set-up, save all of the output, then switch to the cloud when the code starts to fail (e.g. getting outOfMemory errors or code running forever) to save on cloud costs.

Given this set-up, how long did it take to prepare the data, train models, and tune them?  We ran this entire notebook from beginning to end, and it took **17 minutes 20 seconds** to run in clock time.  It took **1 minute 32 seconds** of CPU time:

<table align="left">
<caption>Runtimes</caption>
<tr><td>Start time:</td><td>2016-08-03 03:01:07</td></tr>
<tr><td>End time:</td><td>2016-08-03 03:18:27</td></tr>
<tr><td>Elapsed time:</td><td>17.32 minutes</td></tr>
<tr><td>Elapsed CPU time:</td><td>1.53 minutes</td></tr>
</table>   

In [46]:
# # Move this cell to the top of the notebook to measure runtimes and choose 'Run All' in the Cell menu
# import time
# from datetime import datetime

# start_time = time.time()
# cpu_start_time = time.clock()
# print 'Start time: %s' %(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

In [47]:
# # Move this cell to the bottom of the notebook to measure runtimes and choose 'Run All' in the Cell menu
# print 'End time: %s' %(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# print 'Elapsed Time: %.2f minutes' %((time.time() - start_time)/60)
# print 'Elapsed CPU Time: %.2f minutes' %((time.clock() - cpu_start_time)/60)

### 9.B  Example of Training a Non-Linear Model Using Spark Pipelines <a name="9.B"></a>
[Back to Table of Contents](#TOC)

See [Section 7: Next Steps](#7)

In [48]:
# from pyspark.ml.regression import GBTRegressor

# feature_assembler = VectorAssembler(inputCols=get_column_names(df_train), outputCol="features")

# gbt = GBTRegressor(featuresCol="features")

# pipeline_baseline_gbt = Pipeline(stages=[feature_assembler, gbt])
# model_baseline_gbt = pipeline_baseline_gbt.fit(df_train)
# output_baseline_gbt = model_baseline_gbt.transform(df_val).select("features", "label", "prediction")
# predictions_baseline_gbt = output_baseline_gbt.select("label", "prediction")

# logloss_baseline_gbt =  math.sqrt(predictions_baseline_gbt.map(calc_squared_diff) \
#                                         .reduce(lambda x,y: x+y)/float(predictions_baseline_gbt.count()))
# print "The log loss for the baseline model using a gradient-boosted tree regressor is %.6f" %(logloss_baseline_gbt)

### 9.C  User-Defined Functions in Spark Pipelines <a name="9.C"></a>
[Back to Table of Contents](#TOC)

Spark.ml has many built-in functions for feature engineering that can be put into a Pipeline.  See [here](http://spark.apache.org/docs/latest/ml-features.html).  But users can also define their own functions.  See below for an example of how to implement your own function ([source](http://blog.insightdatalabs.com/spark-pipelines-elegant-yet-powerful/)).

In [49]:
# Here's an example of how to create a user-defined function to put into a spark.ml Pipeline.

# from pyspark.ml.util import keyword_only  
# from pyspark.ml.pipeline import Transformer  
# from pyspark.ml.param.shared import HasInputCol, HasOutputCol

# # Create a custom word count transformer class
# class MyWordCounter(Transformer, HasInputCol, HasOutputCol):  
#     @keyword_only
#     def __init__(self, inputCol=None, outputCol=None):
#         super(WordCounter, self).__init__()
#         kwargs = self.__init__._input_kwargs
#         self.setParams(**kwargs)

#     @keyword_only
#     def setParams(self, inputCol=None, outputCol=None):
#         kwargs = self.setParams._input_kwargs
#         return self._set(**kwargs)

#     def _transform(self, dataset):
#         out_col = self.getOutputCol()
#         in_col = dataset[self.getInputCol()]

#         # Define transformer logic
#         def f(s):
#             return len(s.split(' '))
#         t = LongType()

#         return dataset.withColumn(out_col, udf(f, t)(in_col))

# # Instantiate the new word count transformer
# wc = MyWordCounter(inputCol="review", outputCol="wc")  
# example_pipeline = Pipeline(stages=[wc])
# example_pipeline_model = example_pipeline.fit(example_data)
# example_df = example_pipeline_model.transform(example_data)