In [0]:
#  Copyright 2016-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#  SPDX-License-Identifier: MIT-0

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Reuse the active SparkContext (or start one) and wrap it in a GlueContext,
# which provides the catalog reader and S3 writer used by later cells.
glueContext = GlueContext(
    SparkContext.getOrCreate()
)



Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Authenticating with profile=default
glue_role_arn defined by user: arn:aws:iam::599471228997:role/Deploy-GlueJob-cdkGlueRole9180D9F4-IXF8M060FV55
Attempting to use existing AssumeRole session credentials.
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 56f3351f-9980-4f9c-b2f5-f30814f51d3c
Applying the following default arguments:
--glue_kernel_version 0.30
--enable-glue-datacatalog true
Waiting for session 56f3351f-9980-4f9c-b2f5-f30814f51d3c to get into ready status...
Session 56f3351f-9980-4f9c-b2f5-f30814f51d3c has been created




In [1]:

# Data Catalog: database and table name
db_name = "payments"
tbl_name = "medicare"

# S3 location used when no `--output_bucket` job argument is available
# (presumably when run interactively rather than as a Glue job — confirm).
DEFAULT_OUTPUT_DIR = 's3://deploy-gluejob-outputbucketf8672071-gg1mjydm1p1m/'

try:
    args = getResolvedOptions(sys.argv, ['output_bucket'])
    output_dir = args['output_bucket']
except (Exception, SystemExit):
    # Fall back only on ordinary failures to resolve the argument. A bare
    # `except:` would also swallow KeyboardInterrupt. SystemExit is included
    # because argparse-style parsers may exit on a missing argument.
    output_dir = DEFAULT_OUTPUT_DIR

# Read data into a DynamicFrame using the Data Catalog metadata
medicare_dyf = glueContext.create_dynamic_frame.from_catalog(
    database=db_name, table_name=tbl_name
)

# `provider id` may be a choice between long and string. Cast it to long;
# values that cannot be cast become null.
medicare_res = medicare_dyf.resolveChoice(specs=[('provider id', 'cast:long')])

# Remove erroneous records: those whose provider id failed the cast above
medicare_df = medicare_res.toDF()
medicare_df = medicare_df.where("`provider id` is NOT NULL")


def _strip_dollar(value):
    """Return a currency string without its leading '$'.

    Nulls (None) pass through unchanged instead of raising, so a missing
    amount becomes null rather than failing the Spark task. Values without a
    leading '$' are returned as-is rather than losing their first digit.
    """
    if value is None:
        return None
    return value[1:] if value.startswith('$') else value


chop_f = udf(_strip_dollar, StringType())

# Add stripped copies of the currency columns; apply_mapping below casts
# them from string to double.
medicare_df = (
    medicare_df
    .withColumn("ACC", chop_f(medicare_df["average covered charges"]))
    .withColumn("ATP", chop_f(medicare_df["average total payments"]))
    .withColumn("AMP", chop_f(medicare_df["average medicare payments"]))
)

# Turn it back to a dynamic frame
medicare_tmp = DynamicFrame.fromDF(medicare_df, glueContext, "nested")

# Rename, cast, and nest with apply_mapping: dotted target names such as
# `provider.id` become fields of a nested `provider` struct.
medicare_nest = medicare_tmp.apply_mapping([
    ('drg definition', 'string', 'drg', 'string'),
    ('provider id', 'long', 'provider.id', 'long'),
    ('provider name', 'string', 'provider.name', 'string'),
    ('provider city', 'string', 'provider.city', 'string'),
    ('provider state', 'string', 'provider.state', 'string'),
    ('provider zip code', 'long', 'provider.zip', 'long'),
    ('hospital referral region description', 'string', 'rr', 'string'),
    ('ACC', 'string', 'charges.covered', 'double'),
    ('ATP', 'string', 'charges.total_pay', 'double'),
    ('AMP', 'string', 'charges.medicare_pay', 'double'),
])





In [2]:
# Preview the nested records (20 rows, the DynamicFrame.show default).
medicare_nest.show(20)


{"drg": "039 - EXTRACRANIAL PROCEDURES W/O CC/MCC", "provider": {"id": 10001, "name": "SOUTHEAST ALABAMA MEDICAL CENTER", "city": "DOTHAN", "state": "AL", "zip": 36301}, "rr": "AL - Dothan", "charges": {"covered": 32963.07, "total_pay": 5777.24, "medicare_pay": 4763.73}}
{"drg": "039 - EXTRACRANIAL PROCEDURES W/O CC/MCC", "provider": {"id": 10005, "name": "MARSHALL MEDICAL CENTER SOUTH", "city": "BOAZ", "state": "AL", "zip": 35957}, "rr": "AL - Birmingham", "charges": {"covered": 15131.85, "total_pay": 5787.57, "medicare_pay": 4976.71}}
{"drg": "039 - EXTRACRANIAL PROCEDURES W/O CC/MCC", "provider": {"id": 10006, "name": "ELIZA COFFEE MEMORIAL HOSPITAL", "city": "FLORENCE", "state": "AL", "zip": 35631}, "rr": "AL - Birmingham", "charges": {"covered": 37560.37, "total_pay": 5434.95, "medicare_pay": 4453.79}}
{"drg": "039 - EXTRACRANIAL PROCEDURES W/O CC/MCC", "provider": {"id": 10011, "name": "ST VINCENT'S EAST", "city": "BIRMINGHAM", "state": "AL", "zip": 35235}, "rr": "AL - Birmingham

In [5]:
# Write the nested frame to `output_dir` on S3 in Glue's "glueparquet" format.
# The call returns a DynamicFrame, whose repr is what this cell displays.
# NOTE(review): nothing here clears `output_dir` first, so re-running this
# cell presumably adds another set of files beside the old ones. Confirm this
# before re-running.
glueContext.write_dynamic_frame.from_options(
    frame=medicare_nest,
    connection_type="s3",
    connection_options={"path": output_dir},
    format="glueparquet",
)


<awsglue.dynamicframe.DynamicFrame object at 0x7f15c8467350>
