# Dataset Preparation

In this notebook, we combine SNAP enrollment, Medicare inpatient charges, and unemployment data and organize them by year and state.

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Data Exploration with Spark") \
    .getOrCreate()



In [2]:
#spark.stop()

First, I'll set up reading my files from HDFS and test it on a single file.

In [3]:
datasets_dir = 'hdfs://columbia.cs.colostate.edu:31175/final/datasets/'

inpatient_sets = [datasets_dir+'inpatient_charges_'+str(year)+'.csv' for year in range(2011,2015)]
snap_set = datasets_dir+'snap_enrollment.csv'
unemployment_set = datasets_dir+'unemployment.csv'

def read_file(csv_file_path):
    return spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("sep", ",")  \
        .load(csv_file_path)

In [4]:
inpatient = read_file(inpatient_sets[0])
inpatient.show()

                                                                                

+-----------+--------------------+-----------------------+-------------+--------------+----------------+--------------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|provider_id|       provider_name|provider_street_address|provider_city|provider_state|provider_zipcode|      drg_definition|hospital_referral_region_description|total_discharges|average_covered_charges|average_total_payments|average_medicare_payments|
+-----------+--------------------+-----------------------+-------------+--------------+----------------+--------------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|     220024|HOLYOKE MEDICAL C...|       575 BEECH STREET|      HOLYOKE|            MA|            1040|057 - DEGENERATIV...|                    MA - Springfield|              13|                  17210|                  7131|               

Now we load all four inpatient files (2011-2014), tag each with its year, and determine how many distinct *drg_definition* categories exist in the combined set.
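As a plain-Python illustration (no Spark involved), the stack-and-count logic of the cell that follows amounts to the sketch below. The rows and the `DRG-A`/`DRG-B` labels are hypothetical toy data, not values from the inpatient files.

```python
# Plain-Python illustration of: tag each yearly file with its year, stack them,
# then count distinct DRG definitions. The rows are hypothetical toy data.
yearly_rows = {
    2011: [{"drg_definition": "DRG-A", "provider_state": "MA"},
           {"drg_definition": "DRG-B", "provider_state": "MA"}],
    2012: [{"drg_definition": "DRG-A", "provider_state": "CO"}],
}

def stack_with_year(yearly_rows):
    """Mimic withColumn("year", lit(y)) on each yearly file followed by union()."""
    stacked = []
    for year, rows in sorted(yearly_rows.items()):
        for row in rows:
            stacked.append({**row, "year": year})
    return stacked

charges = stack_with_year(yearly_rows)
# Equivalent of select("drg_definition").distinct()
drg_catalog = {row["drg_definition"] for row in charges}
print(len(charges), len(drg_catalog))  # 3 2
```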

In [5]:
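from pyspark.sql.functions import lit

# Read all four yearly files (2011-2014)
inpatient = [read_file(path) for path in inpatient_sets]

# Tag each file with its year and stack them; union() matches columns by position
charges = inpatient[0].withColumn("year", lit(2011))
for i in range(1, len(inpatient)):
    charges = charges.union(inpatient[i].withColumn("year", lit(2011 + i)))

# Distinct DRG definitions across all years
drg_catalog = charges.select("drg_definition").distinct()

drg_catalog.show()
print(drg_catalog.count())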

                                                                                

+--------------------+
|      drg_definition|
+--------------------+
|811 - RED BLOOD C...|
|329 - MAJOR SMALL...|
|191 - CHRONIC OBS...|
|683 - RENAL FAILU...|
|918 - POISONING &...|
|481 - HIP & FEMUR...|
|249 - PERC CARDIO...|
|390 - G.I. OBSTRU...|
|300 - PERIPHERAL ...|
|190 - CHRONIC OBS...|
|057 - DEGENERATIV...|
|243 - PERMANENT C...|
|252 - OTHER VASCU...|
|314 - OTHER CIRCU...|
|207 - RESPIRATORY...|
|690 - KIDNEY & UR...|
|247 - PERC CARDIO...|
|286 - CIRCULATORY...|
|292 - HEART FAILU...|
|563 - FX, SPRN, S...|
+--------------------+
only showing top 20 rows





565


                                                                                

We rename the state column, `provider_state`, to `state`. Every dataset will use this same column name so that we can eventually join them.

In [6]:
charges_ready = charges.withColumnRenamed("provider_state", "state")
charges_ready.show()

+-----------+--------------------+-----------------------+-------------+-----+----------------+--------------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+----+
|provider_id|       provider_name|provider_street_address|provider_city|state|provider_zipcode|      drg_definition|hospital_referral_region_description|total_discharges|average_covered_charges|average_total_payments|average_medicare_payments|year|
+-----------+--------------------+-----------------------+-------------+-----+----------------+--------------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+----+
|     220024|HOLYOKE MEDICAL C...|       575 BEECH STREET|      HOLYOKE|   MA|          1040.0|057 - DEGENERATIV...|                    MA - Springfield|              13|                  17210|                  7131|                     6359|2011|
|   

In [7]:
snap = read_file(snap_set)
snap.show()

+--------------------+-----------------------------+------------------------------+------------------------------+--------------------------------+---------------------------------+---------------------------------+-----------------+----------+-----+---------+-------+
|             GeoDesc|SNAP_PA_Participation_Persons|SNAP_NPA_Participation_Persons|SNAP_All_Participation_Persons|SNAP_PA_Participation_Households|SNAP_NPA_Participation_Households|SNAP_All_Participation_Households|SNAP_All_Issuance|      Date| FIPS|SubCounty|Program|
+--------------------+-----------------------------+------------------------------+------------------------------+--------------------------------+---------------------------------+---------------------------------+-----------------+----------+-----+---------+-------+
| 1000112 DE EBT KENT|                        35208|                           294|                         35502|                           17745|                               93|            

We intend to join the data by state and year. For the SNAP enrollment data, this means parsing the `Date` field as a date, extracting its year, and keeping only 2011-2014 to match the inpatient data.
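The parse-and-filter step can be mimicked in plain Python to show what the `M/d/yyyy` pattern accepts. This is only an illustration with hypothetical date strings; `year_from_snap_date` is a hypothetical helper, and Python's `%m/%d/%Y` is used as a stand-in for Spark's pattern because both accept one- or two-digit months and days.

```python
from datetime import datetime

def year_from_snap_date(date_str):
    """Hypothetical helper: parse an M/d/yyyy string such as '7/1/2012' and return its year.
    strptime's %m and %d accept one- or two-digit values, like Spark's M and d."""
    return datetime.strptime(date_str, "%m/%d/%Y").year

dates = ["7/1/2010", "1/1/2011", "12/1/2014", "3/1/2015"]  # hypothetical values
# Keep only the years covered by the inpatient data
kept = [d for d in dates if 2011 <= year_from_snap_date(d) <= 2014]
print(kept)  # ['1/1/2011', '12/1/2014']
```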

In [8]:
from pyspark.sql.functions import to_date, year, col

snap = snap.withColumn("Date", to_date(col("Date"), 'M/d/yyyy'))
snap = snap.withColumn("year", year(col("Date")))

snap = snap.filter((col("year") >= 2011) & (col("year") <= 2014))

snap.show()

+--------------------+-----------------------------+------------------------------+------------------------------+--------------------------------+---------------------------------+---------------------------------+-----------------+----------+-----+---------+-------+----+
|             GeoDesc|SNAP_PA_Participation_Persons|SNAP_NPA_Participation_Persons|SNAP_All_Participation_Persons|SNAP_PA_Participation_Households|SNAP_NPA_Participation_Households|SNAP_All_Participation_Households|SNAP_All_Issuance|      Date| FIPS|SubCounty|Program|year|
+--------------------+-----------------------------+------------------------------+------------------------------+--------------------------------+---------------------------------+---------------------------------+-----------------+----------+-----+---------+-------+----+
| 1000112 DE EBT KENT|                        35634|                            31|                         35665|                           16903|                               

The SNAP enrollment data also has a `GeoDesc` field whose second space-separated token is the state abbreviation (e.g. `1000112 DE EBT KENT`), so we can extract the state with basic string manipulation.
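The extraction logic is easy to check in plain Python. This sketch (the helper name `state_from_geodesc` is hypothetical) mirrors `split(col("GeoDesc"), " ")[1]`, including returning `None` when there is no second token, which is what Spark does for an out-of-range array index with ANSI mode disabled (the Spark 3.x default). It assumes every `GeoDesc` follows the `<code> <state> ...` layout seen in the output.

```python
def state_from_geodesc(geo_desc):
    """Hypothetical helper: return the second space-separated token of GeoDesc,
    or None if there is no second token (like Spark's out-of-range index with ANSI off)."""
    parts = geo_desc.split(" ")
    return parts[1] if len(parts) > 1 else None

print(state_from_geodesc("1000112 DE EBT KENT"))  # DE
```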

In [9]:
from pyspark.sql.functions import split

snap_w_state = snap.withColumn("state", split(col("GeoDesc"), " ")[1])
snap_w_state.show()

+--------------------+-----------------------------+------------------------------+------------------------------+--------------------------------+---------------------------------+---------------------------------+-----------------+----------+-----+---------+-------+----+-----+
|             GeoDesc|SNAP_PA_Participation_Persons|SNAP_NPA_Participation_Persons|SNAP_All_Participation_Persons|SNAP_PA_Participation_Households|SNAP_NPA_Participation_Households|SNAP_All_Participation_Households|SNAP_All_Issuance|      Date| FIPS|SubCounty|Program|year|state|
+--------------------+-----------------------------+------------------------------+------------------------------+--------------------------------+---------------------------------+---------------------------------+-----------------+----------+-----+---------+-------+----+-----+
| 1000112 DE EBT KENT|                        35634|                            31|                         35665|                           16903|             

For the unemployment data, the year column only needs to be renamed from `Year` to `year`. The state is harder: the `State/Area` column spells out full state names, so we need to map them to postal abbreviations (for example, Colorado -> CO).

In [10]:
unemployment = read_file(unemployment_set)
unemployment = unemployment.withColumnRenamed("State/Area", "state").withColumnRenamed("Year", "year")
unemployment.show()

+---------+--------------------+----+-----+---------------------------------------------------------+----------------------------------------+--------------------------------------+------------------------------+-------------------------------------------------+--------------------------------+---------------------------------------------------+
|FIPS Code|               state|year|Month|Total Civilian Non-Institutional Population in State/Area|Total Civilian Labor Force in State/Area|Percent (%) of State/Area's Population|Total Employment in State/Area|Percent (%) of Labor Force Employed in State/Area|Total Unemployment in State/Area|Percent (%) of Labor Force Unemployed in State/Area|
+---------+--------------------+----+-----+---------------------------------------------------------+----------------------------------------+--------------------------------------+------------------------------+-------------------------------------------------+--------------------------------+---------

In [11]:
state_abbreviations = {
    'Alabama': 'AL', 'Alaska': 'AK', 'Arizona': 'AZ', 'Arkansas': 'AR', 'California': 'CA',
    'Colorado': 'CO', 'Connecticut': 'CT', 'Delaware': 'DE', 'Florida': 'FL', 'Georgia': 'GA',
    'Hawaii': 'HI', 'Idaho': 'ID', 'Illinois': 'IL', 'Indiana': 'IN', 'Iowa': 'IA',
    'Kansas': 'KS', 'Kentucky': 'KY', 'Louisiana': 'LA', 'Maine': 'ME', 'Maryland': 'MD',
    'Massachusetts': 'MA', 'Michigan': 'MI', 'Minnesota': 'MN', 'Mississippi': 'MS',
    'Missouri': 'MO', 'Montana': 'MT', 'Nebraska': 'NE', 'Nevada': 'NV', 'New Hampshire': 'NH',
    'New Jersey': 'NJ', 'New Mexico': 'NM', 'New York': 'NY', 'North Carolina': 'NC',
    'North Dakota': 'ND', 'Ohio': 'OH', 'Oklahoma': 'OK', 'Oregon': 'OR', 'Pennsylvania': 'PA',
    'Rhode Island': 'RI', 'South Carolina': 'SC', 'South Dakota': 'SD', 'Tennessee': 'TN',
    'Texas': 'TX', 'Utah': 'UT', 'Vermont': 'VT', 'Virginia': 'VA', 'Washington': 'WA',
    'West Virginia': 'WV', 'Wisconsin': 'WI', 'Wyoming': 'WY', 'District of Columbia': 'DC'
}


In [12]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def abbreviate_state(full_name):
    return state_abbreviations.get(full_name, None)  

abbreviate_state_udf = udf(abbreviate_state, StringType())


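We only have inpatient data for 2011-2014. So after abbreviating the states, we drop rows whose name did not map to a state (the UDF returns null for them), drop the years we don't need, and rename the unemployment-rate column to `unemployment`.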
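In plain Python, the map-then-drop-then-filter sequence looks roughly like this. The rows and the two-entry mapping are hypothetical stand-ins (the real notebook uses the full `state_abbreviations` dictionary), and `"Some Region"` is an invented name used only to show an unmapped value being dropped.

```python
# Hypothetical subset of the state_abbreviations mapping
state_abbreviations = {"Alabama": "AL", "Colorado": "CO"}

# Hypothetical unemployment rows: (State/Area, Year, unemployment rate)
rows = [
    ("Alabama", 2010, 9.0),
    ("Alabama", 2011, 9.5),
    ("Colorado", 2014, 5.0),
    ("Some Region", 2012, 7.0),  # invented unmapped name -> dropped
]

cleaned = []
for name, yr, rate in rows:
    state = state_abbreviations.get(name)  # like the UDF: None when unmapped
    if state is None:                      # like na.drop(subset=["state"])
        continue
    if 2011 <= yr <= 2014:                 # keep only years the inpatient data covers
        cleaned.append((state, yr, rate))

print(cleaned)  # [('AL', 2011, 9.5), ('CO', 2014, 5.0)]
```

The Python UDF works, but every row is shipped to a Python worker for the lookup. A built-in alternative is to build a literal map with `pyspark.sql.functions.create_map` and index it with the state column, which keeps the lookup inside the JVM.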

In [13]:
unemployment = unemployment.withColumn("state", abbreviate_state_udf(col("state")))
unemployment = unemployment.na.drop(subset=["state"])
unemployment = unemployment.filter((col("year") >= 2011) & (col("year") <= 2014))
unemployment = unemployment.withColumnRenamed("Percent (%) of Labor Force Unemployed in State/Area", "unemployment")
unemployment.show()

+---------+-----+----+-----+---------------------------------------------------------+----------------------------------------+--------------------------------------+------------------------------+-------------------------------------------------+--------------------------------+------------+
|FIPS Code|state|year|Month|Total Civilian Non-Institutional Population in State/Area|Total Civilian Labor Force in State/Area|Percent (%) of State/Area's Population|Total Employment in State/Area|Percent (%) of Labor Force Employed in State/Area|Total Unemployment in State/Area|unemployment|
+---------+-----+----+-----+---------------------------------------------------------+----------------------------------------+--------------------------------------+------------------------------+-------------------------------------------------+--------------------------------+------------+
|        1|   AL|2011|    1|                                             3,717,148   |                            2,21

Now we combine all of the data, starting with the inpatient charges. We want one row per state/year combination, with one column per `drg_definition`; each cell holds the `average_total_payments` value for that DRG in that state and year. We drop any `drg_definition` that is not present in every state/year combination.
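To make the filter-then-pivot idea concrete, here is a plain-Python sketch on hypothetical rows (`DRG-A`/`DRG-B` are placeholder labels). Where several providers report the same DRG in one state/year, this sketch averages them; that is a choice made for the illustration.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical (drg_definition, state, year, average_total_payments) rows
rows = [
    ("DRG-A", "MA", 2011, 7000),
    ("DRG-A", "MA", 2011, 8000),  # a second provider in the same state/year
    ("DRG-A", "CO", 2011, 6000),
    ("DRG-B", "MA", 2011, 5000),  # DRG-B is missing for CO/2011
]

# Every distinct (state, year) combination present in the data
state_years = {(state, year) for _, state, year, _ in rows}

# DRGs that appear in every (state, year) combination
seen = defaultdict(set)
for drg, state, year, _ in rows:
    seen[drg].add((state, year))
common_drgs = {drg for drg, pairs in seen.items() if pairs == state_years}

# Pivot: one dict per (state, year), one key per common DRG, averaging providers
payments = defaultdict(list)
for drg, state, year, pay in rows:
    if drg in common_drgs:
        payments[(state, year, drg)].append(pay)

pivoted = defaultdict(dict)
for (state, year, drg), pays in payments.items():
    pivoted[(state, year)][drg] = mean(pays)

print(sorted(common_drgs))  # ['DRG-A']
```

In Spark, passing the list of common DRG names as the `values` argument of `pivot()` lets it skip the extra job it otherwise runs to collect the distinct pivot values.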

In [14]:
charges_reduced = charges_ready.select('drg_definition', "state", "year", 'average_total_payments')
charges_reduced.show()

+--------------------+-----+----+----------------------+
|      drg_definition|state|year|average_total_payments|
+--------------------+-----+----+----------------------+
|057 - DEGENERATIV...|   MA|2011|                  7131|
|064 - INTRACRANIA...|   MA|2011|                 11786|
|065 - INTRACRANIA...|   MA|2011|                  7610|
|069 - TRANSIENT I...|   MA|2011|                  4860|
|101 - SEIZURES W/...|   MA|2011|                  5059|
|176 - PULMONARY E...|   MA|2011|                  6720|
|177 - RESPIRATORY...|   MA|2011|                 13027|
|178 - RESPIRATORY...|   MA|2011|                  9395|
|189 - PULMONARY E...|   MA|2011|                  8550|
|190 - CHRONIC OBS...|   MA|2011|                  7803|
|191 - CHRONIC OBS...|   MA|2011|                  6331|
|192 - CHRONIC OBS...|   MA|2011|                  4780|
|193 - SIMPLE PNEU...|   MA|2011|                  9577|
|194 - SIMPLE PNEU...|   MA|2011|                  6579|
|195 - SIMPLE PNEU...|   MA|201

In [15]:
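from pyspark.sql.functions import countDistinct, avg

# Number of distinct (state, year) combinations in the data
total_state_years = charges_reduced.select("state", "year").distinct().count()

# Keep only DRGs that appear in every (state, year) combination
common_drgs = charges_reduced.groupBy("drg_definition") \
                .agg(countDistinct("state", "year").alias("count")) \
                .filter(col("count") == total_state_years) \
                .select("drg_definition")

charges_common = charges_reduced.join(common_drgs, "drg_definition", "inner")

# One row per (state, year), one column per DRG. A state has many providers per DRG,
# so average their payments rather than keeping an arbitrary first() value.
transposed_charges = charges_common.groupBy("state", "year") \
                .pivot("drg_definition") \
                .agg(avg("average_total_payments"))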

transposed_charges.show()




+-----+----+----------------------------------------------------------+---------------------------------------------------------------+----------------------+--------------------------------+--------------------------------------------------+-------------------------------------------+-------------------------------------------------+------------------------------------------------+------------------------------------------------------+---------------------------------------+--------------------------------------+--------------------------------------------+-----------------------------------------------------------------+-----------------------------------------+----------------------------------------------+--------------------------------------------------------------------------+-------------------------------------------------------+------------------------------------+---------------------------------------------------------+-------------------------------------------------------

Next we prepare to join the DataFrames by reducing the unemployment and SNAP enrollment data to just `state`, `year`, and the value of interest for each dataset. Both datasets have several rows per state/year combination (monthly rows in the unemployment data; county or region rows for each date in the SNAP data), so we average those rows.
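The grouping step is equivalent to the plain-Python sketch below, which mirrors `groupBy("state", "year").agg(avg(...))`. Like Spark's `avg`, it ignores null (`None`) values, and a group containing only nulls averages to `None`. The helper name and the monthly rows are hypothetical.

```python
from collections import defaultdict

def average_by_state_year(rows):
    """Hypothetical helper: average (state, year, value) rows per (state, year).
    None values are skipped; an all-None group maps to None, as with Spark's avg."""
    keys = set()
    sums = defaultdict(float)
    counts = defaultdict(int)
    for state, year, value in rows:
        keys.add((state, year))
        if value is None:
            continue
        sums[(state, year)] += value
        counts[(state, year)] += 1
    return {key: (sums[key] / counts[key] if counts[key] else None) for key in keys}

# Hypothetical monthly rows for two state/year combinations
monthly = [("AZ", 2013, 8.0), ("AZ", 2013, 7.0), ("AZ", 2013, None), ("WV", 2012, None)]
print(average_by_state_year(monthly))
```

Averaging yields a typical per-row value; if a state-level total were wanted instead, `sum` would be the aggregate to use.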

In [19]:
from pyspark.sql.functions import avg

unemployment_reduced = unemployment.select("state", "year", "unemployment")
unemployment_reduced = unemployment_reduced.groupBy("state", "year").agg(avg("unemployment").alias("avg_unemployment"))
unemployment_reduced.show()

+-----+----+------------------+
|state|year|  avg_unemployment|
+-----+----+------------------+
|   AZ|2013| 7.816666666666666|
|   WV|2012| 7.258333333333333|
|   MI|2013|             8.725|
|   SC|2013|             7.425|
|   NM|2013| 6.908333333333332|
|   HI|2014| 4.233333333333333|
|   MT|2011|             6.625|
|   IA|2011| 5.650000000000001|
|   FL|2014| 6.433333333333333|
|   TN|2011| 9.108333333333333|
|   IL|2012| 9.049999999999999|
|   IA|2013|4.7250000000000005|
|   VA|2013| 5.550000000000001|
|   MN|2014| 4.266666666666667|
|   CA|2013| 9.033333333333335|
|   GA|2012| 9.041666666666668|
|   MD|2014| 5.733333333333334|
|   WA|2013| 6.633333333333334|
|   DC|2011|10.191666666666668|
|   ND|2013| 2.891666666666666|
+-----+----+------------------+
only showing top 20 rows



In [20]:
snap_reduced = snap_w_state.withColumnRenamed("SNAP_All_Issuance", "snap")
snap_reduced = snap_reduced.select("state","year", "snap")
snap_reduced = snap_reduced.groupBy("state", "year").agg(avg("snap").alias("avg_snap"))
snap_reduced.show()

+-----+----+--------------------+
|state|year|            avg_snap|
+-----+----+--------------------+
|   AZ|2013|   9151211.733333332|
|   MI|2013|  2879850.1428571427|
|   WV|2012|        4.65712415E7|
|   SC|2013|  2506513.2608695654|
|   HI|2014|         1.5817335E7|
|   NM|2013|           1623640.2|
|   FL|2014|    6857740.71969697|
|   VA|2013|   901454.6390977444|
|   MN|2014|  243215.50218340612|
|   IL|2012|  2187851.6101694917|
|   IA|2013|  491554.93434343435|
|   CA|2013|1.0943420965517242E7|
|   GA|2012|   1615027.889937107|
|   MD|2014|  3878353.1041666665|
|   WA|2013|       1.397572075E8|
|   ND|2013|  139581.42156862744|
|   VT|2013|           6187163.0|
|   MN|2012|   275438.0616740088|
|   MA|2014|        1.05110731E8|
|   MT|2012|         1.6036237E7|
+-----+----+--------------------+
only showing top 20 rows



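Finally, we join the three datasets on `state` and `year` and save the result. Because these are inner joins, only state/year combinations present in all three datasets are kept.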
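A plain-Python sketch shows the effect of chaining inner joins on `["state", "year"]`: any key missing from one table disappears from the result. The helper name and all values below are hypothetical.

```python
def inner_join_on_state_year(*tables):
    """Hypothetical helper: each table maps (state, year) -> {column: value}.
    Only keys present in every table are kept, as with chained inner joins."""
    common_keys = set(tables[0])
    for table in tables[1:]:
        common_keys &= set(table)
    merged = {}
    for key in sorted(common_keys):
        row = {}
        for table in tables:
            row.update(table[key])
        merged[key] = row
    return merged

# Hypothetical per-state/year values from the three reduced datasets
charges = {("AZ", 2013): {"DRG-A": 100.0}, ("DC", 2011): {"DRG-A": 200.0}}
snap = {("AZ", 2013): {"avg_snap": 5.0}}
unemployment = {("AZ", 2013): {"avg_unemployment": 7.0}, ("DC", 2011): {"avg_unemployment": 9.0}}

merged = inner_join_on_state_year(charges, snap, unemployment)
print(merged)  # {('AZ', 2013): {'DRG-A': 100.0, 'avg_snap': 5.0, 'avg_unemployment': 7.0}}
```

Here `("DC", 2011)` is dropped because it has no SNAP row. A `left` join starting from the charges table would instead keep it, with nulls for the missing values.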

In [21]:
merged = transposed_charges.join(snap_reduced, ["state", "year"], "inner")
merged = merged.join(unemployment_reduced, ["state", "year"], "inner")
merged.show()

                                                                                

+-----+----+----------------------------------------------------------+---------------------------------------------------------------+----------------------+--------------------------------+--------------------------------------------------+-------------------------------------------+-------------------------------------------------+------------------------------------------------+------------------------------------------------------+---------------------------------------+--------------------------------------+--------------------------------------------+-----------------------------------------------------------------+-----------------------------------------+----------------------------------------------+--------------------------------------------------------------------------+-------------------------------------------------------+------------------------------------+---------------------------------------------------------+-------------------------------------------------------

In [22]:
output_path_parquet = 'hdfs://columbia.cs.colostate.edu:31175/final/uber/services_and_health.parquet'

merged.write.mode('overwrite').parquet(output_path_parquet)



                                                                                

In [23]:
output_path_csv = 'hdfs://columbia.cs.colostate.edu:31175/final/uber/services_and_health.csv'

merged.write.option("header", "true").mode('overwrite').csv(output_path_csv)



                                                                                