# ETL Job on AWS GLUE

This notebook was developed using AWS Glue and Python 3.6. It extracts data from S3, transforms it, and loads the result back to S3.
It runs on the remote Spark environment provided by AWS Glue.

In [5]:
%iam_role arn:aws:iam::891377119959:role/glue-synth-medical
%region us-east-1

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Current iam_role is None
iam_role has been set to arn:aws:iam::891377119959:role/glue-synth-medical.
Previous region: us-east-1
Setting new region to: us-east-1
Region is set to: us-east-1


In [7]:
import boto3
from pyspark.context import SparkContext
from pyspark.sql.functions import col, to_date, DataFrame
from awsglue.context import GlueContext
import matplotlib.pyplot as plt


# from awsglue.transforms import *
# from awsglue.utils import getResolvedOptions
# from awsglue import DynamicFrame

# Reuse the active SparkContext if there is one, otherwise create it.
sc = SparkContext.getOrCreate()
# Glue wrapper around the SparkContext; used below to read catalog tables.
glueContext = GlueContext(sc)
# SparkSession used below to run the SQL scripts.
spark = glueContext.spark_session

Trying to create a Glue session for the kernel.
Session Type: etl
Worker Type: G.1X
Number of Workers: 5
Session ID: 983f96d9-e81e-4dcc-b2bc-fd11c46dcb8c


Exception encountered while creating session: An error occurred (SignatureDoesNotMatch) when calling the GetCallerIdentity operation: Signature expired: 20240201T181737Z is now earlier than 20240201T194606Z (20240201T200106Z - 15 min.) 
Traceback (most recent call last):
  File "/home/alejo/anaconda3/envs/data-science/lib/python3.11/site-packages/aws_glue_interactive_sessions_kernel/glue_kernel_base/BaseKernel.py", line 168, in do_execute
    self.create_session()
  File "/home/alejo/anaconda3/envs/data-science/lib/python3.11/site-packages/aws_glue_interactive_sessions_kernel/glue_kernel_base/BaseKernel.py", line 602, in create_session
    additional_args = self._get_additional_arguments()
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alejo/anaconda3/envs/data-science/lib/python3.11/site-packages/aws_glue_interactive_sessions_kernel/glue_kernel_base/BaseKernel.py", line 665, in _get_additional_arguments
    user_id = self._get_user_id()
              ^^^^^^^^^^^^

In [8]:
# Default Glue Data Catalog database for the medical tables loaded below.
DATABASE = "cmsdesynpuf1k"
# Default catalog database for vocabulary tables.
# NOTE(review): "XXXX" looks like a placeholder — confirm the real database name.
VOCABULARY = "XXXX"

def load_medical_data(
    table_id: str,
    database_name: str = DATABASE,
) -> DataFrame:
    """Load a medical table, normalise its date columns and expose it to Spark SQL.

    Args:
        table_id: Logical table identifier; also used as the temp view name.
        database_name: Catalog database to read from (defaults to DATABASE).

    Returns:
        The DataFrame after date casting, which is also registered as a
        temporary view named ``table_id``.
    """
    raw_table = load_table_to_df(table_id, database_name=database_name)
    typed_table = cast_date_columns_and_drop_timestamp(raw_table)
    # Register the cleaned frame so the SQL scripts can reference it by name.
    typed_table.createOrReplaceTempView(table_id)
    return typed_table

def load_vocabulary(
    table_id: str,
    database_name: str = VOCABULARY,
) -> DataFrame:
    """Load a vocabulary table as-is and expose it to Spark SQL.

    Unlike ``load_medical_data`` no date casting is applied.

    Args:
        table_id: Logical table identifier; also used as the temp view name.
        database_name: Catalog database to read from (defaults to VOCABULARY).

    Returns:
        The loaded DataFrame, which is also registered as a temporary view
        named ``table_id``.
    """
    vocabulary_table = load_table_to_df(table_id, database_name=database_name)
    vocabulary_table.createOrReplaceTempView(table_id)
    return vocabulary_table

def load_table_to_df(table_id: str, database_name: str) -> DataFrame:
    """Read one Glue Data Catalog table into a Spark DataFrame.

    Args:
        table_id: Logical table identifier (e.g. "person"); the physical
            catalog table name is resolved via ``get_table_name``.
        database_name: Catalog database that contains the table.

    Returns:
        The DataFrame produced by ``glueContext.create_data_frame_from_catalog``.

    Raises:
        ValueError: If zero or several catalog tables match ``table_id``.
    """
    df = glueContext.create_data_frame_from_catalog(
        database=database_name,
        # Resolve the name in the SAME database we are about to read from;
        # previously the lookup always hit the default DATABASE.
        table_name=get_table_name(table_id, database_name=database_name),
        transformation_ctx=f"load_{table_id}_DF",
        useSparkDataSource=True,
    )
    return df

def get_table_name(table_id: str, database_name: str = DATABASE) -> str:
    """Resolve a logical table id to its unique Glue catalog table name.

    Args:
        table_id: Logical table identifier; interpolated (unescaped) into the
            pattern ``cdm_{table_id}_csv_bz2``.
        database_name: Catalog database to search. Defaults to DATABASE so
            existing single-argument calls keep their behaviour.

    Returns:
        The ``Name`` of the single catalog table matching the pattern.

    Raises:
        ValueError: If no table, or more than one table, matches.
    """
    table_name_pattern = f"cdm_{table_id}_csv_bz2"

    glue_client = boto3.client("glue")
    matching_tables = glue_client.get_tables(
        DatabaseName=database_name,
        Expression=table_name_pattern
    )['TableList']

    if len(matching_tables) == 0:
        raise ValueError(f"Table {table_id} not found")
    if len(matching_tables) > 1:
        raise ValueError(f"Multiple tables found for {table_id}")
    return matching_tables[0]["Name"]

def cast_date_columns_and_drop_timestamp(df: DataFrame) -> DataFrame:
    """Return ``df`` with date columns parsed and time-of-day columns removed.

    Columns whose name ends with "00:00:00" are dropped. Columns whose name
    ends with "_date" are cast to string and parsed with the 'yyyyMMdd'
    format, keeping their original name. All other columns pass through
    unchanged, and column order is preserved.
    """
    kept_names = [name for name in df.columns if not name.endswith("00:00:00")]
    projection = [
        to_date(col(name).cast('string'), 'yyyyMMdd').alias(name)
        if name.endswith("_date")
        else col(name)
        for name in kept_names
    ]
    return df.select(projection)

Trying to create a Glue session for the kernel.
Session Type: etl
Worker Type: G.1X
Number of Workers: 5
Session ID: 983f96d9-e81e-4dcc-b2bc-fd11c46dcb8c


Exception encountered while creating session: An error occurred (SignatureDoesNotMatch) when calling the GetCallerIdentity operation: Signature expired: 20240201T181739Z is now earlier than 20240201T194608Z (20240201T200108Z - 15 min.) 
Traceback (most recent call last):
  File "/home/alejo/anaconda3/envs/data-science/lib/python3.11/site-packages/aws_glue_interactive_sessions_kernel/glue_kernel_base/BaseKernel.py", line 168, in do_execute
    self.create_session()
  File "/home/alejo/anaconda3/envs/data-science/lib/python3.11/site-packages/aws_glue_interactive_sessions_kernel/glue_kernel_base/BaseKernel.py", line 602, in create_session
    additional_args = self._get_additional_arguments()
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alejo/anaconda3/envs/data-science/lib/python3.11/site-packages/aws_glue_interactive_sessions_kernel/glue_kernel_base/BaseKernel.py", line 665, in _get_additional_arguments
    user_id = self._get_user_id()
              ^^^^^^^^^^^^

In [9]:
# condition_df = load_medical_data("condition_occurrence")
# era_start_date = condition_df.select('condition_start_date').toPandas()
# plt.hist(era_start_date, bins=50, alpha=0.7, color='blue')
# %matplot plt

In [10]:
# Load each source table; load_medical_data also registers a temp view named
# after the table id, which the SQL scripts reference by name.
death_df = load_medical_data("death")
person_df = load_medical_data("person")
condition_df = load_medical_data("condition_occurrence")
observation_period_df = load_medical_data("observation_period")
# Vocabulary loading is disabled for now.
# vocabulary_df = load_medical_data("xxxx", database_name=VOCABULARY)

Trying to create a Glue session for the kernel.
Session Type: etl
Worker Type: G.1X
Number of Workers: 5
Session ID: 983f96d9-e81e-4dcc-b2bc-fd11c46dcb8c


Exception encountered while creating session: An error occurred (SignatureDoesNotMatch) when calling the GetCallerIdentity operation: Signature expired: 20240201T181746Z is now earlier than 20240201T194614Z (20240201T200114Z - 15 min.) 
Traceback (most recent call last):
  File "/home/alejo/anaconda3/envs/data-science/lib/python3.11/site-packages/aws_glue_interactive_sessions_kernel/glue_kernel_base/BaseKernel.py", line 168, in do_execute
    self.create_session()
  File "/home/alejo/anaconda3/envs/data-science/lib/python3.11/site-packages/aws_glue_interactive_sessions_kernel/glue_kernel_base/BaseKernel.py", line 602, in create_session
    additional_args = self._get_additional_arguments()
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alejo/anaconda3/envs/data-science/lib/python3.11/site-packages/aws_glue_interactive_sessions_kernel/glue_kernel_base/BaseKernel.py", line 665, in _get_additional_arguments
    user_id = self._get_user_id()
              ^^^^^^^^^^^^

In [11]:
# Cohort definition, run against the temp views `condition_occurrence`,
# `observation_period` and `death`:
#   * keep occurrences of condition_concept_id 320128;
#   * require more than 360 days of observation both before and after the
#     condition start date;
#   * flag `dead` = 1 when a matching death record exists, else 0.
#
# Spark's DATEDIFF(endDate, startDate) returns endDate - startDate in days.
# Fixes relative to the previous version:
#   * `dead` was inverted (it was 1 when NO death record matched);
#   * the death join had its DATEDIFF arguments swapped, so it only matched
#     deaths recorded more than 360 days BEFORE the condition.
# NOTE(review): the death predicate now mirrors the follow-up predicate
# (death more than 360 days after condition start) — confirm that this is the
# intended outcome window rather than "death within 360 days".
sql_create_cohort_script = """
WITH selected_condition AS (
    SELECT
        person_id,
        condition_concept_id,
        condition_start_date
        -- condition_end_date,
        -- condition_end_reason
    FROM 
        condition_occurrence
    WHERE
        condition_concept_id = 320128
)
SELECT
    sc.person_id,
    sc.condition_concept_id,
    sc.condition_start_date,
    CASE
        WHEN d.death_date IS NOT NULL THEN 1
        ELSE 0
    END AS dead
FROM
    selected_condition AS sc
    INNER JOIN 
    observation_period AS op
    ON sc.person_id = op.person_id    
    AND DATEDIFF(sc.condition_start_date, op.observation_period_start_date) > 360
    AND DATEDIFF(op.observation_period_end_date, sc.condition_start_date) > 360

    LEFT JOIN
    death as d
    ON sc.person_id = d.person_id
    AND DATEDIFF(d.death_date, sc.condition_start_date) > 360
;
"""

# Feature query: joins the `cohort` temp view to `person` on person_id and
# keeps the outcome flag plus the person's demographic/location columns.
# `cohort` must be registered as a temp view before this script is executed.
sql_add_features_script = """
SELECT
    c.person_id,
    c.dead,
    p.gender_concept_id,
    p.race_concept_id,
    p.ethnicity_concept_id,
    p.year_of_birth,
    p.location_id
FROM
    cohort as c
    JOIN 
    person as p
    ON c.person_id = p.person_id
;
"""

Trying to create a Glue session for the kernel.
Session Type: etl
Worker Type: G.1X
Number of Workers: 5
Session ID: 983f96d9-e81e-4dcc-b2bc-fd11c46dcb8c


Exception encountered while creating session: An error occurred (SignatureDoesNotMatch) when calling the GetCallerIdentity operation: Signature expired: 20240201T181816Z is now earlier than 20240201T194642Z (20240201T200142Z - 15 min.) 
Traceback (most recent call last):
  File "/home/alejo/anaconda3/envs/data-science/lib/python3.11/site-packages/aws_glue_interactive_sessions_kernel/glue_kernel_base/BaseKernel.py", line 168, in do_execute
    self.create_session()
  File "/home/alejo/anaconda3/envs/data-science/lib/python3.11/site-packages/aws_glue_interactive_sessions_kernel/glue_kernel_base/BaseKernel.py", line 602, in create_session
    additional_args = self._get_additional_arguments()
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alejo/anaconda3/envs/data-science/lib/python3.11/site-packages/aws_glue_interactive_sessions_kernel/glue_kernel_base/BaseKernel.py", line 665, in _get_additional_arguments
    user_id = self._get_user_id()
              ^^^^^^^^^^^^

In [14]:
# Build the cohort and register it as the `cohort` temp view, which the
# feature query selects from.
cohort = spark.sql(sql_create_cohort_script)
cohort.createOrReplaceTempView("cohort")




In [19]:
# Join the cohort with person attributes (requires the `cohort` temp view).
features = spark.sql(sql_add_features_script)




In [25]:
# Persist the feature table to S3 as snappy-compressed Parquet.
# "overwrite" mode replaces whatever already exists at the target path.
features_writer = (
    features.write
    .format("parquet")
    .option("compression", "snappy")
)
features_writer.mode("overwrite").save("s3://synth-medical/data/")


