In [30]:
import sys
import time

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

import os

sc = SparkContext.getOrCreate()

# Point the S3A filesystem at a local MinIO server. Credentials and endpoint are
# taken from the environment so secrets need not be committed with the notebook;
# the fallbacks reproduce the original local-development values.
# TODO: drop the credential fallbacks once the environment variables are set everywhere.
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minio")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minio123")
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://172.20.0.3:9000/")

hc = sc._jsc.hadoopConfiguration()
# S3A reads the access key from "fs.s3a.access.key". The former property name
# "fs.s3a.awsAccessKeyId" mimics the legacy s3/s3n connectors and is ignored by S3A.
hc.set("fs.s3a.access.key", MINIO_ACCESS_KEY)
hc.set("fs.s3a.secret.key", MINIO_SECRET_KEY)
hc.set("fs.s3a.endpoint", MINIO_ENDPOINT)
# The MinIO endpoint above is plain http, so TLS is disabled.
hc.set("fs.s3a.connection.ssl.enabled", "false")

glueContext = GlueContext(sc)
spark = glueContext.spark_session

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
# Load the raw Medicare CSV using its first line as the header. No schema is
# supplied or inferred, so every column comes back as a string.
df = spark.read.csv("s3a://awsglue-datasets/Medicare_Hospital_Provider.csv", header=True)
df.printSchema()
df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- DRG Definition: string (nullable = true)
 |-- Provider Id: string (nullable = true)
 |-- Provider Name: string (nullable = true)
 |-- Provider Street Address: string (nullable = true)
 |-- Provider City: string (nullable = true)
 |-- Provider State: string (nullable = true)
 |-- Provider Zip Code: string (nullable = true)
 |-- Hospital Referral Region Description: string (nullable = true)
 |--  Total Discharges : string (nullable = true)
 |--  Average Covered Charges : string (nullable = true)
 |--  Average Total Payments : string (nullable = true)
 |-- Average Medicare Payments: string (nullable = true)

+--------------------+-----------+--------------------+-----------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+
|      DRG Definition|Provider Id|       Provider Name|Provider Street Address|Provider City|Provider State|Provider 

In [36]:
# Wrap the Spark DataFrame in a Glue DynamicFrame named "medicare".
medicare_dynamicframe = DynamicFrame.fromDF(df, glueContext, "medicare")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
# Show the DynamicFrame schema; every field is still a string at this point.
medicare_dynamicframe.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
|-- DRG Definition: string
|-- Provider Id: string
|-- Provider Name: string
|-- Provider Street Address: string
|-- Provider City: string
|-- Provider State: string
|-- Provider Zip Code: string
|-- Hospital Referral Region Description: string
|--  Total Discharges : string
|--  Average Covered Charges : string
|--  Average Total Payments : string
|-- Average Medicare Payments: string

In [40]:
# Cast "Provider Id" from string to long. The next cell inspects rows whose id
# ended up null after the cast.
provider_id_specs = [("Provider Id", "cast:long")]
medicare_res = medicare_dynamicframe.resolveChoice(specs=provider_id_specs)
medicare_res.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
|-- DRG Definition: string
|-- Provider Id: long
|-- Provider Name: string
|-- Provider Street Address: string
|-- Provider City: string
|-- Provider State: string
|-- Provider Zip Code: string
|-- Hospital Referral Region Description: string
|--  Total Discharges : string
|--  Average Covered Charges : string
|--  Average Total Payments : string
|-- Average Medicare Payments: string

In [42]:
# Inspect the rows whose "Provider Id" is null after the cast to long.
medicare_res.toDF().filter("`provider id` IS NULL").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+
|      DRG Definition|Provider Id|  Provider Name|Provider Street Address|Provider City|Provider State|Provider Zip Code|Hospital Referral Region Description| Total Discharges | Average Covered Charges | Average Total Payments |Average Medicare Payments|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+
|948 - SIGNS & SYM...|       null|            INC|       1050 DIVISION ST|      MAUSTON|            WI|            53948|                        WI - Madison|                12|                $11961.41|                $4619.00|       

In [43]:
# Keep only rows whose "Provider Id" survived the cast to long (i.e. is not null).
medicare_dataframe = medicare_res.toDF().where("`provider id` IS NOT NULL")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [49]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def strip_currency_symbol(value):
    """Remove a leading '$' from a currency string such as '$4619.00'.

    Args:
        value: The raw cell value, or None for a SQL NULL.

    Returns:
        The string without its leading '$'. None is passed through rather than
        raising TypeError inside the Python worker, and a string that does not
        start with '$' is returned unchanged instead of losing its first digit.
    """
    if value is None:
        return None
    return value[1:] if value.startswith("$") else value


chop_f = udf(strip_currency_symbol, StringType())

# Derived column name -> source column holding a '$'-prefixed amount.
# Two of the source header names carry leading/trailing spaces (see schema above).
currency_columns = {
    "ACC": " Average Covered Charges ",
    "ATP": " Average Total Payments ",
    "AMP": "Average Medicare Payments",
}
for new_name, source_name in currency_columns.items():
    medicare_dataframe = medicare_dataframe.withColumn(
        new_name, chop_f(medicare_dataframe[source_name])
    )
medicare_dataframe.select(list(currency_columns)).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+-------+-------+
|     ACC|    ATP|    AMP|
+--------+-------+-------+
|32963.07|5777.24|4763.73|
|15131.85|5787.57|4976.71|
|37560.37|5434.95|4453.79|
|13998.28|5417.56|4129.16|
|31633.27|5658.33|4851.44|
|16920.79|6653.80|5374.14|
|11977.13|5834.74|4761.41|
|35841.09|8031.12|5858.50|
|28523.39|6113.38|5228.40|
|75233.38|5541.05|4386.94|
|67327.92|5461.57|4493.57|
|39607.28|5356.28|4408.20|
|22862.23|5374.65|4186.02|
|31110.85|5366.23|4376.23|
|25411.33|5282.93|4383.73|
| 9234.51|5676.55|4509.11|
|15895.85|5930.11|3972.85|
|19721.16|6192.54|5179.38|
|10710.88|4968.00|3898.88|
|51343.75|5996.00|4962.45|
+--------+-------+-------+
only showing top 20 rows

In [50]:
from awsglue.dynamicframe import DynamicFrame

# (source field, source type, target field, target type) tuples for ApplyMapping.
# Dotted target names such as "provider.id" are nested into struct columns.
# Source types must match the incoming data. Every column read from the CSV is a
# string, except "Provider Id", which resolveChoice cast to long, and the derived
# ACC/ATP/AMP string columns. "provider zip code" was previously declared as long
# even though it arrives as a string.
# NOTE(review): casting zip to long drops leading zeros (e.g. "01234" -> 1234);
# consider keeping it a string.
MEDICARE_NEST_MAPPINGS = [
    ('drg definition', 'string', 'drg', 'string'),
    ('provider id', 'long', 'provider.id', 'long'),
    ('provider name', 'string', 'provider.name', 'string'),
    ('provider city', 'string', 'provider.city', 'string'),
    ('provider state', 'string', 'provider.state', 'string'),
    ('provider zip code', 'string', 'provider.zip', 'long'),
    ('hospital referral region description', 'string', 'rr', 'string'),
    ('ACC', 'string', 'charges.covered', 'double'),
    ('ATP', 'string', 'charges.total_pay', 'double'),
    ('AMP', 'string', 'charges.medicare_pay', 'double'),
]

medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested")
medicare_nest_dyf = medicare_tmp_dyf.apply_mapping(MEDICARE_NEST_MAPPINGS)
medicare_nest_dyf.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
|-- drg: string
|-- provider: struct
|    |-- id: long
|    |-- name: string
|    |-- city: string
|    |-- state: string
|    |-- zip: long
|-- rr: string
|-- charges: struct
|    |-- covered: double
|    |-- total_pay: double
|    |-- medicare_pay: double

In [51]:
# Preview the first 20 rows of the nested frame (struct values are truncated).
medicare_nest_dyf.toDF().show(n=20)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+---------------+--------------------+
|                 drg|            provider|             rr|             charges|
+--------------------+--------------------+---------------+--------------------+
|039 - EXTRACRANIA...|[10001, SOUTHEAST...|    AL - Dothan|[32963.07, 5777.2...|
|039 - EXTRACRANIA...|[10005, MARSHALL ...|AL - Birmingham|[15131.85, 5787.5...|
|039 - EXTRACRANIA...|[10006, ELIZA COF...|AL - Birmingham|[37560.37, 5434.9...|
|039 - EXTRACRANIA...|[10011, ST VINCEN...|AL - Birmingham|[13998.28, 5417.5...|
|039 - EXTRACRANIA...|[10016, SHELBY BA...|AL - Birmingham|[31633.27, 5658.3...|
|039 - EXTRACRANIA...|[10023, BAPTIST M...|AL - Montgomery|[16920.79, 6653.8...|
|039 - EXTRACRANIA...|[10029, EAST ALAB...|AL - Birmingham|[11977.13, 5834.7...|
|039 - EXTRACRANIA...|[10033, UNIVERSIT...|AL - Birmingham|[35841.09, 8031.1...|
|039 - EXTRACRANIA...|[10039, HUNTSVILL...|AL - Huntsville|[28523.39, 6113.3...|
|039 - EXTRACRANIA...|[10040

In [54]:
# Convert the nested DynamicFrame back to a Spark DataFrame so the next cell can
# write it with the DataFrameWriter API.
spark_df = medicare_nest_dyf.toDF()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [60]:
# Persist the nested frame as ORC. mode("overwrite") keeps the cell re-runnable:
# with Spark's default "errorifexists" mode, a second run fails because the
# output directory already exists.
spark_df.write.mode("overwrite").format("orc").save("s3a://awsglue-datasets/output-dir/medicare.orc")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…