In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext,SparkConf
conf = SparkConf().set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.1")
sc = SparkContext(conf=conf)

# import os
# os.environ['PYSPARK_SUBMIT_ARGS'] = 'org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 pyspark-shell'

spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test1.Inpatient_Charges") \
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test1.Inpatient_Charges") \
.getOrCreate()

In [2]:
df = spark.read.format('com.mongodb.spark.sql.DefaultSource').load()
#df.write.format('com.mongodb.spark.sql.DefaultSource').mode('append').save()

In [3]:
df.count()

163073

In [62]:
df = df.na.drop()

In [63]:
df.count()

156750

In [64]:
# remove the spaces from the dataSet
from pyspark.sql.functions import trim,col
df = df.withColumn("drg_definition",trim(col('drg_definition')))
df.select(['drg_definition']).distinct().show(1000,False)

+--------------------------------------------------------------------------+
|drg_definition                                                            |
+--------------------------------------------------------------------------+
|811 - RED BLOOD CELL DISORDERS W MCC                                      |
|329 - MAJOR SMALL & LARGE BOWEL PROCEDURES W MCC                          |
|191 - CHRONIC OBSTRUCTIVE PULMONARY DISEASE W CC                          |
|683 - RENAL FAILURE W CC                                                  |
|918 - POISONING & TOXIC EFFECTS OF DRUGS W/O MCC                          |
|481 - HIP & FEMUR PROCEDURES EXCEPT MAJOR JOINT W CC                      |
|"282 - ACUTE MYOCARDIAL INFARCTION                                        |
|249 - PERC CARDIOVASC PROC W NON-DRUG-ELUTING STENT W/O MCC               |
|390 - G.I. OBSTRUCTION W/O CC/MCC                                         |
|300 - PERIPHERAL VASCULAR DISORDERS W CC                                  |

In [65]:
df.createOrReplaceTempView('df')


In [66]:
df.groupBy('drg_definition').max().show(10, False)

+-----------------------------------------------------------+----------------------------+------------------------------+---------------------------+
|drg_definition                                             |max(average_covered_charges)|max(average_medicare_payments)|max(average_total_payments)|
+-----------------------------------------------------------+----------------------------+------------------------------+---------------------------+
|811 - RED BLOOD CELL DISORDERS W MCC                       |141750                      |24099                         |65772                      |
|329 - MAJOR SMALL & LARGE BOWEL PROCEDURES W MCC           |557900                      |100040                        |381799                     |
|191 - CHRONIC OBSTRUCTIVE PULMONARY DISEASE W CC           |112726                      |14690                         |66841                      |
|683 - RENAL FAILURE W CC                                   |111516                      |15861     

In [67]:
# converting the average covered charges , average total payments, average medicare payments to numeric
from pyspark.sql.types import IntegerType
df = df.withColumn("average_covered_charges", df["average_covered_charges"].cast(IntegerType()))
df = df.withColumn("average_total_payments", df["average_total_payments"].cast(IntegerType()))
df = df.withColumn("average_medicare_payments", df["average_medicare_payments"].cast(IntegerType()))

In [68]:
df.select(['drg_definition']).distinct().show()

+--------------------+
|      drg_definition|
+--------------------+
|811 - RED BLOOD C...|
|329 - MAJOR SMALL...|
|191 - CHRONIC OBS...|
|683 - RENAL FAILU...|
|918 - POISONING &...|
|481 - HIP & FEMUR...|
|"282 - ACUTE MYOC...|
|249 - PERC CARDIO...|
|390 - G.I. OBSTRU...|
|300 - PERIPHERAL ...|
|057 - DEGENERATIV...|
|190 - CHRONIC OBS...|
|243 - PERMANENT C...|
|252 - OTHER VASCU...|
|314 - OTHER CIRCU...|
|207 - RESPIRATORY...|
|247 - PERC CARDIO...|
|690 - KIDNEY & UR...|
|292 - HEART FAILU...|
|482 - HIP & FEMUR...|
+--------------------+
only showing top 20 rows



In [69]:
# from pyspark.sql.functions import mean, stddev, Column, monotonically_increasing_id
# This is find the min , max and stddev
import pyspark.sql.functions as f
newdf = df.groupBy('drg_definition').agg(f.max('average_total_payments').alias('max_average_total_payments'), 
                                         f.min('average_total_payments').alias('min_average_total_payments'),
                                         f.mean('average_total_payments').alias('mean_average_total_payments'),
                                         f.stddev('average_total_payments').alias('std_average_total_payments'),
                                         f.variance('average_total_payments').alias('var_average_total_payments'))



In [87]:
# Finding the most expensive DRG 
newdf.createOrReplaceTempView('newdf')
test = spark.sql("select drg_definition, mean_average_total_payments from newdf order by mean_average_total_payments desc")
test.show(10,False)

+-----------------------------------------------------------------+---------------------------+
|drg_definition                                                   |mean_average_total_payments|
+-----------------------------------------------------------------+---------------------------+
|"286 - CIRCULATORY DISORDERS EXCEPT AMI                          |59938.14435389989          |
|870 - SEPTICEMIA OR SEVERE SEPSIS W MV 96+ HOURS                 |50283.62579957356          |
|"280 - ACUTE MYOCARDIAL INFARCTION                               |46157.79531511433          |
|853 - INFECTIOUS & PARASITIC DISEASES W O.R. PROCEDURE W MCC     |44479.98981818182          |
|207 - RESPIRATORY SYSTEM DIAGNOSIS W VENTILATOR SUPPORT 96+ HOURS|43614.84595524957          |
|329 - MAJOR SMALL & LARGE BOWEL PROCEDURES W MCC                 |41660.656949152544         |
|"287 - CIRCULATORY DISORDERS EXCEPT AMI                          |35336.166868932036         |
|"391 - ESOPHAGITIS                     

In [71]:
#Most expensive DRGs also have the highest variance in costs, following is the most costly 
test = spark.sql("select drg_definition,var_average_total_payments from newdf order by var_average_total_payments desc")
test.show(10,False)

+-----------------------------------------------------------------+--------------------------+
|drg_definition                                                   |var_average_total_payments|
+-----------------------------------------------------------------+--------------------------+
|"286 - CIRCULATORY DISORDERS EXCEPT AMI                          |1.1923660901492999E9      |
|870 - SEPTICEMIA OR SEVERE SEPSIS W MV 96+ HOURS                 |1.1004698844500055E9      |
|207 - RESPIRATORY SYSTEM DIAGNOSIS W VENTILATOR SUPPORT 96+ HOURS|8.247975466506679E8       |
|"280 - ACUTE MYOCARDIAL INFARCTION                               |7.62396875188549E8        |
|853 - INFECTIOUS & PARASITIC DISEASES W O.R. PROCEDURE W MCC     |7.047162381862136E8       |
|329 - MAJOR SMALL & LARGE BOWEL PROCEDURES W MCC                 |5.72318240735696E8        |
|"391 - ESOPHAGITIS                                               |3.056253794230354E8       |
|"281 - ACUTE MYOCARDIAL INFARCTION               

In [72]:
# Most expensive DRGs also have the highest range in costs
newdf = newdf.withColumn("Range",col("max_average_total_payments") -col("min_average_total_payments"))
newdf.show(10, False)


+-----------------------------------------------------------+--------------------------+--------------------------+---------------------------+--------------------------+--------------------------+------+
|drg_definition                                             |max_average_total_payments|min_average_total_payments|mean_average_total_payments|std_average_total_payments|var_average_total_payments|Range |
+-----------------------------------------------------------+--------------------------+--------------------------+---------------------------+--------------------------+--------------------------+------+
|811 - RED BLOOD CELL DISORDERS W MCC                       |65772                     |6111                      |10264.377973568282         |5572.860274350443         |3.10567716374333E7        |59661 |
|329 - MAJOR SMALL & LARGE BOWEL PROCEDURES W MCC           |381799                    |20298                     |41660.656949152544         |23923.17371787648         |5.72318240

In [73]:
from pyspark.sql.functions import desc
newdf.sort(desc("Range")).show()


+--------------------+--------------------------+--------------------------+---------------------------+--------------------------+--------------------------+------+
|      drg_definition|max_average_total_payments|min_average_total_payments|mean_average_total_payments|std_average_total_payments|var_average_total_payments| Range|
+--------------------+--------------------------+--------------------------+---------------------------+--------------------------+--------------------------+------+
|870 - SEPTICEMIA ...|                    447714|                     20536|          50283.62579957356|          33173.3309218415|      1.1004698844500055E9|427178|
|207 - RESPIRATORY...|                    414555|                     22311|          43614.84595524957|        28719.288756002785|       8.247975466506679E8|392244|
|853 - INFECTIOUS ...|                    391446|                     22112|          44479.98981818182|        26546.492012810537|       7.047162381862136E8|369334|
|329

In [74]:
# Which states are the most expensive for each treatment?
df.createOrReplaceTempView('df')
newdf.createOrReplaceTempView('newdf')
#query1 = spark.sql("SELECT `drg_definition`, `provider_state`, t.maxAvgPaymentPerDRG  FROM df hc INNER JOIN 
#                   (SELECT `drg_definition`, 'max_average_total_payments' FROM newdf GROUP BY `DRG Definition`) t 
 #                  ON hc.`drg_definition` == t.drg AND hc.`average_total_payments` == t.average_total_payments ORDER BY provider_state ASC")

query1 = spark.sql('''SELECT `drg_definition`, `provider_state` as providerState, t.maxAvgPaymentPerDRG FROM df hc 
                   INNER JOIN(SELECT `drg_definition` as drg, MAX('average_total_payments') as maxAvgPaymentPerDRG FROM newdf GROUP BY `drg_definition`) t 
                   ON hc.`drg_definition` == t.drg AND hc.`average_total_payments` == t.maxAvgPaymentPerDRG ORDER BY providerState ASC''')

query1.show()









+--------------+-------------+-------------------+
|drg_definition|providerState|maxAvgPaymentPerDRG|
+--------------+-------------+-------------------+
+--------------+-------------+-------------------+



In [7]:
df.columns

['_id',
 'average_covered_charges',
 'average_medicare_payments',
 'average_total_payments',
 'drg_definition',
 'hospital_referral_region_description',
 'provider_city',
 'provider_id',
 'provider_name',
 'provider_state',
 'provider_street_address',
 'provider_zip_code',
 'total_discharges']

In [88]:

q = spark.sql('''SELECT drg_definition as drg, max_average_total_payment FROM newdf GROUP BY drg_definition''')
q.show(100, False)

AnalysisException: "cannot resolve '`max_average_total_payment`' given input columns: [newdf.std_average_total_payments, newdf.mean_average_total_payments, newdf.max_average_total_payments, newdf.drg_definition, newdf.min_average_total_payments, newdf.var_average_total_payments, newdf.Range]; line 1 pos 30;\n'Aggregate [drg_definition#826], [drg_definition#826 AS drg#1319, 'max_average_total_payment]\n+- SubqueryAlias `newdf`\n   +- Project [drg_definition#826, max_average_total_payments#948, min_average_total_payments#950, mean_average_total_payments#952, std_average_total_payments#962, var_average_total_payments#972, (max_average_total_payments#948 - min_average_total_payments#950) AS Range#1085]\n      +- Aggregate [drg_definition#826], [drg_definition#826, max(average_total_payments#901) AS max_average_total_payments#948, min(average_total_payments#901) AS min_average_total_payments#950, avg(cast(average_total_payments#901 as bigint)) AS mean_average_total_payments#952, stddev_samp(cast(average_total_payments#901 as double)) AS std_average_total_payments#962, var_samp(cast(average_total_payments#901 as double)) AS var_average_total_payments#972]\n         +- Project [_id#0, average_covered_charges#887, cast(average_medicare_payments#140 as int) AS average_medicare_payments#915, average_total_payments#901, drg_definition#826, hospital_referral_region_description#5, provider_city#6, provider_id#7, provider_name#8, provider_state#9, provider_street_address#10, provider_zip_code#11, total_discharges#12]\n            +- Project [_id#0, average_covered_charges#887, average_medicare_payments#140, cast(average_total_payments#126 as int) AS average_total_payments#901, drg_definition#826, hospital_referral_region_description#5, provider_city#6, provider_id#7, provider_name#8, provider_state#9, provider_street_address#10, provider_zip_code#11, total_discharges#12]\n               +- Project [_id#0, cast(average_covered_charges#112 as int) AS average_covered_charges#887, average_medicare_payments#140, average_total_payments#126, drg_definition#826, hospital_referral_region_description#5, provider_city#6, provider_id#7, provider_name#8, provider_state#9, provider_street_address#10, provider_zip_code#11, total_discharges#12]\n                  +- Project [_id#0, average_covered_charges#112, average_medicare_payments#140, average_total_payments#126, trim(drg_definition#758, None) AS drg_definition#826, hospital_referral_region_description#5, provider_city#6, provider_id#7, provider_name#8, provider_state#9, provider_street_address#10, provider_zip_code#11, total_discharges#12]\n                     +- Filter AtLeastNNulls(n, _id#0,average_covered_charges#112,average_medicare_payments#140,average_total_payments#126,drg_definition#758,hospital_referral_region_description#5,provider_city#6,provider_id#7,provider_name#8,provider_state#9,provider_street_address#10,provider_zip_code#11,total_discharges#12)\n                        +- Project [_id#0, average_covered_charges#112, average_medicare_payments#140, average_total_payments#126, trim(drg_definition#75, None) AS drg_definition#758, hospital_referral_region_description#5, provider_city#6, provider_id#7, provider_name#8, provider_state#9, provider_street_address#10, provider_zip_code#11, total_discharges#12]\n                           +- Project [_id#0, average_covered_charges#112, cast(average_medicare_payments#2 as int) AS average_medicare_payments#140, average_total_payments#126, drg_definition#75, hospital_referral_region_description#5, provider_city#6, provider_id#7, provider_name#8, provider_state#9, provider_street_address#10, provider_zip_code#11, total_discharges#12]\n                              +- Project [_id#0, average_covered_charges#112, average_medicare_payments#2, cast(average_total_payments#3 as int) AS average_total_payments#126, drg_definition#75, hospital_referral_region_description#5, provider_city#6, provider_id#7, provider_name#8, provider_state#9, provider_street_address#10, provider_zip_code#11, total_discharges#12]\n                                 +- Project [_id#0, cast(average_covered_charges#1 as int) AS average_covered_charges#112, average_medicare_payments#2, average_total_payments#3, drg_definition#75, hospital_referral_region_description#5, provider_city#6, provider_id#7, provider_name#8, provider_state#9, provider_street_address#10, provider_zip_code#11, total_discharges#12]\n                                    +- Project [_id#0, average_covered_charges#1, average_medicare_payments#2, average_total_payments#3, trim(drg_definition#4, None) AS drg_definition#75, hospital_referral_region_description#5, provider_city#6, provider_id#7, provider_name#8, provider_state#9, provider_street_address#10, provider_zip_code#11, total_discharges#12]\n                                       +- Filter AtLeastNNulls(n, _id#0,average_covered_charges#1,average_medicare_payments#2,average_total_payments#3,drg_definition#4,hospital_referral_region_description#5,provider_city#6,provider_id#7,provider_name#8,provider_state#9,provider_street_address#10,provider_zip_code#11,total_discharges#12)\n                                          +- Relation[_id#0,average_covered_charges#1,average_medicare_payments#2,average_total_payments#3,drg_definition#4,hospital_referral_region_description#5,provider_city#6,provider_id#7,provider_name#8,provider_state#9,provider_street_address#10,provider_zip_code#11,total_discharges#12] MongoRelation(MongoRDD[0] at RDD at MongoRDD.scala:51,Some(StructType(StructField(_id,StructType(StructField(oid,StringType,true)),true), StructField(average_covered_charges,StringType,true), StructField(average_medicare_payments,StringType,true), StructField(average_total_payments,StringType,true), StructField(drg_definition,StringType,true), StructField(hospital_referral_region_description,StringType,true), StructField(provider_city,StringType,true), StructField(provider_id,StringType,true), StructField(provider_name,StringType,true), StructField(provider_state,StringType,true), StructField(provider_street_address,StringType,true), StructField(provider_zip_code,StringType,true), StructField(total_discharges,StringType,true))))\n"

In [77]:
newdf.show()

+--------------------+--------------------------+--------------------------+---------------------------+--------------------------+--------------------------+------+
|      drg_definition|max_average_total_payments|min_average_total_payments|mean_average_total_payments|std_average_total_payments|var_average_total_payments| Range|
+--------------------+--------------------------+--------------------------+---------------------------+--------------------------+--------------------------+------+
|811 - RED BLOOD C...|                     65772|                      6111|         10264.377973568282|         5572.860274350443|        3.10567716374333E7| 59661|
|329 - MAJOR SMALL...|                    381799|                     20298|         41660.656949152544|         23923.17371787648|        5.72318240735696E8|361501|
|191 - CHRONIC OBS...|                     66841|                      4532|         7228.5399337504605|         3939.685255258306|      1.5521119910499701E7| 62309|
|683