# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 2

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Obtain the SparkContext through getOrCreate() and wrap it in a GlueContext.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# `spark` is the SparkSession that the later cells use to read the input data.
spark = glueContext.spark_session
# A Job handle is constructed here; none of the cells visible in this notebook reference it afterwards.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.38.1 
Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 5
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::268701953255:role/glue-service-role1
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 52f93799-b6d4-46cc-97ad-957486d6b19e
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.38.1
--enable-glue-datacatalog true
Waiting for session 52f93799-b6d

### Importing functions and types

In [2]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StringType, ArrayType, StructType, StructField, MapType




### Defining the schema used to read the JSON files

In [3]:
# Explicit read schema; every field is nullable.
# 'Identifiers and Credentials' is declared as a plain string here and is
# parsed into a map by a later cell; 'Applicable Resources' is an array of strings.
schema = (
    StructType()
    .add('Patient', StringType(), True)
    .add('Identifiers and Credentials', StringType(), True)
    .add('Applicable Resources', ArrayType(StringType()), True)
)




### Reading data from s3://sample-bucket-hek/fhir_test_patients/

In [4]:
# S3 prefix that holds the input JSON documents.
sample_path = 's3://sample-bucket-hek/fhir_test_patients/'

# Configure the reader once: apply the explicit schema (no inference) and
# turn on the "multiline" JSON option, then load everything under the prefix.
json_reader = spark.read.schema(schema).option("multiline","true")
df = json_reader.json(sample_path)




### Display the DataFrame read from JSON

In [5]:
# Print up to 100 rows with full (untruncated) cell contents.
df.show(n=100, truncate=False)

+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------+
|Patient       |Identifiers and Credentials                                                                                                                    |Applicable Resources                                                                                                                    |
+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------+
|Camila Lopez  |{"FHIR":"erXuFYUfucBZaryVksYEcMg3","External":"Z6129","MRN":203713,"MyChart Login Username

### Converting the 'Identifiers and Credentials' column from a JSON string to a map type

In [7]:
# Target type for the parsed JSON: a map with string keys and string values.
identifiers_map_type = MapType(StringType(), StringType())

# Replace the JSON-string column with its parsed map, keeping the column name.
df2 = df.withColumn(
    "Identifiers and Credentials",
    from_json(col("Identifiers and Credentials"), identifiers_map_type),
)

# Show the resulting schema and a sample of rows to confirm the conversion.
df2.printSchema()
df2.show(truncate=False)

root
 |-- Patient: string (nullable = true)
 |-- Identifiers and Credentials: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- Applicable Resources: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------+------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------+
|Patient       |Identifiers and Credentials                                                                                                                     |Applicable Resources                                                                                                                    |
+--------------+-----------------------------------------------------------------------------------------------------------------

### Flatten the MapType column into individual columns

In [18]:
# Flattened column name -> key looked up in the 'Identifiers and Credentials' map.
# Insertion order determines the order in which the columns are appended.
identifier_columns = {
    "Identifiers_and_Credentials_FHIR": "FHIR",
    "Identifiers_and_Credentials_External": "External",
    "Identifiers_and_Credentials_MRN": "MRN",
    "Identifiers_and_Credentials_MyChart_Login_Username": "MyChart Login Username",
    "Identifiers_and_Credentials_MyChart_Login_Password": "MyChart Login Password",
}

# Start from df2 each time so re-running this cell gives the same result,
# add one top-level column per map key, then drop the map column itself.
df3 = df2
for flat_name, map_key in identifier_columns.items():
    df3 = df3.withColumn(flat_name, col("Identifiers and Credentials").getItem(map_key))
df3 = df3.drop(col("Identifiers and Credentials"))

# The CSV data source cannot store array columns
# (AnalysisException: CSV data source does not support array<string> data type.),
# so 'Applicable Resources' is cast to a string before the export.
df4 = df3.withColumn("Applicable Resources", col("Applicable Resources").cast("String"))




### Display final dataframe before saving as csv

In [19]:
# Columns of the final dataframe, in the order they should be displayed.
final_columns = [
    "Patient",
    "Identifiers_and_Credentials_FHIR",
    "Identifiers_and_Credentials_External",
    "Identifiers_and_Credentials_MRN",
    "Identifiers_and_Credentials_MyChart_Login_Username",
    "Identifiers_and_Credentials_MyChart_Login_Password",
    "Applicable Resources",
]

# Print up to 100 rows without truncating cell contents.
df4.select(final_columns).show(100, truncate=False)

+--------------+--------------------------------+------------------------------------+-------------------------------+--------------------------------------------------+--------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------+
|Patient       |Identifiers_and_Credentials_FHIR|Identifiers_and_Credentials_External|Identifiers_and_Credentials_MRN|Identifiers_and_Credentials_MyChart_Login_Username|Identifiers_and_Credentials_MyChart_Login_Password|Applicable Resources                                                                                                                    |
+--------------+--------------------------------+------------------------------------+-------------------------------+--------------------------------------------------+--------------------------------------------------+----------------------------------------------------------------

### Save dataframe to csv

In [22]:
# Destination S3 prefix for the CSV export.
output_path = 's3://sample-bucket-hek/fhir_output/'

# Columns written to the CSV, in output order.
export_columns = [
    "Patient",
    "Identifiers_and_Credentials_FHIR",
    "Identifiers_and_Credentials_External",
    "Identifiers_and_Credentials_MRN",
    "Identifiers_and_Credentials_MyChart_Login_Username",
    "Identifiers_and_Credentials_MyChart_Login_Password",
    "Applicable Resources",
]

# coalesce(1) reduces the data to one partition so a single part file is written.
# The save mode is set explicitly to "overwrite": without it the writer refuses
# to write once output_path already exists, so this cell could only ever be run
# once. NOTE: this replaces whatever is currently stored under output_path.
(
    df4.select(export_columns)
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv(output_path)
)


