In [1]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# Reuse the active Spark session if one exists; otherwise create one named 'analysis'.
spark = SparkSession.builder.appName('analysis').getOrCreate()
sc = spark.sparkContext


# Root S3 prefix under which every table is stored as "<name>.parquet".
# Change this single value to point the whole notebook at another bucket/prefix.
TABLES_ROOT = "s3://udacity-data-engineering-nanodegree-capstone-project/tables"


def load_table(table_name, tables_root=TABLES_ROOT):
    """Read one Parquet table into a Spark DataFrame.

    Args:
        table_name: Table name without the ``.parquet`` suffix.
        tables_root: Prefix that contains the table directories.

    Returns:
        The DataFrame produced by ``spark.read.parquet`` for
        ``<tables_root>/<table_name>.parquet``.
    """
    return spark.read.parquet("{}/{}.parquet".format(tables_root, table_name))


# Load all dataframes

# Fact table
discharges_df = load_table("discharges")

# Dimension tables
facilities_df = load_table("medical_facility")
apr_drg_code_df = load_table("apr_drg_code")
apr_mdc_code_df = load_table("apr_mdc_code")

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1669873483530_0002,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Which three ZIP codes have the highest total medical costs?

In [2]:
# Pair every discharge row with its matching facility row (inner join on the facility key).
df = discharges_df.join(
    facilities_df,
    on=(discharges_df.facility_id == facilities_df.id),
    how="inner",
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Pair every discharge row with its matching facility row so the aggregation
# below can see columns from both tables.
df = discharges_df.join(
    facilities_df,
    on=(discharges_df.facility_id == facilities_df.id),
)

# Per ZIP code: sum the costs and count the distinct facilities, then keep the
# three ZIP codes with the largest cost totals.
(
    df.groupBy("zip_code")
    .agg(
        F.sum("total_costs").alias("zip_code_total_costs"),
        F.countDistinct("facility_id").alias("number_of_facilities"),
    )
    .select("zip_code", "number_of_facilities", "zip_code_total_costs")
    .orderBy(F.desc("zip_code_total_costs"))
    .limit(3)
    .show()
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+--------------------+--------------------+
|zip_code|number_of_facilities|zip_code_total_costs|
+--------+--------------------+--------------------+
|   10021|                   4|2.1432856285000007E9|
|   10467|                   2|1.4222678575199995E9|
|   11030|                   1| 1.283856685279985E9|
+--------+--------------------+--------------------+

### Which DRG + MDC combination results in the highest total charges to patients?

In [4]:
# Attach the DRG and MDC lookup rows to each discharge. Both lookup tables
# expose a column named "description", so it is renamed immediately after each
# join to keep the two descriptions distinguishable.
df = (
    discharges_df
    .join(apr_drg_code_df, discharges_df.apr_drg_code == apr_drg_code_df.code)
    .withColumnRenamed("description", "drg_description")
    .join(apr_mdc_code_df, discharges_df.apr_mdc_code == apr_mdc_code_df.code)
    .withColumnRenamed("description", "mdc_description")
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Total charges per (DRG, MDC) pair, keeping only the pair with the largest sum.
#
# The grouping key includes apr_mdc_code as well as apr_drg_code: the question
# is about the DRG + MDC combination, and grouping on the DRG alone would make
# F.first("mdc_description") pick an arbitrary MDC label whenever one DRG code
# occurs under more than one MDC. With both codes in the key, each description
# is constant within its group (given the join conditions used to build `df`),
# so F.first() merely carries that constant through the aggregation.
(
    df.groupBy("apr_drg_code", "apr_mdc_code")
    .agg(
        F.sum("total_charges").alias("total_drg_charges"),
        F.first("drg_description").alias("drg_description"),
        F.first("mdc_description").alias("mdc_description"),
    )
    .select("apr_drg_code", "drg_description", "mdc_description", "total_drg_charges")
    .orderBy(F.col("total_drg_charges").desc())
    .limit(1)
    .show(truncate=False)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+--------------------------------------+----------------------------------------------------------------+--------------------+
|apr_drg_code|drg_description                       |mdc_description                                                 |total_drg_charges   |
+------------+--------------------------------------+----------------------------------------------------------------+--------------------+
|720         |SEPTICEMIA AND DISSEMINATED INFECTIONS|Infectious and Parasitic Diseases, Systemic or Unspecified Sites|2.6519396024100065E9|
+------------+--------------------------------------+----------------------------------------------------------------+--------------------+

### What is the most prevalent DRG at each medical facility?

In [6]:
# Attach the DRG lookup, the MDC lookup and the facility row to each discharge.
# Both lookup tables expose a column named "description", so it is renamed
# immediately after each join to keep the two descriptions distinguishable.
df = (
    discharges_df
    .join(apr_drg_code_df, discharges_df.apr_drg_code == apr_drg_code_df.code)
    .withColumnRenamed("description", "drg_description")
    .join(apr_mdc_code_df, discharges_df.apr_mdc_code == apr_mdc_code_df.code)
    .withColumnRenamed("description", "mdc_description")
    .join(facilities_df, discharges_df.facility_id == facilities_df.id)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Most prevalent DRG at each facility.
#
# Step 1: count discharges per (facility, DRG). Within such a group the DRG
# description and facility name are expected to be constant (they come from
# the lookup rows joined on those same keys), so F.first() just carries them
# through the aggregation.
#
# Step 2: per facility, keep the row with the highest discharge_count.
# This deliberately does NOT use orderBy(...).groupBy(...).agg(F.first(...)):
# the row order produced by orderBy is not guaranteed to survive the shuffle
# of the following groupBy, and F.first() is non-deterministic, so that
# pattern can report a DRG that is not the most frequent one. Taking the max
# of a struct whose first field is discharge_count is deterministic: structs
# compare field by field, so the highest count wins and ties are broken by the
# remaining fields in order (apr_drg_code first).
(
    df.groupBy("facility_id", "apr_drg_code")
    .agg(
        F.count("*").alias("discharge_count"),
        F.first("drg_description").alias("drg_description"),
        F.first("name").alias("facility_name"),
    )
    .groupBy("facility_id")
    .agg(
        F.max(
            F.struct(
                "discharge_count",
                "apr_drg_code",
                "drg_description",
                "facility_name",
            )
        ).alias("top_drg")
    )
    # Unpack the winning struct back into flat columns (same names as before).
    .select(
        "top_drg.facility_name",
        "top_drg.apr_drg_code",
        "top_drg.drg_description",
        "top_drg.discharge_count",
    )
    .orderBy(F.col("facility_name").desc())
    .show(truncate=False)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------------------------------------------------------------------------------+------------+-------------------------------------------------------------------------------+---------------+
|facility_name                                                                              |apr_drg_code|drg_description                                                                |discharge_count|
+-------------------------------------------------------------------------------------------+------------+-------------------------------------------------------------------------------+---------------+
|Wyoming County Community Hospital                                                          |140         |CHRONIC OBSTRUCTIVE PULMONARY DISEASE                                          |147            |
|Wyckoff Heights Medical Center                                                             |204         |SYNCOPE AND COLLAPSE                                                           |30