# Homework 1
Amin Venjara

I found IPython Notebook a much easier way to engage with Spark and Hive than the command prompt. It took some setup to get this working, especially to be able to control an AWS instance. I followed the instructions here: http://blog.impiyush.me/2015/02/running-ipython-notebook-server-on-aws.html. 

All elements of my homework are included here, except for the initial loading of the files into HDFS. The construction of tables, loading of data into those tables, ER diagram, transformation, analysis, results queries and explanation are all included in this notebook to enable easier reading and navigation. I also created a hive_base_ddl.sql file based on some initial work and to practice with DDL statements, but chose to work with the tables created here as the basis for my analysis and queries.

In [1]:
import os
import sys
spark_home = os.environ.get('SPARK_HOME', None)
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.1-src.zip'))
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))

#note: this code was built on the UCB-complete AMI using Spark version 1.5.0 
#and Python version 2.7.10

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.0
      /_/

Using Python version 2.7.10 (default, May 28 2015 17:02:03)
SparkContext available as sc, HiveContext available as sqlContext.


In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.types import *
import numpy as np
import pandas as pd
from scipy import stats

#sc.addPyFile('/home/w205/pyspark_csv/pyspark_csv.py')
#import pyspark_csv as pycsv
sqlContext = SQLContext(sc)
hContext = HiveContext(sc)

#Optional Config
#ProgramName="Myp"
#master="local" # use local[3] to specify the number of cores that your driver program has
#config = SparkConf().setAppName(ProgramName).setMaster(master)

Previous scripts should have loaded the files we need into HDFS under the folder /user/w205/hospital_compare/. Let's check to make sure they are there.

In [3]:
!hdfs dfs -ls /user/w205/hospital_compare/

Found 13 items
-rw-r--r--   1 w205 supergroup        670 2015-10-18 22:20 /user/w205/hospital_compare/derby.log
-rw-r--r--   1 w205 supergroup   63280769 2015-10-18 22:20 /user/w205/hospital_compare/effective_care.csv
-rw-r--r--   1 w205 supergroup   63280973 2015-10-18 22:20 /user/w205/hospital_compare/effective_care_raw.csv
-rw-r--r--   1 w205 supergroup   81973764 2015-10-18 22:20 /user/w205/hospital_compare/hcahps_data.csv
-rw-r--r--   1 w205 supergroup   81974230 2015-10-22 01:41 /user/w205/hospital_compare/hcahps_survey.csv
-rw-r--r--   1 w205 supergroup     826758 2015-10-18 22:20 /user/w205/hospital_compare/hospitals.csv
-rw-r--r--   1 w205 supergroup     826912 2015-10-18 22:20 /user/w205/hospital_compare/hospitals_raw.csv
-rw-r--r--   1 w205 supergroup      13146 2015-10-18 22:20 /user/w205/hospital_compare/measures.csv
-rw-r--r--   1 w205 supergroup      13261 2015-10-18 22:20 /user/w205/hospital_compare/measures_raw.csv
-rw-r--r--   1 w205 supergroup   19936145 20

We read in the text files. We keep the header row with the column titles at this point because it will help us build the schema we need.

In [4]:
proceduresFile = sc.textFile('/user/w205/hospital_compare/effective_care_raw.csv')
hospitalsFile = sc.textFile('/user/w205/hospital_compare/hospitals_raw.csv')
measuresFile = sc.textFile('/user/w205/hospital_compare/measures_raw.csv')
surveysFile = sc.textFile('/user/w205/hospital_compare/hcahps_survey.csv')

In [5]:
print "Lines in hospitalsFile = ", hospitalsFile.count()
print "Lines in proceduresFile = ", proceduresFile.count()
print "Lines in measuresFile = ", measuresFile.count()
print "Lines in surveyFile = ", surveysFile.count()

Lines in hospitalsFile =  4825
Lines in proceduresFile =  217822
Lines in measuresFile =  101
Lines in surveyFile =  204865


We want to perform two prep steps across our files. One, build the initial list of fields found in each data file as a foundation for our table schema. Two, remove the header row from each file.

In [5]:
def build_schema_list (csv):
    header = csv.first()
    schemaString = header.replace('"','').replace(' ','')
    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(',')]
    return fields

def remove_header(itr_index, itr): 
    return iter(list(itr)[1:]) if itr_index == 0 else itr
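
As a rough illustration of the two prep steps above, the sketch below runs the same logic on plain Python lists instead of an RDD. The two-partition layout and the sample rows are hypothetical, and `field_names` is an illustrative helper rather than part of the notebook. It uses `itertools.islice` so that the first partition is not copied into a list just to skip one row.

```python
from itertools import islice

def field_names(header):
    """Strip quotes and spaces from a CSV header line and split it on commas."""
    return header.replace('"', '').replace(' ', '').split(',')

def remove_header(partition_index, rows):
    """Skip the first row of partition 0 only; pass other partitions through."""
    return islice(rows, 1, None) if partition_index == 0 else rows

# Hypothetical two-partition layout of a small file, for illustration only.
partitions = [
    ['"Provider ID","Hospital Name"', '"000001","EXAMPLE HOSPITAL A"'],
    ['"000002","EXAMPLE HOSPITAL B"'],
]

names = field_names(partitions[0][0])
rows = [row
        for index, part in enumerate(partitions)
        for row in remove_header(index, iter(part))]

print(names)
print(len(rows))
```

Only the partition with index 0 holds the header line, which is why the function checks the index before skipping anything.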

We apply the functions across our data files and test the number of lines to see that the header has, in fact, been removed

In [6]:
#Hospitals Data
hospitalsFields = build_schema_list(hospitalsFile)
hospitalsNoHeader = hospitalsFile.mapPartitionsWithIndex(remove_header)

#Procedures Data
proceduresFields = build_schema_list(proceduresFile)
proceduresNoHeader = proceduresFile.mapPartitionsWithIndex(remove_header)

#Measures Data
measuresFields = build_schema_list(measuresFile)
measuresNoHeader = measuresFile.mapPartitionsWithIndex(remove_header)

#Survey Data
surveysFields = build_schema_list(surveysFile)
surveysNoHeader = surveysFile.mapPartitionsWithIndex(remove_header)

print "Lines in hospitalsNoHeader = ", hospitalsNoHeader.count()
print "Lines in proceduresNoHeader = ", proceduresNoHeader.count()
print "Lines in measuresNoHeader = ", measuresNoHeader.count()
print "Lines in surveysNoHeader = ", surveysNoHeader.count()

Lines in hospitalsNoHeader =  4824
Lines in proceduresNoHeader =  217821
Lines in measuresNoHeader =  100
Lines in surveysNoHeader =  204864


# Part 2: Transformations
In this section, we want to modify the tables to fit our ER diagram and set them up for our analyses and queries in Part 3.

To set the appropriate context for our analysis, we should outline our general approach. While there are a number of procedures to choose from to evaluate hospitals, including all of them does not necessarily lead to the best evaluation. As the rankings at Healthinsight (https://healthinsight.org/rankings/hospitals), which also use the HospitalCompare data, state: "Hospitals' performance on the quality measures has improved dramatically over time, with the result that for a number of measures the great majority of hospitals are achieving perfect or near perfect performance. Including these 'topped out' measures in the set of measures used to rank hospitals has the effect of obscuring the real performance differences between hospitals, and results in a situation where very small differences in overall performance on the quality measures produce large differences in hospitals' ranks."

As a result, they select a set of metrics that are not "topped out" and use those as the basis of the analysis. We follow a similar approach, adding a criterion that the procedures should cover a broad share of the hospital population. 

Based on this, we will first identify the procedures to include in the evaluation of hospitals. Then, we will calculate and store an Aggregate, Average, and Variance score for each hospital. 

For the sake of time, we have a few simplifying assumptions:
1. All procedures will be weighted evenly in the evaluation of hospitals. With further analysis this could be adjusted, for example, to reflect usage of services or patients' ranking of the services that matter most to them. 
2. We will restrict our analysis to procedures where the metric is percentage based and larger means better. In future development, the variables could be normalized so that a broader set of measures can be included. 


In order to organize our data, we propose the following ER diagram

<img src="ERD.png">

Let's start with the hospitals data. It comes loaded with demographic information about the hospital, which we'll keep.

In [7]:
hospitalsFields

[StructField(ProviderID,StringType,true),
 StructField(HospitalName,StringType,true),
 StructField(Address,StringType,true),
 StructField(City,StringType,true),
 StructField(State,StringType,true),
 StructField(ZIPCode,StringType,true),
 StructField(CountyName,StringType,true),
 StructField(PhoneNumber,StringType,true),
 StructField(HospitalType,StringType,true),
 StructField(HospitalOwnership,StringType,true),
 StructField(EmergencyServices,StringType,true)]

We also want to add fields to capture the Aggregate, Average, and Variance scores that we will compute for each hospital. 

In [109]:
hospitalsFields = build_schema_list(hospitalsFile)

In [110]:
hospitalsFields.extend([StructField("AggregateScore", FloatType(), True), 
                       StructField("AverageScore", FloatType(), True), 
                       StructField("VarianceScore", FloatType(), True)]) 
hospitalsFields

[StructField(ProviderID,StringType,true),
 StructField(HospitalName,StringType,true),
 StructField(Address,StringType,true),
 StructField(City,StringType,true),
 StructField(State,StringType,true),
 StructField(ZIPCode,StringType,true),
 StructField(CountyName,StringType,true),
 StructField(PhoneNumber,StringType,true),
 StructField(HospitalType,StringType,true),
 StructField(HospitalOwnership,StringType,true),
 StructField(EmergencyServices,StringType,true),
 StructField(AggregateScore,FloatType,true),
 StructField(AverageScore,FloatType,true),
 StructField(VarianceScore,FloatType,true)]

Now, we map the file data to the hospital fields

In [111]:
hospitalsParts = hospitalsNoHeader.map(lambda l: l.split('","'))
hospitalsDataFields = hospitalsParts.map(lambda p: (p[0], p[1], p[2], p[3], p[4], p[5], p[6], p[7], p[8], p[9], p[10], -1.0, -1.0, -1.0))
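
Splitting on `","` leaves a stray quote on the first and last fields, which is why values such as `"020024` and `Yes"` appear in later output. One possible alternative, shown here only as a sketch, is to parse each line with Python's standard `csv` module. The sample row is hypothetical and `parse_line` is an illustrative helper, not part of the notebook.

```python
import csv

def parse_line(line):
    """Parse one CSV line, honoring quotes and commas inside quoted fields."""
    return next(csv.reader([line]))

# Hypothetical row in the same fully quoted layout as the raw files.
sample = '"000001","EXAMPLE HOSPITAL, THE","1 MAIN ST","Yes"'

naive = sample.split('","')
parsed = parse_line(sample)

print(naive[0], naive[-1])    # stray quotes remain on the first and last fields
print(parsed[0], parsed[-1])  # quotes removed
```

If this approach were adopted, a call along the lines of `hospitalsNoHeader.map(parse_line)` would replace the `split` step. That would also change the ProviderID values shown in later output, so it is offered as a possible refinement rather than a description of what the notebook did.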

Now, we create the dataframe and register the table

In [112]:
hospitalsSchema = StructType(hospitalsFields)
hospitalsDF = sqlContext.createDataFrame(hospitalsDataFields, hospitalsSchema)
sqlContext.registerDataFrameAsTable(hospitalsDF, "hospitals")
hContext.registerDataFrameAsTable(hospitalsDF, "hospitals")

Now, we'll repeat the same process for the other tables. Let's continue with Procedures

In [11]:
proceduresFields

[StructField(ProviderID,StringType,true),
 StructField(HospitalName,StringType,true),
 StructField(Address,StringType,true),
 StructField(City,StringType,true),
 StructField(State,StringType,true),
 StructField(ZIPCode,StringType,true),
 StructField(CountyName,StringType,true),
 StructField(PhoneNumber,StringType,true),
 StructField(Condition,StringType,true),
 StructField(MeasureID,StringType,true),
 StructField(MeasureName,StringType,true),
 StructField(Score,StringType,true),
 StructField(Sample,StringType,true),
 StructField(Footnote,StringType,true),
 StructField(MeasureStartDate,StringType,true),
 StructField(MeasureEndDate,StringType,true)]

In [13]:
proceduresFields = build_schema_list(proceduresFile)

Let's remove the fields that we don't need

In [12]:
del proceduresFields[14:16]
del proceduresFields[2:8]
proceduresLength = len(proceduresFields)
proceduresLength

8

In [13]:
proceduresFields.extend([StructField("SelectedMetric", BooleanType(), True)]) 
proceduresFields

[StructField(ProviderID,StringType,true),
 StructField(HospitalName,StringType,true),
 StructField(Condition,StringType,true),
 StructField(MeasureID,StringType,true),
 StructField(MeasureName,StringType,true),
 StructField(Score,StringType,true),
 StructField(Sample,StringType,true),
 StructField(Footnote,StringType,true),
 StructField(SelectedMetric,BooleanType,true)]

In [18]:
proceduresParts = proceduresNoHeader.map(lambda l: l.split('","'))
proceduresDataFields = proceduresParts.map(lambda p: (p[0], p[1], p[8], p[9], p[10], p[11], p[12], p[13], False))

In [19]:
proceduresSchema = StructType(proceduresFields)
proceduresDF = sqlContext.createDataFrame(proceduresDataFields, proceduresSchema)
sqlContext.registerDataFrameAsTable(proceduresDF, "procedures")
hContext.registerDataFrameAsTable(proceduresDF, "procedures")

Now we continue with the measures data

In [18]:
measuresFields = build_schema_list(measuresFile)

In [20]:
measuresFields

[StructField(MeasureName,StringType,true),
 StructField(MeasureID,StringType,true),
 StructField(MeasureStartQuarter,StringType,true),
 StructField(MeasureStartDate,StringType,true),
 StructField(MeasureEndQuarter,StringType,true),
 StructField(MeasureEndDate,StringType,true)]

In [23]:
measuresParts = measuresNoHeader.map(lambda l: l.split('","'))
measuresDataFields = measuresParts.map(lambda p: (p[0], p[1], p[2], p[3], p[4], p[5]))

In [24]:
measuresSchema = StructType(measuresFields)
measuresDF = sqlContext.createDataFrame(measuresDataFields, measuresSchema)
sqlContext.registerDataFrameAsTable(measuresDF, "measures")
hContext.registerDataFrameAsTable(measuresDF, "measures")

Finally, we transform the surveys table

In [25]:
surveysFields

[StructField(ProviderID,StringType,true),
 StructField(HospitalName,StringType,true),
 StructField(Address,StringType,true),
 StructField(City,StringType,true),
 StructField(State,StringType,true),
 StructField(ZIPCode,StringType,true),
 StructField(CountyName,StringType,true),
 StructField(PhoneNumber,StringType,true),
 StructField(HCAHPSMeasureID,StringType,true),
 StructField(HCAHPSQuestion,StringType,true),
 StructField(HCAHPSAnswerDescription,StringType,true),
 StructField(PatientSurveyStarRating,StringType,true),
 StructField(PatientSurveyStarRatingFootnote,StringType,true),
 StructField(HCAHPSAnswerPercent,StringType,true),
 StructField(HCAHPSAnswerPercentFootnote,StringType,true),
 StructField(NumberofCompletedSurveys,StringType,true),
 StructField(NumberofCompletedSurveysFootnote,StringType,true),
 StructField(SurveyResponseRatePercent,StringType,true),
 StructField(SurveyResponseRatePercentFootnote,StringType,true),
 StructField(MeasureStartDate,StringType,true),
 StructField

In [26]:
del surveysFields[2:8]

In [27]:
surveysFields

[StructField(ProviderID,StringType,true),
 StructField(HospitalName,StringType,true),
 StructField(HCAHPSMeasureID,StringType,true),
 StructField(HCAHPSQuestion,StringType,true),
 StructField(HCAHPSAnswerDescription,StringType,true),
 StructField(PatientSurveyStarRating,StringType,true),
 StructField(PatientSurveyStarRatingFootnote,StringType,true),
 StructField(HCAHPSAnswerPercent,StringType,true),
 StructField(HCAHPSAnswerPercentFootnote,StringType,true),
 StructField(NumberofCompletedSurveys,StringType,true),
 StructField(NumberofCompletedSurveysFootnote,StringType,true),
 StructField(SurveyResponseRatePercent,StringType,true),
 StructField(SurveyResponseRatePercentFootnote,StringType,true),
 StructField(MeasureStartDate,StringType,true),
 StructField(MeasureEndDate,StringType,true)]

In [28]:
surveysParts = surveysNoHeader.map(lambda l: l.split('","'))
surveysDataFields = surveysParts.map(lambda p: (p[0], p[1], p[8], p[9], p[10], p[11], p[12], p[13], p[14], p[15], p[16], p[17], p[18], p[19], p[20]))

In [29]:
surveysSchema = StructType(surveysFields)
surveysDF = sqlContext.createDataFrame(surveysDataFields, surveysSchema)
sqlContext.registerDataFrameAsTable(surveysDF, "surveys")
hContext.registerDataFrameAsTable(surveysDF, "surveys")

We want to determine which set of procedures we should include in the evaluation of hospitals. We could of course include all of them, but doing so might skew the data set in two ways. First, including procedures without data across a broad range of hospitals will skew overall metrics across hospitals. Second, including procedures where the scoring is disproportionately high (e.g., very near 100%) will inflate the importance of small differences among hospitals. 

To address these two potential skews, we will use two criteria for procedure selection:
1. The % of hospitals for which data on the procedure is recorded >= 60%
2. The % of hospitals having a score of 97% or above <= 60%

Furthermore, for simplicity's sake, we will only focus on measures that are expressed as percentages to keep the metrics consistent. Metrics that are based on time (e.g., median time to pain med) can be converted to percentiles to enable a uniform comparison. However, this is left as a further development of the model. 
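
The two criteria above can be sketched as a small pure-Python filter. The function name, the dictionary keys, and the `TOPPED_OUT` row are hypothetical. The PN_6 and OP_5 counts are taken from the output tables further down, and the hospital total of 4808 is an assumption inferred from the PercentofHospitals column rather than a figure stated in the notebook.

```python
def select_measures(measures, num_hospitals, min_coverage=60.0, max_top_tier=60.0):
    """Return the IDs of measures that pass both selection criteria."""
    selected = []
    for m in measures:
        # Criterion 1: share of hospitals with a recorded score.
        coverage = 100.0 * m['count'] / num_hospitals
        # Criterion 2: share of scored hospitals at 97% or above.
        # A missing top-tier count is treated as zero (an assumption).
        top_tier = 100.0 * (m.get('top_tier_count') or 0) / m['count']
        if coverage >= min_coverage and top_tier <= max_top_tier:
            selected.append(m['measure_id'])
    return selected

measures = [
    {'measure_id': 'PN_6', 'count': 3973, 'top_tier_count': 2223},
    {'measure_id': 'OP_5', 'count': 2099, 'top_tier_count': 1},
    # Hypothetical measure on which most hospitals score 97% or above.
    {'measure_id': 'TOPPED_OUT', 'count': 4000, 'top_tier_count': 3600},
]

print(select_measures(measures, num_hospitals=4808))
```

Treating a missing top-tier count as zero matters when the counts come from a left join: a measure with no top-tier hospitals would otherwise carry a missing value, and a comparison against a missing value is false, so the measure would be dropped even though it satisfies the second criterion.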

In [30]:
ProcedureAssessment = hContext.sql('Select MeasureID, MeasureName, COUNT(Score) as Count, AVG(Score) as AvgScore, STDDEV(Score) as SD FROM procedures WHERE Score <> "Not Available" GROUP BY MeasureID, MeasureName ORDER BY Count DESC LIMIT 100')

# determine the number of hospitals in the procedures dataset
hospitalcount = hContext.sql('Select Count(DISTINCT ProviderID) as HospitalCount From Procedures').collect()
numOfHospitals = hospitalcount[0].HospitalCount

# convert the query to Pandas for easier processing
df = ProcedureAssessment.toPandas()

# Compute the percent of hospitals that have data on each procedure
df['PercentofHospitals'] = df['Count'] / numOfHospitals * 100

#MeasureID IN ("AMI_7a", "AMI_8a", "HF_1", "HF_3", "SCIP_INF_1", "OP_7", "STK_4", "STK_8", "VTE_1", "VTE_5")

Determine the % of hospitals falling into "top tier" (i.e., 97% or above) for each procedure

In [31]:
topTier = hContext.sql('Select MeasureID, COUNT(Score) as TopTierCount FROM procedures WHERE Score >= 97 GROUP BY MeasureID, MeasureName ORDER BY TopTierCount DESC LIMIT 100')
# convert to Pandas dataframe to make it easier to manipulate data
topTierDF = topTier.toPandas()

Merge the two datasets and apply the selection criteria outlined above to find the initial set of "selected procedures"

In [32]:
merged = pd.merge(df, topTierDF, on='MeasureID', how='left')
merged['TopTierPercent'] = merged['TopTierCount']/merged['Count'] * 100
mergedSelection = merged[(merged.PercentofHospitals >= 60) & (merged.TopTierPercent <= 60)] 
mergedSelection

Unnamed: 0,MeasureID,MeasureName,Count,AvgScore,SD,PercentofHospitals,TopTierCount,TopTierPercent
0,PN_6,Initial antibiotic selection for CAP in immuno...,3973,94.219985,9.275894,82.633111,2223,55.952681
2,IMM_2,Immunization for influenza,3739,91.746456,11.944886,77.766223,1695,45.332977
3,IMM_3_FAC_ADHPCT,Healthcare workers given influenza vaccination,3657,80.680613,16.348048,76.060732,589,16.106098
4,VTE_1,Venous thromboembolism prophylaxis,3534,89.114318,15.257401,73.502496,1230,34.804754
7,ED_2b,ED2,3496,98.518593,62.166651,72.712146,1395,39.902746
11,OP_20,Door to diagnostic eval,3354,28.081992,16.882053,69.758735,20,0.596303
17,OP_21,Median time to pain med,3172,55.356873,17.722358,65.973378,79,2.490542
19,VTE_2,ICU venous thromboembolism prophylaxis,2955,94.810491,7.478245,61.460067,1679,56.818951
20,HF_1,Discharge instructions,2934,93.610089,11.827981,61.023295,1703,58.043626


We will drop anything that does not have a percentage-based metric to avoid complications around variable normalization at this stage. This focuses us on 6 key procedures to use to evaluate hospitals

In [33]:
# Work on an explicit copy so pandas does not raise SettingWithCopyWarning
mergedSelection = mergedSelection.copy()
mergedSelection['PercentageMetric'] = True
# ED_2b, OP_20 and OP_21 (index labels 7, 11, 17) are not percentage-based
mergedSelection.loc[[7, 11, 17], 'PercentageMetric'] = False
mergedSelection = mergedSelection[mergedSelection.PercentageMetric].drop('PercentageMetric', axis=1)
mergedSelection


Unnamed: 0,MeasureID,MeasureName,Count,AvgScore,SD,PercentofHospitals,TopTierCount,TopTierPercent
0,PN_6,Initial antibiotic selection for CAP in immuno...,3973,94.219985,9.275894,82.633111,2223,55.952681
2,IMM_2,Immunization for influenza,3739,91.746456,11.944886,77.766223,1695,45.332977
3,IMM_3_FAC_ADHPCT,Healthcare workers given influenza vaccination,3657,80.680613,16.348048,76.060732,589,16.106098
4,VTE_1,Venous thromboembolism prophylaxis,3534,89.114318,15.257401,73.502496,1230,34.804754
19,VTE_2,ICU venous thromboembolism prophylaxis,2955,94.810491,7.478245,61.460067,1679,56.818951
20,HF_1,Discharge instructions,2934,93.610089,11.827981,61.023295,1703,58.043626


Now that we have identified the procedures we will use, we want to compute the Average, Aggregate and Variance scores for each hospital across procedures. We shift the datasets to Pandas to accomplish this

In [113]:
hospitalsPD = hospitalsDF.toPandas()
proceduresPD = proceduresDF.toPandas()

We flag the SelectedMetric column in the procedures table and bring the table back into Spark

In [203]:
proceduresPD['SelectedMetric'] = proceduresPD['MeasureID'].isin(mergedSelection['MeasureID'])
procedures = sqlContext.createDataFrame(proceduresPD)
sqlContext.registerDataFrameAsTable(procedures, "procedures")
hContext.registerDataFrameAsTable(procedures, "procedures")
#proceduresPD[proceduresPD.SelectedMetric == True]

Next we compute the scores for each hospital across the selected metrics. We have to convert the data types to numeric so that we can compute aggregate values. We then compute the metrics and combine the dataframes

In [115]:
#proceduresPD.dtypes
proceduresPD = proceduresPD.convert_objects(convert_numeric=True)
proceduresPD.dtypes

ProviderID         object
HospitalName       object
Condition          object
MeasureID          object
MeasureName        object
Score             float64
Sample            float64
Footnote           object
SelectedMetric       bool
dtype: object

In [117]:
selectedProcedures = proceduresPD[proceduresPD.SelectedMetric == True]
x = selectedProcedures.groupby(by=['ProviderID'], as_index=False)['Score'].sum()
y = selectedProcedures.groupby(by=['ProviderID'], as_index=False)['Score'].mean()
z = selectedProcedures.groupby(by=['ProviderID'], as_index=False)['Score'].var()
a = pd.merge(hospitalsPD, x, on = 'ProviderID')
a = pd.merge(a, y, on = 'ProviderID')
a = pd.merge(a, z, on = 'ProviderID')
a['AggregateScore'] = a['Score_x']
a['AverageScore'] = a['Score_y']
a['VarianceScore'] = a['Score']
a = a.drop('Score', 1)
a = a.drop('Score_x', 1)
a = a.drop('Score_y', 1)
a

Unnamed: 0,ProviderID,HospitalName,Address,City,State,ZIPCode,CountyName,PhoneNumber,HospitalType,HospitalOwnership,EmergencyServices,AggregateScore,AverageScore,VarianceScore
0,"""020024",CENTRAL PENINSULA GENERAL HOSPITAL,250 HOSPITAL PLACE,SOLDOTNA,AK,99669,,9072624404,Acute Care Hospitals,Voluntary non-profit - Other,"Yes""",532,88.666667,176.266667
1,"""021301",PROVIDENCE VALDEZ MEDICAL CENTER,PO BOX 550,VALDEZ,AK,99686,,9078352249,Critical Access Hospitals,Government - Local,"Yes""",,,
2,"""021302",PROVIDENCE SEWARD HOSPITAL,"417 FIRST AVENUE, PO BOX 365",SEWARD,AK,99664,,9072245205,Critical Access Hospitals,Government - Local,"Yes""",,,
3,"""021304",PETERSBURG MEDICAL CENTER,PO BOX 589,PETERSBURG,AK,99833,,9077724291,Critical Access Hospitals,Government - Local,"Yes""",,,
4,"""021305",WRANGELL MEDICAL CENTER,310 BENNETT STREET PO BOX 1081,WRANGELL,AK,99929,,9078747000,Critical Access Hospitals,Government - Local,"Yes""",,,
5,"""021307",CORDOVA COMMUNITY MEDICAL CENTER,PO BOX 160 - 602 CHASE AVENUE,CORDOVA,AK,99574,,9074248000,Critical Access Hospitals,Government - Local,"Yes""",,,
6,"""021312",SAMUEL SIMMONDS MEMORIAL HOSPITAL,7000 UULA ST,BARROW,AK,99723,,9078524611,Critical Access Hospitals,Government - Federal,"Yes""",,,
7,"""021313",SOUTH PENINSULA HOSPITAL,4300 BARTLETT ST,HOMER,AK,99603,,9072358101,Critical Access Hospitals,Voluntary non-profit - Private,"Yes""",37,37.000000,
8,"""480001","ROY LESTER SCHNEIDER HOSPITAL,THE",9048 SUGAR ESTATE,ST THOMAS,VI,00801,,8097768311,Acute Care Hospitals,Government - Local,"Yes""",427,85.400000,122.800000
9,"""480002",GOV JUAN F LUIS HOSPITAL & MEDICAL CTR,"#4007 EST DIAMOND RUBY, CHRISTIANSTED",ST CROIX,VI,00820,,3407786311,Acute Care Hospitals,Government - State,"Yes""",434,86.800000,93.200000
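
To make the three per-hospital scores concrete, here is a minimal standard-library sketch of the same calculation for a single hospital. The input scores are hypothetical. The sample variance (divisor n - 1) matches the default of the pandas `var()` call used above, which is also why a hospital with a single score, such as row 7 in the table, has no variance value.

```python
import math
import statistics

def hospital_scores(scores):
    """Return (aggregate, average, variance) for one hospital's scores."""
    aggregate = sum(scores)
    average = statistics.mean(scores)
    # Sample variance is undefined for a single observation.
    variance = statistics.variance(scores) if len(scores) > 1 else float('nan')
    return aggregate, average, variance

# Hypothetical scores on three selected procedures.
print(hospital_scores([90.0, 80.0, 100.0]))

# A hospital scored on a single procedure: no variance can be computed.
print(hospital_scores([37.0]))
```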


Another item that can skew the results is a large difference in the number of procedures each hospital is actually scored on, due to data availability. We look for null values in the Score column for our selected procedures to find where there are gaps. To avoid an apples-to-oranges comparison, we only include hospitals that have data available for all of the selected procedures.

In [176]:
# find the procedures that have at least one Null score
t = selectedProcedures[selectedProcedures['Score'].isnull()]

#set up an empty Pandas DF to store the results
columns = ['ProviderID', 'IncludeProviderID']
index = np.arange(len(t['ProviderID'].values))
m = pd.DataFrame(columns = columns, index = index)
m['ProviderID'] = t['ProviderID'].values
m['IncludeProviderID'] = False
m.dtypes

#get the unique values of ProviderId from this analysis
m.drop_duplicates(take_last = True, inplace = True)

6396


Unnamed: 0,ProviderID,IncludeProviderID
1,"""010008",False
4,"""010018",False
6,"""010022",False
7,"""010032",False
9,"""010044",False
10,"""010045",False
12,"""010047",False
13,"""010049",False
14,"""010051",False
15,"""010052",False


In [190]:
#pull the investigation of nulls together with the hospitals data to know which hospitals have a full data set that can be used for evaluation 

r = pd.merge(a, m, on = 'ProviderID', how = 'left')
print "all rows in hospital data", len(r)
print "rows to exclude because missing data on selected metrics", len(r[r.IncludeProviderID == False])
a = r[r.IncludeProviderID != False]

all rows 4786
rows to exclude because missing data 2189


2597
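
The exclusion step can also be expressed as a count check, sketched below in plain Python with hypothetical provider IDs and scores. This is a variation on the notebook's approach rather than a restatement of it: it keeps a provider only when the number of non-missing scores equals the number of selected procedures, so a provider with no row at all for some procedure is excluded as well.

```python
def providers_with_complete_data(rows, required_count):
    """Keep providers whose number of non-missing scores equals required_count.

    rows: iterable of (provider_id, score) pairs, with None for a missing score.
    """
    scored = {}
    for provider_id, score in rows:
        scored.setdefault(provider_id, 0)
        if score is not None:
            scored[provider_id] += 1
    return sorted(p for p, n in scored.items() if n == required_count)

# Hypothetical data for two selected procedures.
rows = [
    ('A', 90.0), ('A', 95.0),   # complete
    ('B', 80.0), ('B', None),   # one missing score
    ('C', 99.0),                # only one row present
]

print(providers_with_complete_data(rows, required_count=2))
```

A check of this kind may be worth considering because the best-hospitals output further down includes a row with an AverageScore of 99.5 and an AggregateScore of 199.0, which corresponds to two scored procedures rather than six.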

Bring the Pandas DFs back to SparkSQL so we can run queries against them

In [191]:
hospitals = sqlContext.createDataFrame(a)
sqlContext.registerDataFrameAsTable(hospitals, "hospitals")
hContext.registerDataFrameAsTable(hospitals, "hospitals")

# Part 3: Queries

The hospitals with the highest quality of care have high levels of performance in 6 key procedures:
 - PN_6: Initial antibiotic selection for CAP in immuno...
 - IMM_2: Immunization for influenza
 - IMM_3_FAC_ADHPCT: Healthcare workers given influenza vaccination
 - VTE_1: Venous thromboembolism prophylaxis
 - VTE_2: ICU venous thromboembolism prophylaxis
 - HF_1: Discharge instructions

These 6 were selected for (1) the breadth of hospitals each one covers (i.e., data recorded for at least 60% of hospitals) and (2) the variability in performance across hospitals. The hospitals listed here have very high average scores (just under 100), high aggregate scores, and very low variance. 

In [192]:
bestHospitals = hContext.sql('Select ProviderID, HospitalName, AverageScore, AggregateScore, VarianceScore FROM hospitals WHERE AverageScore != "NaN" ORDER BY AverageScore DESC LIMIT 10')
bestHospitals.show()

+----------+--------------------+-----------------+--------------+-------------------+
|ProviderID|        HospitalName|     AverageScore|AggregateScore|      VarianceScore|
+----------+--------------------+-----------------+--------------+-------------------+
|   "140275|GENESIS HEALTH SY...|99.66666666666667|         598.0|0.26666666666666666|
|   "460003|SALT LAKE REGIONA...|99.66666666666667|         598.0| 0.6666666666666666|
|   "420065|BON SECOURS-ST FR...|             99.5|         597.0|                0.3|
|   "33015F|VA HUDSON VALLEY ...|             99.5|         199.0|                0.5|
|   "230072|HOLLAND COMMUNITY...|             99.5|         597.0|                0.7|
|   "420087|      ROPER HOSPITAL|             99.5|         597.0|                0.3|
|   "450419|TEXAS HEALTH HARR...|             99.5|         597.0|                0.7|
|   "370149|ST ANTHONY SHAWNE...|             99.5|         597.0|                0.3|
|   "210051|DOCTORS'  COMMUNI...|          

The view of states that are models of high-quality care is built from the hospital-level scores. The methodology averages the AverageScore of the hospitals in each state and ranks states by that value, so the states listed here are those whose hospitals score highest on average across the selected procedures. 

In [196]:
bestStates = hContext.sql('Select State, AVG(AverageScore) AS StateAvgScore, AVG(AggregateScore) As StateAggScore, AVG(VarianceScore) As StateVarScore FROM hospitals GROUP BY State ORDER BY StateAvgScore DESC LIMIT 10')
bestStates.show()

+-----+-----------------+-----------------+------------------+
|State|    StateAvgScore|    StateAggScore|     StateVarScore|
+-----+-----------------+-----------------+------------------+
|   MD|95.90000000000002|558.6829268292682|21.905691056910573|
|   NH|95.60256410256412|573.6153846153846|38.802564102564105|
|   CO|95.42222222222222|572.5333333333333|29.944444444444446|
|   UT|95.04166666666666|           570.25|25.441666666666674|
|   ME|94.88421052631577|542.8421052631579| 67.75438596491227|
|   VA|94.78030303030307|568.6818181818181| 50.63282828282828|
|   WI|94.76333333333334|           568.58| 36.31266666666667|
|   NE|94.65625000000001|         567.9375|19.947916666666664|
|   NC|94.60833333333335|          563.775|44.527499999999996|
|   IA|94.10119047619047|564.6071428571429| 30.99642857142857|
+-----+-----------------+-----------------+------------------+
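
For readers who want the state ranking logic outside of SQL, a minimal sketch follows. The function name and the input pairs are hypothetical. It also returns the number of hospitals behind each state average, an addition not present in the query above, since an average over very few hospitals is less stable.

```python
from collections import defaultdict

def rank_states(hospitals, top_n=10):
    """Rank states by the mean of their hospitals' average scores.

    hospitals: iterable of (state, average_score) pairs; None scores are skipped.
    Returns a list of (state, state_average, hospital_count) tuples.
    """
    by_state = defaultdict(list)
    for state, average_score in hospitals:
        if average_score is not None:
            by_state[state].append(average_score)
    ranked = [(state, sum(values) / len(values), len(values))
              for state, values in by_state.items()]
    ranked.sort(key=lambda item: item[1], reverse=True)
    return ranked[:top_n]

# Hypothetical hospital-level averages.
hospitals = [('MD', 96.0), ('MD', 94.0), ('UT', 95.5), ('XX', None)]
print(rank_states(hospitals))
```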



Standard deviation offers a good measure of the variability of procedure scores across hospitals. We can also look at the percentage of hospitals that are in the "top tier" (97%+) on each metric. Here we examine both and find a high degree of overlap in the metrics that show the greatest variation between hospitals. 

In [202]:
ProcedureVariability = hContext.sql('Select MeasureID, MeasureName, STDDEV(Score) as SDScore FROM procedures GROUP BY MeasureID, MeasureName ORDER BY SDScore DESC LIMIT 10')
ProcedureVariability.show()

+---------+--------------------+------------------+
|MeasureID|         MeasureName|           SDScore|
+---------+--------------------+------------------+
|    ED_1b|                 ED1| 92.83178787038688|
|    ED_2b|                 ED2|  62.1666505939829|
|   OP_18b|               OP 18| 40.70077322337158|
|    OP_3b|Median Time to Tr...|29.479326601050527|
|    STK_4|Thrombolytic Therapy|21.954194561164023|
|    OP_23|     Head CT results|21.844653491103774|
|   AMI_7a|Fibrinolytic Ther...|  18.7794213613377|
|     OP_2|Fibrinolytic Ther...|18.048443797995763|
|    OP_21|Median time to pa...|17.722358013930915|
|    OP_20|Door to diagnosti...|16.882052975576602|
+---------+--------------------+------------------+



In [207]:
merged.sort('TopTierPercent', ascending = 1)[0:10]

Unnamed: 0,MeasureID,MeasureName,Count,AvgScore,SD,PercentofHospitals,TopTierCount,TopTierPercent
35,OP_5,Median Time to ECG,2099,8.333016,6.109382,43.656406,1,0.047642
30,PC_01,Percent of newborns whose deliveries were sche...,2520,4.23254,6.958258,52.412646,2,0.079365
11,OP_20,Door to diagnostic eval,3354,28.081992,16.882053,69.758735,20,0.596303
17,OP_21,Median time to pain med,3172,55.356873,17.722358,65.973378,79,2.490542
41,OP_23,Head CT results,959,66.713243,21.844653,19.945923,44,4.588113
47,OP_2,Fibrinolytic Therapy Received Within 30 Minute...,68,70.25,18.048444,1.414309,4,5.882353
43,OP_3b,Median Time to Transfer to Another Facility fo...,409,60.745721,29.479327,8.506656,39,9.535452
3,IMM_3_FAC_ADHPCT,Healthcare workers given influenza vaccination,3657,80.680613,16.348048,76.060732,589,16.106098
42,STK_4,Thrombolytic Therapy,874,82.900458,21.954195,18.178037,234,26.773455
46,CAC_3,Home Management Plan of Care Document,96,89.375,12.719121,1.996672,26,27.083333
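
Both variability measures used above can be computed side by side for a single measure, as in the sketch below. The scores are hypothetical, and the use of the population standard deviation (`statistics.pstdev`) is an assumption; whether the SQL `STDDEV` function computes the population or the sample form depends on the engine and version.

```python
import statistics

def variability(scores, top_tier_cutoff=97.0):
    """Return (standard deviation, percent of scores at or above the cutoff)."""
    sd = statistics.pstdev(scores)
    top_tier_pct = 100.0 * sum(1 for s in scores if s >= top_tier_cutoff) / len(scores)
    return sd, top_tier_pct

# Hypothetical scores for one measure across four hospitals.
sd, top_tier_pct = variability([100.0, 98.0, 60.0, 82.0])
print(round(sd, 2), top_tier_pct)
```

The standard deviation is expressed in the units of the measure itself. ED_2b, OP_20 and OP_21, which were excluded earlier as not percentage-based, all appear in the standard deviation ranking, so their spread is not directly comparable with that of the percentage measures.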


In this query, we look at the percentage of respondents on the HCAHPS survey that gave their hospital a 9 or 10 rating ("PatientSurveyTopRatingPercent") and compare it with the ProcedureQualityScore of the hospital (the AverageScore from the hospitals table). We do see a moderate degree of correlation between the two scores. Those hospitals that received top ratings from patients also tended to have higher procedure quality scores. However, the procedure quality scores for these hospitals are rather close together, much more so than the patient survey ratings. 
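
One way to put a number on this relationship is a Pearson correlation coefficient. The sketch below implements it with the standard library only and applies it to the eight complete rows printed by the query that follows. Those rows are the hospitals with the highest patient ratings, not a representative sample, so the resulting value is illustrative and should not be read as the correlation for the full dataset.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    spread_x = math.sqrt(sum((x - mean_x) ** 2 for x in xs))
    spread_y = math.sqrt(sum((y - mean_y) ** 2 for y in ys))
    return cov / (spread_x * spread_y)

# The eight complete rows from the query output (top of the ranking only).
survey_pct = [95, 93, 92, 91, 89, 88, 88, 87]
procedure_score = [98.33, 97.5, 95.0, 88.17, 96.67, 93.0, 96.33, 96.83]

r = pearson(survey_pct, procedure_score)
print(r)
```

Running the same function on every hospital returned by the join, without the `LIMIT 10`, would give a more meaningful figure.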

In [212]:
SurveysResults = hContext.sql('Select Surveys.ProviderID, Surveys.HospitalName, Surveys.HCAHPSAnswerPercent AS PatientSurveyTopRatingPercent, Hospitals.AverageScore As ProcedureQualityScore FROM Surveys INNER JOIN Hospitals ON Surveys.ProviderID = Hospitals.ProviderID WHERE Surveys.HCAHPSMeasureID = "H_HSP_RATING_9_10" AND Surveys.HCAHPSAnswerPercent <> "Not Available" ORDER BY Surveys.HCAHPSAnswerPercent DESC LIMIT 10')
SurveysResults.show()

+----------+--------------------+-----------------------------+---------------------+
|ProviderID|        HospitalName|PatientSurveyTopRatingPercent|ProcedureQualityScore|
+----------+--------------------+-----------------------------+---------------------+
|   "370215|OKLAHOMA HEART HO...|                           95|    98.33333333333333|
|   "370234|OKLAHOMA HEART HO...|                           93|                 97.5|
|   "030103|MAYO CLINIC HOSPITAL|                           92|                 95.0|
|   "100151|         MAYO CLINIC|                           91|    88.16666666666667|
|   "450604|HILL COUNTRY MEMO...|                           89|    96.66666666666667|
|   "360348|DUBLIN METHODIST ...|                           88|                 93.0|
|   "100315|      VIERA HOSPITAL|                           88|    96.33333333333333|
|   "240010|MAYO CLINIC HOSPI...|                           87|    96.83333333333333|
|   "260179|   ST LUKES HOSPITAL|                     