# Goal
### Overall Goal
See how useful Spark is for exploring a large data set.
__*PUDF_base_all_tab.txt*__ is 10 GB and contains 18 M observations of 260 features.

### Specific to this notebook
Generating a cleaner data set and doing additional analysis on:
1. PROVIDER_NAME
2. *ADMIT_WEEKDAY*
3. *pat_age*
4. ~~RACE~~
5. *ETHNICITY*
6. ~~FIRST_PAYMENT_SRC~~
7. ~~SECONDARY_PAYMENT_SRC~~
8. ADMITTING_DIAGNOSIS
9. *TYPE_OF_ADMISSION*
10. *SOURCE_OF_ADMISSION*
11. ~~SEX_CODE~~

__*Justifying Columns to be Removed*__
<table align="left">
<tr>
<th>Column</th><th>Reason</th>
</tr>
<tr><td>SEX_CODE</td><td>Data seems invalid. Male and Female together account for less than 40% of observations.</td></tr>
<tr><td>RACE</td><td>Over 70% of the race values are not in the dictionary.</td></tr>
<tr><td>FIRST_PAYMENT_SRC</td><td>Variable is not in the dictionary and its values don't match any related columns in the dictionary.</td></tr>
<tr><td>SECONDARY_PAYMENT_SRC</td><td>Variable is not in the dictionary and its values don't match any related columns in the dictionary.</td></tr>
<tr><td>TYPE_OF_ADMISSION</td><td>Most of the values are NULL. Many of the values that are provided are not in the dictionary.</td></tr>
<tr><td>SOURCE_OF_ADMISSION</td><td>Most values are not in the dictionary.</td></tr>
<tr><td>ETHNICITY</td><td>The number of Hispanic patients is very small, which suggests the data is flawed.</td></tr>
</table>

__*Handling Invalid Data*__
<table align="left">
<tr>
<th>Column</th><th>Comments</th>
</tr>
<tr><td>pat_age</td><td>Loosely numeric, but actually represents age range classes. Will attempt to impute values for the invalid age classes. Valid classes have values that can be parsed as numeric; invalid ones do not.</td></tr>
<tr><td>LENGTH_OF_STAY</td><td>Will remove the few non-numeric values.</td></tr>
<tr><td>ADMIT_WEEKDAY</td><td>Most values are in the dictionary. Will remove the few invalid values (fewer than 0.1%).</td></tr>
</table>

In [6]:
from os import path, getcwd
import sys

import pyspark.sql.functions  # later cells call pyspark.sql.functions.* by full name
from pyspark.sql.functions import when
import pandas as pd
import numpy as np

In [73]:
def get_distinct_number(df, col):
    """Return the number of distinct values in `col`, or 0 if the input is invalid."""
    if df is None or col is None:
        print("The dataframe or col is invalid")
        return 0
    elif col not in df.columns:
        print("The column '{}' is not in the dataframe.".format(col))
        return 0
    else:
        return df.groupBy(col).count().count()

def get_topN_group(df, col, sort=True, highest_first=True):
    """Return per-value row counts for `col`, optionally sorted by count."""
    if df is None or col is None:
        print("The dataframe or col is invalid")
        return None  # no DataFrame can be built from invalid input
    elif col not in df.columns:
        print("The column '{}' is not in the dataframe.".format(col))
        return None
    else:
        counts = df.groupBy(col).count()
        if sort:
            return counts.orderBy("count", ascending=not highest_first)
        return counts

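def is_integer(test_string):
    """Return True if the value can be parsed as an integer."""
    try:
        int(test_string)
        return True
    except (TypeError, ValueError):
        return False

# Declare the return type: udf() defaults to a string return type,
# which cannot be used as a boolean condition in when().
from pyspark.sql.types import BooleanType
is_integer_udf = pyspark.sql.functions.udf(is_integer, BooleanType())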

In [3]:
texas_df = spark.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true', delimiter='\t')\
    .load(path.join(getcwd(), "..", "data", "PUDF_base_all_tab.txt"))

In [14]:
for name, dtype in texas_df.select(
    "LENGTH_OF_STAY",
    "PROVIDER_NAME",
    "ADMIT_WEEKDAY",
    "pat_age",
    "ADMITTING_DIAGNOSIS"
).dtypes:
    print(name, dtype)

('LENGTH_OF_STAY', 'string')
('PROVIDER_NAME', 'string')
('ADMIT_WEEKDAY', 'string')
('pat_age', 'string')
('ETHNICITY', 'string')
('ADMITTING_DIAGNOSIS', 'string')
('TYPE_OF_ADMISSION', 'string')
('SOURCE_OF_ADMISSION', 'string')


# Munging

In [52]:
texas_df_selected_subset = texas_df.select(
    "LENGTH_OF_STAY",
    "PROVIDER_NAME",
    "ADMIT_WEEKDAY",
    "pat_age",
    "ADMITTING_DIAGNOSIS"
).filter(
    (texas_df["LENGTH_OF_STAY"] != '*') &\
    (texas_df["LENGTH_OF_STAY"] != 'LENGTH_OF_STAY') &\
    (texas_df["pat_age"] != '*') &\
    (texas_df["pat_age"] != '**') &\
    (texas_df["pat_age"] != 'ZZ') &\
    (texas_df["ADMIT_WEEKDAY"] != 'null') &\
    (texas_df["ADMIT_WEEKDAY"] != '*') &\
    (texas_df["ADMIT_WEEKDAY"] != 'RACE') &\
    (texas_df["ADMIT_WEEKDAY"] != 'ADMIT_WEEKDAY') 
    
)
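
The chained `!=` comparisons above can also be written with `Column.isin`, which keeps the sentinel values for each column in one place. The following is a minimal sketch, not the notebook's original code: `INVALID_VALUES`, `is_valid_row`, and `drop_invalid_rows` are hypothetical names, and the sentinel lists are copied from the filter above. The PySpark import sits inside the function so that the plain-Python predicate can be checked without a Spark session.

```python
from functools import reduce

# Sentinel values copied from the filter above, grouped by column.
INVALID_VALUES = {
    "LENGTH_OF_STAY": ["*", "LENGTH_OF_STAY"],
    "pat_age": ["*", "**", "ZZ"],
    "ADMIT_WEEKDAY": ["null", "*", "RACE", "ADMIT_WEEKDAY"],
}


def is_valid_row(row):
    """Plain-Python version of the rule, for a row given as a dict."""
    return all(
        row.get(col) is not None and row[col] not in bad
        for col, bad in INVALID_VALUES.items()
    )


def drop_invalid_rows(df):
    """Hypothetical helper: apply the same rule to a Spark DataFrame."""
    from pyspark.sql import functions as F  # imported lazily; needs PySpark

    conditions = [~F.col(col).isin(bad) for col, bad in INVALID_VALUES.items()]
    # Combine the per-column conditions with logical AND.
    return df.filter(reduce(lambda a, b: a & b, conditions))
```

As with the original comparisons, rows where any of these columns is NULL should be dropped as well, because the condition evaluates to NULL rather than true for them.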

### LOS & Age

__*Convert Numeric Fields*__

In [54]:
texas_df_selected_subset = texas_df_selected_subset.select(
    texas_df_selected_subset.LENGTH_OF_STAY.cast('float'),
    "PROVIDER_NAME",
    "ADMIT_WEEKDAY",
    "pat_age", # is a class 01-22, 01-26
    "ADMITTING_DIAGNOSIS"
)

In [72]:
texas_df_selected_subset =\
texas_df_selected_subset.withColumn("PAT_AGE_CLASS",\
    when(pyspark.sql.functions.udf(is_integer)(texas_df_selected_subset["pat_age"]),\
         texas_df_selected_subset["pat_age"]).otherwise("-99")
)

AnalysisException: u"cannot resolve 'CASE WHEN is_integer(pat_age) THEN `pat_age` ELSE '-99' END' due to data type mismatch: WHEN expressions in CaseWhen should all be boolean type, but the 1th when expression's type is is_integer(pat_age#27);;\n'Project [LENGTH_OF_STAY#2402, PROVIDER_NAME#2, ADMIT_WEEKDAY#25, pat_age#27, ADMITTING_DIAGNOSIS#70, CASE WHEN is_integer(pat_age#27) THEN pat_age#27 ELSE -99 END AS PAT_AGE_CLASS#4516]\n+- Project [cast(LENGTH_OF_STAY#26 as float) AS LENGTH_OF_STAY#2402, PROVIDER_NAME#2, ADMIT_WEEKDAY#25, pat_age#27, ADMITTING_DIAGNOSIS#70]\n   +- Filter ((((((((NOT (LENGTH_OF_STAY#26 = *) && NOT (LENGTH_OF_STAY#26 = LENGTH_OF_STAY)) && NOT (pat_age#27 = *)) && NOT (pat_age#27 = **)) && NOT (pat_age#27 = ZZ)) && NOT (ADMIT_WEEKDAY#25 = null)) && NOT (ADMIT_WEEKDAY#25 = *)) && NOT (ADMIT_WEEKDAY#25 = RACE)) && NOT (ADMIT_WEEKDAY#25 = ADMIT_WEEKDAY))\n      +- Project [LENGTH_OF_STAY#26, PROVIDER_NAME#2, ADMIT_WEEKDAY#25, pat_age#27, ADMITTING_DIAGNOSIS#70]\n         +- Relation[discharge_qtr#0,thcic_id#1,PROVIDER_NAME#2,FAC_TEACHING_IND#3,FAC_PSYCH_IND#4,FAC_REHAB_IND#5,FAC_ACUTE_CARE_IND#6,FAC_SNF_IND#7,FAC_LONG_TERM_AC_IND#8,FAC_OTHER_LTC_IND#9,FAC_PEDS_IND#10,SPEC_UNIT_1#11,SPEC_UNIT_2#12,SPEC_UNIT_3#13,SPEC_UNIT_4#14,SPEC_UNIT_5#15,ENCOUNTER_INDICATOR#16,SEX_CODE#17,TYPE_OF_ADMISSION#18,SOURCE_OF_ADMISSION#19,PAT_STATE#20,PAT_ZIP#21,PAT_COUNTRY#22,county#23,... 236 more fields] csv\n"
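
The exception above is consistent with `pyspark.sql.functions.udf` defaulting to a string return type when none is given, so `when` receives a string column where it expects a boolean one. A minimal sketch of one possible fix follows, declaring `BooleanType()` as the return type. `add_age_class` and its parameters are hypothetical names, and the PySpark imports sit inside the function so the predicate can be checked without a Spark session.

```python
def is_integer(test_string):
    """Return True when the value parses as an integer, e.g. '07' or '12'."""
    try:
        int(test_string)
        return True
    except (TypeError, ValueError):
        return False


def add_age_class(df, source_col="pat_age", target_col="PAT_AGE_CLASS"):
    """Hypothetical helper: copy integer-like ages, mark the rest as '-99'."""
    # Imported here so is_integer can be exercised without Spark installed.
    from pyspark.sql import functions as F
    from pyspark.sql.types import BooleanType

    # Declaring BooleanType makes the UDF result usable as a `when` condition.
    is_integer_udf = F.udf(is_integer, BooleanType())
    return df.withColumn(
        target_col,
        F.when(is_integer_udf(F.col(source_col)), F.col(source_col)).otherwise("-99"),
    )
```

With this helper, the failing cell would become `texas_df_selected_subset = add_age_class(texas_df_selected_subset)`.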

__*Verify conversion*__

In [10]:
%timeit -n1 -r1 \
print("Covariance between LOS & Age is: {}".format(\
    texas_df_selected_subset.stat.cov("LENGTH_OF_STAY", "pat_age")))

Covariance between LOS & Age is: 5.54018762405
1 loop, best of 1: 1min 3s per loop


In [55]:
texas_df_selected_subset.describe("LENGTH_OF_STAY", "pat_age").show()

+-------+------------------+------------------+
|summary|    LENGTH_OF_STAY|           pat_age|
+-------+------------------+------------------+
|  count|          17649996|          17649996|
|   mean|2.2979328720527756|11.600564067726559|
| stddev| 9.559145399910616|4.8548219739708705|
|    min|               1.0|                00|
|    max|            9999.0|                WC|
+-------+------------------+------------------+



In [50]:
print("original total {}.\nprocessed total {}.".format(texas_df.count(), texas_df_selected_subset.count()))
print("original: {}.\nprocessed: {}"\
      .format(get_distinct_number(texas_df, 'pat_age'), 
             get_distinct_number(texas_df_selected_subset, 'pat_age')))

original total 18158133.
processed total 17909964.
original: 48.
processed: 28


In [61]:
get_topN_group(texas_df, 'pat_age').show(30)

+-------+-------+
|pat_age|  count|
+-------+-------+
|     MA|4610783|
|     MC|3310117|
|     12|1703555|
|     09|1196770|
|     HM|1115220|
|     CI| 995008|
|     BL| 957798|
|     16| 539789|
|     15| 393975|
|     00| 371497|
|     11| 296231|
|     **| 261029|
|     13| 228088|
|     18| 178597|
|     17| 170855|
|     CH| 161169|
|     07| 157008|
|     14| 155918|
|     19| 155492|
|     08| 155088|
|     ZZ| 152283|
|     10| 115089|
|     20|  94173|
|     MB|  69652|
|     OF|  63743|
|     23|  62185|
|     WC|  59674|
|     21|  57908|
|     06|  51705|
|     02|  50076|
+-------+-------+
only showing top 30 rows



## Verify Munging

__*Admit weekday*__

Should only contain weekday codes 1-7.

In [57]:
texas_df_selected_subset.groupBy("ADMIT_WEEKDAY")\
.agg(
    pyspark.sql.functions.mean("LENGTH_OF_STAY").alias("avg"),
    pyspark.sql.functions.stddev("LENGTH_OF_STAY").alias("stddev"),
    pyspark.sql.functions.count("LENGTH_OF_STAY").alias("count")
)\
.show()

+-------------+------------------+------------------+-------+
|ADMIT_WEEKDAY|               avg|            stddev|  count|
+-------------+------------------+------------------+-------+
|            7| 4.894237577829325|22.265765444919495| 245730|
|            3|2.6647023225901676| 10.59716782799657|2371275|
|            5|1.7696207153633128| 8.765847092424828|3472669|
|            6| 5.121894903833316|  7.74468810346812| 254662|
|            1| 4.368173355152989|15.498527644625481| 630013|
|            4|1.9790783037914266| 3.491743657288062|9926633|
|            2| 4.258737486882755|29.653987444012046| 749014|
+-------------+------------------+------------------+-------+
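
The weekday check can be made explicit by comparing the distinct values in the column with the expected codes. This is a minimal sketch, assuming the valid codes are the strings `'1'` through `'7'` as in the output above; `EXPECTED_WEEKDAYS`, `unexpected_weekdays`, and `collect_weekdays` are hypothetical names.

```python
EXPECTED_WEEKDAYS = {str(day) for day in range(1, 8)}  # assumed: '1'..'7'


def unexpected_weekdays(values):
    """Return the sorted values that are not expected weekday codes."""
    return sorted(set(values) - EXPECTED_WEEKDAYS, key=str)


def collect_weekdays(df, col="ADMIT_WEEKDAY"):
    """Hypothetical helper: pull the distinct values of `col` to the driver."""
    return [row[col] for row in df.select(col).distinct().collect()]
```

If the munging worked, `unexpected_weekdays(collect_weekdays(texas_df_selected_subset))` should return an empty list.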



# Persisting Data

In [58]:
texas_df_selected_subset.toPandas().to_csv(
    path.join(getcwd(), "..", "data", "texas_df_selected_subsetelected_subset.csv"))
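
`toPandas()` collects the whole DataFrame onto the driver, which may be heavy for roughly 18 M rows. One alternative, sketched here as an assumption rather than the notebook's method, is to let Spark write the CSV itself. `persist_subset` and its `out_dir` parameter are hypothetical, and Spark writes a directory of part files rather than a single CSV file.

```python
def persist_subset(df, out_dir):
    """Hypothetical helper: write `df` as CSV part files under `out_dir`."""
    (
        df.write
        .mode("overwrite")          # replace output from an earlier run
        .option("header", "true")   # keep the column names in each part file
        .csv(out_dir)
    )
```

A call might look like `persist_subset(texas_df_selected_subset, path.join(getcwd(), "..", "data", "subset_csv"))`, where the directory name is only a placeholder.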

### Test stuff

In [5]:
cols = ['LENGTH_OF_STAY', 'pat_age']
texas_df.select(cols).show(2)

+--------------+-------+
|LENGTH_OF_STAY|pat_age|
+--------------+-------+
|          0002|     07|
|          0005|     12|
+--------------+-------+
only showing top 2 rows



In [20]:
texas_df_selected_subset\
    .filter(\
        (texas_df_selected_subset["ADMIT_WEEKDAY"] == 'null')|\
        (texas_df_selected_subset["ADMIT_WEEKDAY"] == '*'))\
    .withColumn("ADMIT_WEEKDAY", 
        when(\
            (texas_df_selected_subset["ADMIT_WEEKDAY"] == 'null')|\
            (texas_df_selected_subset["ADMIT_WEEKDAY"] == '*'), 'BOTH'\
            )\
    )\
    .select('ADMIT_WEEKDAY', 'LENGTH_OF_STAY').show(5)

+-------------+--------------+
|ADMIT_WEEKDAY|LENGTH_OF_STAY|
+-------------+--------------+
|         BOTH|           2.0|
|         BOTH|           2.0|
|         BOTH|           2.0|
|         BOTH|           2.0|
|         BOTH|           2.0|
+-------------+--------------+
only showing top 5 rows



In [45]:
texas_df.select("ETHNICITY").distinct()\
    .filter((texas_df["ETHNICITY"] == '1') | (texas_df["ETHNICITY"] == '2'))\
    .withColumn("ETHNICITY", when(texas_df["ETHNICITY"] == '1', 'one').otherwise("two")).show(20)

+---------+
|ETHNICITY|
+---------+
|      one|
|      two|
+---------+



In [59]:
texas_df_selected_subset.columns

['LENGTH_OF_STAY',
 'PROVIDER_NAME',
 'ADMIT_WEEKDAY',
 'pat_age',
 'ADMITTING_DIAGNOSIS']

In [67]:
test_df = spark.createDataFrame(sc.emptyRDD(), texas_df_selected_subset.schema)
print(test_df.columns)
test_df = test_df.withColumn("new_col", pyspark.sql.functions.lit(None))
print("New col:\n{}".format(test_df.columns))

['LENGTH_OF_STAY', 'PROVIDER_NAME', 'ADMIT_WEEKDAY', 'pat_age', 'ADMITTING_DIAGNOSIS']
New col:
['LENGTH_OF_STAY', 'PROVIDER_NAME', 'ADMIT_WEEKDAY', 'pat_age', 'ADMITTING_DIAGNOSIS', 'new_col']
