In [11]:
from schemas import *
from pyspark.sql.types import StringType, FloatType, IntegerType
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from more_pyspark import get_spark_types, to_pandas
spark = SparkSession.builder.appName('Ops').getOrCreate()
from more_dfply import case_when, ifelse
from more_dfply.facets import text_facet, text_filter
from more_pyspark import pprint_schema
from composable import pipeable
Facility_to_fips = spark.read.csv('./data/CMS_Facility_To_fips_crosswalk.csv', header=True, schema=Facility_to_FIPS_schema)
Poverty = spark.read.csv("./data/PovertyEstimates.csv", header=True, schema=Poverty_schema)
Timely_Effective = spark.read.csv("./data/Timely_and_Effective_Care-Hospital.csv", header=True, schema=Timely_schema, dateFormat=Timely_date_format)

In [12]:
Timely_Effective >> pprint_schema
Timely_Effective.take(2) >> to_pandas

StructType([StructField('Facility ID', StringType(), True),
            StructField('Facility Name', StringType(), True),
            StructField('Address', StringType(), True),
            StructField('City', StringType(), True),
            StructField('State', StringType(), True),
            StructField('ZIP Code', StringType(), True),
            StructField('County Name', StringType(), True),
            StructField('Phone Number', StringType(), True),
            StructField('Condition', StringType(), True),
            StructField('Measure ID', StringType(), True),
            StructField('Measure Name', StringType(), True),
            StructField('Score', StringType(), True),
            StructField('Sample', StringType(), True),
            StructField('Footnote', StringType(), True),
            StructField('Start Date', DateType(), True),
            StructField('End Date', DateType(), True)])


Unnamed: 0,Facility ID,Facility Name,Address,City,State,ZIP Code,County Name,Phone Number,Condition,Measure ID,Measure Name,Score,Sample,Footnote,Start Date,End Date
0,10001,SOUTHEAST HEALTH MEDICAL CENTER,1108 ROSS CLARK CIRCLE,DOTHAN,AL,36301,Houston,(334) 793-8701,Emergency Department,EDV,Emergency department volume,high,,,2019-01-01,2019-12-31
1,10001,SOUTHEAST HEALTH MEDICAL CENTER,1108 ROSS CLARK CIRCLE,DOTHAN,AL,36301,Houston,(334) 793-8701,Preventive Care,IMM_3,Healthcare workers given influenza vaccination,99,4119.0,,2020-10-01,2021-03-31


In [13]:
Timely_Effective_2 = (Timely_Effective
.select(('Facility ID'), 'State', 'County Name', 'Condition', 'Measure Name', 'Score')
.where(col('Condition') == "Emergency Department")
.where((Timely_Effective['Measure Name'] == ("Average (median) time patients spent in the emergency department before leaving from the visit A lower number of minutes is better")) |
 (Timely_Effective['Measure Name'] == "Left before being seen"))
.select('Facility ID', 'State', 'County Name', 'Measure Name', 'Score')
.withColumn('Score', when(col('Score') == "Not Available", 0).otherwise(col('Score')))
.withColumn('Score', col('Score').cast('float'))
)
columns = ['Facility ID', 'State', 'County Name']
@pipeable
def spread(val_col, var_col, group_by_col, df):
    return  (df
             .groupBy(group_by_col)
             .pivot(val_col)
             .sum(var_col))
Timely_2_unstacked = Timely_Effective_2 >> spread('Measure Name', 'Score', columns)

Timely_Effective_3 = (Timely_2_unstacked
.withColumn('Avg_median_time_spent_in_emergency_department', col('Average (median) time patients spent in the emergency department before leaving from the visit A lower number of minutes is better'))
.drop('Average (median) time patients spent in the emergency department before leaving from the visit A lower number of minutes is better')
)

Timely_Effective_3.take(5) >> to_pandas

Unnamed: 0,Facility ID,State,County Name,Left before being seen,Avg_median_time_spent_in_emergency_department
0,50737,CA,Los Angeles,0.0,172.0
1,110105,GA,Colquitt,1.0,152.0
2,50115,CA,San Diego,3.0,284.0
3,110025,GA,Glynn,3.0,186.0
4,230279,MI,Livingston,0.0,0.0


In [14]:
Timely_with_FIPS = (Timely_Effective_3
.join(Facility_to_fips, 'Facility ID', how='left')
)
Timely_with_FIPS.collect() >> to_pandas

Unnamed: 0,Facility ID,State,County Name,Left before being seen,Avg_median_time_spent_in_emergency_department,FIPS
0,050737,CA,LOS ANGELES,0.0,172.0,06037
1,110105,GA,COLQUITT,1.0,152.0,13071
2,050115,CA,SAN DIEGO,3.0,284.0,06073
3,110025,GA,GLYNN,3.0,186.0,13127
4,230279,MI,LIVINGSTON,0.0,0.0,26093
...,...,...,...,...,...,...
4713,390044,PA,BERKS,3.0,208.0,42011
4714,460057,UT,SUMMIT,0.0,126.0,49043
4715,381324,OR,JEFFERSON,1.0,130.0,41031
4716,523302,WI,WINNEBAGO,0.0,0.0,55139


In [15]:
(Poverty
.select('FIPS_code', 'PCTPOVALL_2020')
).take(2) >> to_pandas

Unnamed: 0,FIPS_code,PCTPOVALL_2020
0,0,11.9
1,1000,14.9


In [16]:
Poverty_2 = (Poverty
.select('FIPS_code', 'PCTPOVALL_2020')
.withColumn('FIPS', col('FIPS_code'))
.select('FIPS', 'PCTPOVALL_2020')) 
Poverty_2 >> pprint_schema
Poverty_2.take(2) >> to_pandas

StructType([StructField('FIPS', StringType(), True),
            StructField('PCTPOVALL_2020', DoubleType(), True)])


Unnamed: 0,FIPS,PCTPOVALL_2020
0,0,11.9
1,1000,14.9


In [17]:
poverty_timely = (Timely_with_FIPS
.join(Poverty_2, 'FIPS', how='left')
.withColumn('Percent_poverty_2020', col('PCTPOVALL_2020'))
.drop('PCTPOVALL_2020')
)

poverty_timely.collect() >> to_pandas

Unnamed: 0,FIPS,Facility ID,State,County Name,Left before being seen,Avg_median_time_spent_in_emergency_department,Percent_poverty_2020
0,06037,050737,CA,LOS ANGELES,0.0,172.0,13.2
1,13071,110105,GA,COLQUITT,1.0,152.0,20.4
2,06073,050115,CA,SAN DIEGO,3.0,284.0,9.5
3,13127,110025,GA,GLYNN,3.0,186.0,15.5
4,26093,230279,MI,LIVINGSTON,0.0,0.0,5.3
...,...,...,...,...,...,...,...
4713,42011,390044,PA,BERKS,3.0,208.0,10.9
4714,49043,460057,UT,SUMMIT,0.0,126.0,4.5
4715,41031,381324,OR,JEFFERSON,1.0,130.0,12.5
4716,55139,523302,WI,WINNEBAGO,0.0,0.0,8.7
