In [1]:
import findspark
findspark.init()
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('usecase_4').getOrCreate()

In [4]:
sc = spark.sparkContext

In [5]:
sc.setLogLevel('ERROR')

### Hospital Charges Data

In [7]:
hospital_df = spark.read.options(header=True, inferSchema=True).csv("inpatientCharges.csv")

In [9]:
hospital_df.show(3)

+--------------------+----------+--------------------+---------------------+------------+-------------+---------------+---------------------------------+---------------+---------------------+--------------------+-----------------------+
|       DRGDefinition|ProviderId|        ProviderName|ProviderStreetAddress|ProviderCity|ProviderState|ProviderZipCode|HospitalReferralRegionDescription|TotalDischarges|AverageCoveredCharges|AverageTotalPayments|AverageMedicarePayments|
+--------------------+----------+--------------------+---------------------+------------+-------------+---------------+---------------------------------+---------------+---------------------+--------------------+-----------------------+
|039 - EXTRACRANIA...|     10001|SOUTHEAST ALABAMA...| 1108 ROSS CLARK C...|      DOTHAN|           AL|          36301|                      AL - Dothan|             91|             32963.07|             5777.24|                4763.73|
|039 - EXTRACRANIA...|     10005|MARSHALL MEDICAL ..

In [11]:
hospital_df.printSchema()

root
 |-- DRGDefinition: string (nullable = true)
 |-- ProviderId: integer (nullable = true)
 |-- ProviderName: string (nullable = true)
 |-- ProviderStreetAddress: string (nullable = true)
 |-- ProviderCity: string (nullable = true)
 |-- ProviderState: string (nullable = true)
 |-- ProviderZipCode: integer (nullable = true)
 |-- HospitalReferralRegionDescription: string (nullable = true)
 |-- TotalDischarges: integer (nullable = true)
 |-- AverageCoveredCharges: double (nullable = true)
 |-- AverageTotalPayments: double (nullable = true)
 |-- AverageMedicarePayments: double (nullable = true)



In [12]:
hospital_df.count()

163065

### Problem Statement 1: Find the amount of Average Covered Charges per state.

In [17]:
hospital_df.groupBy('ProviderState').agg({'AverageCoveredCharges':'avg'}).show(5)

+-------------+--------------------------+
|ProviderState|avg(AverageCoveredCharges)|
+-------------+--------------------------+
|           AZ|         41200.06301999297|
|           SC|         35862.49456269757|
|           LA|        33085.372791542744|
|           MN|         27894.36182060391|
|           NJ|         66125.68627434727|
+-------------+--------------------------+
only showing top 5 rows



### Problem Statement 2: Find the amount of Average Total Payments charges per state.

In [18]:
hospital_df.groupBy('ProviderState').agg({'AverageTotalPayments':'avg'}).show(5)

+-------------+-------------------------+
|ProviderState|avg(AverageTotalPayments)|
+-------------+-------------------------+
|           AZ|       10154.528211153982|
|           SC|         9132.42075869336|
|           LA|        8638.662576808716|
|           MN|         9948.23696269982|
|           NJ|       10678.988646912556|
+-------------+-------------------------+
only showing top 5 rows



### Problem Statement 3: Find the amount of Average Medicare Payments charges per state.

In [19]:
hospital_df.groupBy('ProviderState').agg({'AverageMedicarePayments':'avg'}).show(5)

+-------------+----------------------------+
|ProviderState|avg(AverageMedicarePayments)|
+-------------+----------------------------+
|           AZ|           8825.717239565063|
|           SC|           7876.331524411663|
|           LA|           7387.704625041294|
|           MN|            8619.21498223801|
|           NJ|            9586.94005594695|
+-------------+----------------------------+
only showing top 5 rows



### Problem Statement 4: Find out the total number of Discharges per state and for each disease.

In [21]:
hospital_df.groupBy('ProviderState','DRGDefinition').agg({'TotalDischarges':'sum'}).show(5)

+-------------+--------------------+--------------------+
|ProviderState|       DRGDefinition|sum(TotalDischarges)|
+-------------+--------------------+--------------------+
|           KY|065 - INTRACRANIA...|                1937|
|           NY|101 - SEIZURES W/...|                4503|
|           IN|149 - DYSEQUILIBRIUM|                 700|
|           IA|178 - RESPIRATORY...|                 540|
|           WI|202 - BRONCHITIS ...|                 338|
+-------------+--------------------+--------------------+
only showing top 5 rows



In [22]:
from pyspark.sql.functions import desc,col

##### Sort By Asc

In [29]:
hospital_df.groupBy('ProviderState','DRGDefinition').agg({'TotalDischarges':'sum'}).sort('sum(TotalDischarges)').show(5)

+-------------+--------------------+--------------------+
|ProviderState|       DRGDefinition|sum(TotalDischarges)|
+-------------+--------------------+--------------------+
|           MT|251 - PERC CARDIO...|                  11|
|           UT|563 - FX, SPRN, S...|                  11|
|           ID|305 - HYPERTENSIO...|                  11|
|           WY|202 - BRONCHITIS ...|                  11|
|           AK|419 - LAPAROSCOPI...|                  11|
+-------------+--------------------+--------------------+
only showing top 5 rows



##### Sort by Desc

In [32]:
hospital_df.groupBy('ProviderState','DRGDefinition').agg({'TotalDischarges':'sum'}).orderBy(desc('sum(TotalDischarges)')).show(5)

+-------------+--------------------+--------------------+
|ProviderState|       DRGDefinition|sum(TotalDischarges)|
+-------------+--------------------+--------------------+
|           CA|871 - SEPTICEMIA ...|               34284|
|           TX|470 - MAJOR JOINT...|               30095|
|           FL|470 - MAJOR JOINT...|               29985|
|           CA|470 - MAJOR JOINT...|               29731|
|           TX|871 - SEPTICEMIA ...|               23144|
+-------------+--------------------+--------------------+
only showing top 5 rows



### Closing spark session

In [33]:
spark.stop()