### Setting up Environment with namespaces, variables, and functions

In [1]:
import pandas as pd
import pickle

In [2]:
def to_pandas_df(spark_df, num_rows=20):
    '''Converts spark.DataFrame to pandas.DataFrame for easy printing in Jupyter
    
    This function makes it easy to peak into a spark.DataFrame.
    Note: spark.sql.toPandas() will crash if DataFrame overloads driver's memory
    
    Args:
        * spark_df - spark.DataFrame object
        * num_rows - number of rows to convert
        
    Returns:
        * pandas DataFrame
    '''
    
    temp_df = spark_df.take(num_rows)
    return pd.DataFrame(temp_df,
                        columns=temp_df[0].__fields__)

---

### Load 2009-2010 IP data

#### First time have inferSchema=True then pickle schema so we can reuse for next time to speed up read from disk

In [3]:
# .read supports wildcards *
thcic_ip_df = (sqlContext
                   .read
                   .csv(path='hdfs://sandbox.hortonworks.com/user/guest/thcic/spark/PUDF_base*_tab.txt',
                        sep='\t',
                        header=True,
                        inferSchema=True))

In [4]:
to_pandas_df(thcic_ip_df, num_rows=5)

Unnamed: 0,DISCHARGE_QTR,THCIC_ID,PROVIDER_NAME,FAC_TEACHING_IND,FAC_PSYCH_IND,FAC_REHAB_IND,FAC_ACUTE_CARE_IND,FAC_SNF_IND,FAC_LONG_TERM_AC_IND,FAC_OTHER_LTC_IND,...,HCFA_MDC,APR_MDC,HCFA_DRG,APR_DRG,RISK_MORTALITY,ILLNESS_SEVERITY,ATTENDING_PHYS_UNIF_ID,OPERATING_PHYS_UNIF_ID,CERT_STATUS,RECORD_ID
0,2009Q1,100.0,Austin State Hospital,,X,,,,,,...,19,19,880,756,1,1,9999999998,,2,120090000509
1,2009Q1,100.0,Austin State Hospital,,X,,,,,,...,19,19,881,754,1,1,9999999998,,2,120090000373
2,2009Q1,100.0,Austin State Hospital,,X,,,,,,...,19,19,881,754,1,2,9999999998,,2,120090000283
3,2009Q1,100.0,Austin State Hospital,,X,,,,,,...,19,19,881,754,1,2,9999999998,,2,120090000159
4,2009Q1,100.0,Austin State Hospital,,X,,,,,,...,19,19,881,754,1,1,9999999998,,2,120090000477


In [5]:
f = open('IP_schema', 'wb')
pickle.dump(thcic_ip_df.schema, f)

#### Load schema via pickle, use second read for subsequent analyses

In [6]:
f = open('IP_schema', 'rb')
thcic_ip_schema = pickle.load(f)

In [7]:
thcic_ip_df.schema == thcic_ip_schema

True

In [8]:
# .read supports wildcards *
thcic_ip_df = (sqlContext
                   .read
                   .csv(path='hdfs://sandbox.hortonworks.com/user/guest/thcic/spark/PUDF_base*_tab.txt',
                        sep='\t',
                        header=True,
                        schema=thcic_ip_schema))

---

### How many patients per MS-DRG per COUNTY per DISCHARGE_QTR

* Simple Analysis
    * How many patients per county per year?
    * map this out in folium
    
    * We also have MS-DRG so we can see how that is changing
    * in planning, this is the start to see how you are faring against you competitors for people in certain zip codes
    for specific product lines

In [9]:
thcic_ip_qtr_county_msdrg = (thcic_ip_df
                                 .select('DISCHARGE_QTR', 'COUNTY', 'HCFA_DRG')
                                 .groupby(['DISCHARGE_QTR', 'COUNTY', 'HCFA_DRG'])
                                 .count())

In [10]:
thcic_grouped_pandas = thcic_ip_qtr_county_msdrg.toPandas()

In [11]:
thcic_grouped_pandas.head()

Unnamed: 0,DISCHARGE_QTR,COUNTY,HCFA_DRG,count
0,2009Q1,167.0,30.0,4
1,2009Q1,471.0,158.0,4
2,2009Q1,201.0,256.0,28
3,2009Q1,245.0,465.0,3
4,2009Q1,471.0,947.0,2


### Remove NAs if any

In [12]:
thcic_grouped_pandas.isnull().sum()

DISCHARGE_QTR       2
COUNTY           5638
HCFA_DRG           13
count               0
dtype: int64

In [13]:
thcic_grouped_pandas = thcic_grouped_pandas.dropna()

In [14]:
thcic_grouped_pandas.isnull().sum()

DISCHARGE_QTR    0
COUNTY           0
HCFA_DRG         0
count            0
dtype: int64

### Convert from float to int for DRG (MS-DRG-XXX format)
### Convert from float to zero padded string for FIPS county code (Texas FIPS format 48XXX)

In [15]:
thcic_grouped_pandas['HCFA_DRG'] = (thcic_grouped_pandas['HCFA_DRG']
                                        .astype(int))

In [16]:
thcic_grouped_pandas['COUNTY'] = (thcic_grouped_pandas['COUNTY']
                                      .astype(int)
                                      .astype(str)
                                      .str.zfill(3))

In [17]:
thcic_grouped_pandas.head()

Unnamed: 0,DISCHARGE_QTR,COUNTY,HCFA_DRG,count
0,2009Q1,167,30,4
1,2009Q1,471,158,4
2,2009Q1,201,256,28
3,2009Q1,245,465,3
4,2009Q1,471,947,2


### Save to csv so we can use in next process

In [18]:
thcic_grouped_pandas.to_csv('thcic-discharges-by-dischargeqtr_county_msdrg.csv', index=False)

### Number of rows

In [19]:
thcic_ip_df.count()

5929937

In [20]:
thcic_grouped_pandas.count()

DISCHARGE_QTR    487629
COUNTY           487629
HCFA_DRG         487629
count            487629
dtype: int64

----

### Can we group hospitals by MS-DRG data?

* find similar hospitals
* similar by quarter
* similar by year
* similar across entire dataset
* based on MSDRG procedures (HCFA_DRG)
* PCA then k-means cluster analysis

In [21]:
thcic_ip_qtr_provider_msdrg = (thcic_ip_df
                                   .select('DISCHARGE_QTR', 'PROVIDER_NAME', 'HCFA_DRG')
                                   .groupby(['DISCHARGE_QTR', 'PROVIDER_NAME', 'HCFA_DRG'])
                                   .count())

In [22]:
thcic_provider_grouped_pandas = thcic_ip_qtr_provider_msdrg.toPandas()

In [23]:
thcic_provider_grouped_pandas.head()

Unnamed: 0,DISCHARGE_QTR,PROVIDER_NAME,HCFA_DRG,count
0,2009Q1,UT Medical Branch Hospital,235.0,2
1,2009Q1,UT MD Anderson Cancer Center,740.0,10
2,2009Q1,Baptist St Anthonys Health System-Baptist Campus,28.0,1
3,2009Q1,Baptist St Anthonys Health System-Baptist Campus,59.0,2
4,2009Q1,Baptist St Anthonys Health System-Baptist Campus,252.0,11


In [24]:
thcic_provider_grouped_pandas.isnull().sum()

DISCHARGE_QTR      2
PROVIDER_NAME    160
HCFA_DRG          13
count              0
dtype: int64

In [25]:
thcic_provider_grouped_pandas = thcic_provider_grouped_pandas.dropna()
thcic_provider_grouped_pandas.isnull().sum()

DISCHARGE_QTR    0
PROVIDER_NAME    0
HCFA_DRG         0
count            0
dtype: int64

In [26]:
thcic_provider_grouped_pandas['HCFA_DRG'] = (thcic_provider_grouped_pandas['HCFA_DRG']
                                                 .astype(int))

In [27]:
thcic_provider_grouped_pandas.head()

Unnamed: 0,DISCHARGE_QTR,PROVIDER_NAME,HCFA_DRG,count
0,2009Q1,UT Medical Branch Hospital,235,2
1,2009Q1,UT MD Anderson Cancer Center,740,10
2,2009Q1,Baptist St Anthonys Health System-Baptist Campus,28,1
3,2009Q1,Baptist St Anthonys Health System-Baptist Campus,59,2
4,2009Q1,Baptist St Anthonys Health System-Baptist Campus,252,11


In [28]:
thcic_provider_grouped_pandas.to_csv('thcic-discharges-by-dischargeqtr_provider_msdrg.csv', index=False)

### How many rows did we save?

In [29]:
thcic_ip_df.count()

5929937

In [30]:
thcic_provider_grouped_pandas.count()

DISCHARGE_QTR    697445
PROVIDER_NAME    697445
HCFA_DRG         697445
count            697445
dtype: int64