##DATASCI W205 - Exercise 1 - Investigations

In [1]:
##Import pyspark.sql and hivecontext
from pyspark.sql import *
sqlContext = HiveContext(sc)


In [2]:
##Check out all tables available in warehouse
sqlContext.sql("show tables").take(100) 

[Row(tableName=u'effective_care', isTemporary=False),
 Row(tableName=u'hosp_patientexperience', isTemporary=False),
 Row(tableName=u'hosp_surveys', isTemporary=False),
 Row(tableName=u'hosp_totalperformance', isTemporary=False),
 Row(tableName=u'hospitals', isTemporary=False),
 Row(tableName=u'measuredates', isTemporary=False),
 Row(tableName=u'readmissionreduction', isTemporary=False),
 Row(tableName=u'readmissions', isTemporary=False),
 Row(tableName=u't_hospitals', isTemporary=False),
 Row(tableName=u't_hospitalseffectivecare', isTemporary=False),
 Row(tableName=u't_hospitalseffectivecareranges', isTemporary=False),
 Row(tableName=u't_hospitalspatientexperiencescaledscore', isTemporary=False),
 Row(tableName=u't_hospitalsreadmissionratios', isTemporary=False),
 Row(tableName=u't_hospitalsreadmissionsscaledscoresbycat', isTemporary=False),
 Row(tableName=u't_hospitalstypecountsgranular', isTemporary=False),
 Row(tableName=u't_measuredates', isTemporary=False)]

#### Question 1: What hospitals are models of high-quality care—that is, which hospitals have the most consistently high scores for a variety of procedures?

In [3]:
t_measuredates = sqlContext.sql("select * from t_measuredates")

In [13]:
new_df = t_measuredates.toPandas()

In [4]:
t_measuredates.show()

measurename          measureid     measurestartquarter measureendquarter
ACS Participation... ACS_REGISTRY  3Q2013              2Q2014           
Statin Prescribed... AMI_10        4Q2013              3Q2014           
Aspirin Prescribe... AMI_2         4Q2013              3Q2014           
Fibrinolytic Ther... AMI_7a        4Q2013              3Q2014           
Primary PCI Recei... AMI_8a        4Q2013              3Q2014           
Relievers for Inp... CAC_1         4Q2013              3Q2014           
Systemic Corticos... CAC_2         4Q2013              3Q2014           
Home Management P... CAC_3         4Q2013              3Q2014           
Complication Rate... COMP_HIP_KNEE 2Q2011              1Q2014           
Median Time from ... ED_1b         4Q2013              3Q2014           
Admit Decision Ti... ED_2b         4Q2013              3Q2014           
Emergency Departm... EDV           1Q2013              4Q2013           
Central Line Asso... HAI_1         4Q2013          

In [5]:
##Determine Most important Measure Counts
t_measuredates.groupBy("measurestartquarter","measureendquarter").count().show()

measurestartquarter measureendquarter count
3Q2012              2Q2014            6    
3Q2013              2Q2014            8    
1Q2013              4Q2013            11   
4Q2013              1Q2014            2    
2Q2011              1Q2014            1    
4Q2012              3Q2013            1    
1Q2014              3Q2014            1    
3Q2011              2Q2014            16   
4Q2013              3Q2014            54   


In [6]:
## Determine categories and procedures to score hospitals on
ecr = sqlContext.sql("select * from t_hospitalseffectivecareranges")
counts = ecr.groupBy('Condition').count()
counts.show()
counts.registerTempTable("counts")

Condition            count
Children's Asthma    3    
Pregnancy and Del... 1    
Emergency Department 7    
Surgical Care Imp... 9    
Blood Clot Preven... 6    
Pneumonia            1    
Heart Failure        3    
Stroke Care          8    
Heart Attack or C... 9    
Preventive Care      2    


In [4]:
sql = 'select * from ' \
      't_hospitalseffectivecare'
t_hospitalseffectivecare = sqlContext.sql(sql)

##define representative set of procedures by condition that will be looked at, picked 1 procedure for each condittion
measurenamelist = ['VTE_3', 'CAC_3', 'OP_23', 'OP_2', 'HF_3', 'IMM_2', 'STK_4', 'SCIP_INF_1']

## send to Pandas to easily compute
ecr = t_hospitalseffectivecare.toPandas()

#slice on procedures
sliced_ec = ecr[ecr.measureid.isin(measurenamelist)]

#create a Spark Dataframe
t_hospitalseffectivecare_filtered = sqlContext.createDataFrame(sliced_ec)

#Register as a table
t_hospitalseffectivecare_filtered.registerTempTable("t_hospitalseffectivecare_filtered")

In [8]:
sqlContext.sql("select * from t_hospitalseffectivecare_filtered").show()

providerid condition            measureid  measurename          score sample
010001     Heart Failure        HF_3       ACEI or ARB for LVSD 98    203   
010001     Preventive Care      IMM_2      Immunization for ... 94    534   
010001     Surgical Care Imp... SCIP_INF_1 Prophylactic anti... 99    338   
010001     Blood Clot Preven... VTE_3      Anticoagulation o... 83    126   
010005     Heart Failure        HF_3       ACEI or ARB for LVSD 100   32    
010005     Preventive Care      IMM_2      Immunization for ... 96    965   
010005     Emergency Department OP_23      Head CT results      85    13    
010005     Surgical Care Imp... SCIP_INF_1 Prophylactic anti... 100   383   
010005     Blood Clot Preven... VTE_3      Anticoagulation o... 100   53    
010006     Heart Failure        HF_3       ACEI or ARB for LVSD 100   72    
010006     Preventive Care      IMM_2      Immunization for ... 99    522   
010006     Surgical Care Imp... SCIP_INF_1 Prophylactic anti... 100   378   

In [5]:
##look at those procedures measured over a certain period
sql = """SELECT 
       ecf.providerid
      ,ecf.condition
      ,ecf.measureid
      ,ecf.measurename
      ,ecf.score
      ,ecf.sample
      ,m.measurestartquarter
      ,m.measureendquarter
FROM
  t_hospitalseffectivecare_filtered as ecf
  left join
  t_measuredates as m 
  on ecf.measureid = m.measureid  
  where m.measurestartquarter = '4Q2013' and m.measureendquarter = '3Q2014'
  """

t_hospitalseffectivecare_filtered_date = sqlContext.sql(sql)

#Register as a table
t_hospitalseffectivecare_filtered_date.registerTempTable("t_hospitalseffectivecare_filtered_date")


In [6]:
##aggregate provider procedure scores
sql = """SELECT 
       sum(score) AS score_agg,
       stddev_samp(score) as score_stddev,
       sum(max) AS max_agg,
       sum(score) / sum(max) as perc_score,
       count(measureid) as cnt,
       providerid
FROM
  (SELECT ecf.providerid as providerid,
          ecf.condition as condition,
          ecf.measureid as measureid,
          ecf.measurename as measurename,
          ecf.score as score,
          ecf.sample as sample,
          ecr.max as max
   FROM t_hospitalseffectivecare_filtered_date AS ecf
   LEFT JOIN t_hospitalseffectivecareranges AS ecr ON ecf.measureid = ecr.measureid
   WHERE cast(ecf.sample AS double) > 30) AS x
GROUP BY providerid
ORDER BY cnt desc, perc_score desc, providerid"""

t_hospitalseffectivecare_filtered_scored = sqlContext.sql(sql)
t_hospitalseffectivecare_filtered_scored.registerTempTable("t_hospitalseffectivecare_filtered_scored")

In [12]:
t_hospitalseffectivecare_filtered_scored.show()

score_agg score_stddev       max_agg perc_score         cnt providerid
290.0     43.54882317583335  500.0   0.58               5   050045    
396.0     0.816496580927726  400.0   0.99               4   340001    
391.0     2.6299556396765835 400.0   0.9775             4   220077    
387.0     2.9860788111948193 400.0   0.9675             4   330101    
384.0     3.915780041490244  400.0   0.96               4   050077    
383.0     2.7537852736430506 400.0   0.9575             4   440104    
376.0     9.345230512584124  400.0   0.94               4   450388    
374.0     7.937253933193772  400.0   0.935              4   100006    
373.0     5.619905100029122  400.0   0.9325             4   230017    
373.0     6.3966136874651625 400.0   0.9325             4   340002    
368.0     3.559026084010437  400.0   0.92               4   420004    
365.0     7.58836829188814   400.0   0.9125             4   230038    
343.0     11.026483271348727 400.0   0.8575             4   450184    
302.0 

In [7]:
##exclude those providers with high readmission rations scores (1 = bad, 0 or null OK)
sql = """SELECT 
         score_agg,
         score_stddev,
         max_agg,
         perc_score,
         cnt,
         ecft.providerid,
         rr.aggregateexcessreadmissions
         
         FROM t_hospitalseffectivecare_filtered_scored as ecft
         left join 
              t_hospitalsreadmissionratios as rr
              on ecft.providerid = rr.providerid
         where rr.aggregateexcessreadmissions <> 1
         ORDER BY cnt desc, perc_score desc, providerid
         """

t_hospitalseffectivecare_filtered_scored_cleaned = sqlContext.sql(sql)
t_hospitalseffectivecare_filtered_scored_cleaned.registerTempTable("t_hospitalseffectivecare_filtered_scored_cleaned")

In [8]:
##join back to provider_ids and calculate the providers with lowest score variability (< 2 stds) and highest aggregates as top 10
sql = """SELECT 
         score_agg,
         score_stddev,
         max_agg,
         perc_score,
         cnt,
         ecftc.providerid,
         aggregateexcessreadmissions,
         h.hospitalname,
         h.state,
         h.hospitaltype
         
         FROM t_hospitalseffectivecare_filtered_scored_cleaned as ecftc
         inner join 
              t_hospitals as h
              on ecftc.providerid = h.providerid
         where score_stddev < 2
         ORDER BY cnt desc, perc_score desc, providerid
         """

t_best_hospitals = sqlContext.sql(sql)
#t_best_hospitals.show()

In [9]:
## send to Pandas finalize
best_hospitals_df = t_best_hospitals.toPandas()

In [10]:
out = best_hospitals_df[['hospitalname','score_agg','perc_score','score_stddev']].head(10)

In [11]:
out.to_csv("/usr/local/w205-hw/exercise_1/investigations/best_hospitals/best_hospitals.txt")

#### Question 2: What states are models of high-quality care?

In [12]:
#register table from answering question 1
#create a Spark Dataframe
t_best_hospitals = sqlContext.createDataFrame(best_hospitals_df)

#Register as a table
t_best_hospitals.registerTempTable("t_best_hospitals")


In [13]:
t_best_hospitals.schema.fields

[StructField(score_agg,DoubleType,true),
 StructField(score_stddev,DoubleType,true),
 StructField(max_agg,DoubleType,true),
 StructField(perc_score,DoubleType,true),
 StructField(cnt,LongType,true),
 StructField(providerid,StringType,true),
 StructField(aggregateexcessreadmissions,LongType,true),
 StructField(hospitalname,StringType,true),
 StructField(state,StringType,true),
 StructField(hospitaltype,StringType,true)]

In [14]:
##Recompute state scores on procedures using a similar methodology (based on the best hosptials)
sql = """SELECT 
       sum(score_agg) AS score_agg,
       stddev_samp(score_agg) as score_stddev,
       sum(max_agg) AS max_agg,
       sum(score_agg) / sum(max_agg) as perc_score,
       count(providerid) as cnt_providers,
       state
FROM
       t_best_hospitals AS x
GROUP BY state
ORDER BY cnt_providers desc, perc_score desc, state"""


t_best_states = sqlContext.sql(sql)
t_best_states.registerTempTable("t_best_states")

In [17]:
#write output
#t_best_states.show()
best_states_df = t_best_states.toPandas()
out = best_states_df[['state','score_agg','perc_score','score_stddev']].head(10)
out.to_csv("/usr/local/w205-hw/exercise_1/investigations/best_states/best_states.txt")

In [15]:
t_best_states.count()

50L

####Question 3: Which procedures have the greatest variability between hospitals?  


In [26]:
##Review variability in scores, not just those originally sampled

##look at all procedures measured over a certain period 4Q2013 and 3Q2014
sql = """SELECT 
       variance(score) AS score_variance,
       stddev_samp(score) as score_stddev,
       count(measureid) as cnt,
       measurename
  FROM
  (SELECT ecf.providerid as providerid,
          ecf.condition as condition,
          ecf.measureid as measureid,
          ecf.measurename as measurename,
          ecf.score as score,
          ecf.sample as sample,
          ecr.max as max
   FROM t_hospitalseffectivecare AS ecf
   LEFT JOIN t_hospitalseffectivecareranges AS ecr ON ecf.measureid = ecr.measureid
   LEFT JOIN t_measuredates as m on ecf.measureid = m.measureid  
   WHERE cast(ecf.sample AS double) > 30 and ecr.max = 100
         and m.measurestartquarter = '4Q2013' and m.measureendquarter = '3Q2014'
   
   ) AS x
GROUP BY measurename
ORDER BY score_variance desc, cnt desc, measurename"""


t_scorevariability = sqlContext.sql(sql)

#Register as a table
t_scorevariability.registerTempTable("t_scorevariability")
##t_scorevariability.show()

#write output
#t_scorevariability.show()
scorevariability_df = t_scorevariability.toPandas()
out = scorevariability_df[['score_variance','score_stddev','cnt','measurename']].head(10)
out.to_csv("/usr/local/w205-hw/exercise_1/investigations/hospital_variability/scorevariability.txt")

####Question 4: Are average scores for hospital quality or procedural variability correlated with patient survey responses?



In [35]:
##Just look at correlation between hospital quality and patient survey responses... More indicative IMO
##

#import seaborn as sns
#import matplotlib.pyplot as plt
##sns.pairplot(nba[["ast", "fg", "trb"]])
##plt.show()

#print(df.corr())

sql = """SELECT 
         ecftc.perc_score,
         hess.scaledscoreaverage,
         ecftc.providerid,
         h.hospitalname,
         h.state,
         h.hospitaltype
         FROM t_hospitalseffectivecare_filtered_scored_cleaned as ecftc
         inner join 
              t_hospitals as h
              on ecftc.providerid = h.providerid
         inner join
              t_hospitalspatientexperiencescaledscore as hess
               on ecftc.providerid = hess.providerid
         ORDER BY perc_score desc, providerid
         """

t_quality_to_experience_correlation = sqlContext.sql(sql)

#Register as a table
t_quality_to_experience_correlation.registerTempTable("t_quality_to_experience_correlation")

quality_to_experience_correlation_df = t_quality_to_experience_correlation.toPandas()


In [38]:
quality_to_experience_correlation_df[['perc_score','scaledscoreaverage']]

Unnamed: 0,perc_score,scaledscoreaverage
0,1.000000,0.6375
1,1.000000,0.6125
2,1.000000,0.5250
3,1.000000,0.1375
4,1.000000,0.4000
5,1.000000,0.1375
6,1.000000,0.2500
7,1.000000,0.0750
8,1.000000,0.8125
9,1.000000,0.1625


In [43]:
print(quality_to_experience_correlation_df[['perc_score','scaledscoreaverage']].corr())

                    perc_score  scaledscoreaverage
perc_score            1.000000            0.029226
scaledscoreaverage    0.029226            1.000000


In [51]:
out = quality_to_experience_correlation_df[['perc_score','scaledscoreaverage']].corr()
out.to_csv("/usr/local/w205-hw/exercise_1/investigations/hospitals_and_patients/qualitycorrelation.txt")