# AWS Glue Studio Notebook

**You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.**

**Optional: Run this cell to see available notebook commands ("magics")**.

In [11]:
%help


# Available Magic Commands

## Sessions Magic

----
    %help                             Return a list of descriptions and input types for all magic commands. 
    %profile            String        Specify a profile in your aws configuration to use as the credentials provider.
    %region             String        Specify the AWS region in which to initialize a session. 
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %idle_timeout       Int           The number of minutes of inactivity after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %session_id_prefix  String        Define a String that will precede all session IDs in the format 
                                      [session_id_prefix]-[session_id]. If a session ID is not provided,
                                      a random UUID will be generated.
    %status                           Returns the status of the current Glue session including its duration, 
                                      configuration and executing user / role.
    %session_id                       Returns the session ID for the running session. 
    %list_sessions                    Lists all currently running sessions by ID.
    %stop_session                     Stops the current session.
    %glue_version       String        The version of Glue to be used by this session. 
                                      Currently, the only valid options are 2.0 and 3.0. 
                                      Default: 2.0.
----

## Selecting Job Types

----
    %streaming          String        Sets the session type to Glue Streaming.
    %etl                String        Sets the session type to Glue ETL.
    %glue_ray           String        Sets the session type to Glue Ray.
----

## Glue Config Magic 
*(common across all job types)*

----

    %%configure         Dictionary    A json-formatted dictionary consisting of all configuration parameters for 
                                      a session. Each parameter can be specified here or through individual magics.
    %iam_role           String        Specify an IAM role ARN to execute your session with.
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %number_of_workers  int           The number of workers of a defined worker_type that are allocated 
                                      when a session runs.
                                      Default: 5.
    %additional_python_modules  List  Comma separated list of additional Python modules to include in your cluster 
                                      (can be from Pypi or S3).
----

                                      
## Magic for Spark Jobs (ETL & Streaming)

----
    %worker_type        String        Set the type of instances the session will use as workers. 
                                      ETL and Streaming support G.1X, G.2X, G.4X and G.8X. 
                                      Default: G.1X.
    %connections        List          Specify a comma separated list of connections to use in the session.
    %extra_py_files     List          Comma separated list of additional Python files From S3.
    %extra_jars         List          Comma separated list of additional Jars to include in the cluster.
    %spark_conf         String        Specify custom spark configurations for your session. 
                                      E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
----
                                      
## Magic for Ray Job

----
    %min_workers        Int           The minimum number of workers that are allocated to a Ray job. 
                                      Default: 1.
    %object_memory_head Int           The percentage of free memory on the instance head node after a warm start. 
                                      Minimum: 0. Maximum: 100.
    %object_memory_worker Int         The percentage of free memory on the instance worker nodes after a warm start. 
                                      Minimum: 0. Maximum: 100.
----

## Action Magic

----

    %%sql               String        Run SQL code. All lines after the initial %%sql magic will be passed
                                      as part of the SQL code.  
----



##  Run this cell to set up and start your interactive session.

In [18]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 2
%extra_py_files s3://807388292768-us-east-1-artifacts/versioned-artifacts/simple_glue-sbx_extra_py_files/LATEST
%additional_python_modules s3pathlib==2.0.1

You are already connected to a glueetl session 387e51cd-ab65-470b-8c80-a7946bf286da.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.


Setting Glue version to: 4.0


Previous worker type: G.1X
Setting new worker type to: G.1X


Previous number of workers: 2
Setting new number of workers to: 2


Extra py files to be included:
s3://807388292768-us-east-1-artifacts/versioned-artifacts/simple_glue-sbx_extra_py_files/LATEST


Additional python modules to be included:
s3pathlib==2.0.1


# Glue Job Script

## Prepare

In [2]:
# -*- coding: utf-8 -*-

"""
This is a sample Glue Job to demonstrate:

- Glue job scripting best practice
    - Parameter management
    - Fast development in interactive Glue Jupyter Notebook
- Glue job unit test best practice
- Glue job integration test best practice
"""

# standard library
import typing as T
import sys
import os
import dataclasses
from pprint import pprint

# third party library
from s3pathlib import S3Path

# pyspark and glue stuff
from pyspark.context import SparkContext

from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

# custom library
from simple_glue.glue_libs.glue_utils import double_a_column




## Identify Run Mode

In the lifecycle of a Glue ETL script, from development to testing and production, various requirements and scenarios arise. These requirements can differ based on the specific stage of development. Let's explore some examples:

**Development Stage**:

- In the initial development phase using Jupyter Notebook, it is often beneficial to connect the ETL script to a smaller dataset to facilitate faster execution time. Additionally, overriding input parameters can aid in the debugging process.
- To enhance readability and maintainability, it may be helpful to include additional information or code snippets within the script.

**Unit Testing Stage**:

- During unit testing in the Glue container image within the CI runtime, the focus may be on testing the transformation logic rather than the entire ETL logic. In such cases, it might be preferable to skip the "read the data" and "write the data" phases and only import the relevant portions of the code.

**Production Stage**:

- In a production environment, debug output and verbose logging may not be needed, and the ETL logic should run against the real production data.

While the ETL logic remains mostly the same, there are slight differences depending on the runtime environment. Maintaining multiple copies of the same Glue ETL script and keeping them in sync can be challenging.

To address this issue, we have a small code snippet that defines some boolean flag variables to identify the runtime environment. With this approach, we can keep the majority of the ETL logic consistent across all stages while making slight modifications when necessary. Consequently, we only need to maintain a single script, providing the flexibility to customize the ETL logic based on the runtime environment.

In [3]:
# identify the current run mode
IS_GLUE_JOB_RUN = False
IS_DEV_NOTEBOOK = False
IS_LOCAL = False

# a real Glue job run always receives the --JOB_RUN_ID argument
if "--JOB_RUN_ID" in sys.argv:
    IS_GLUE_JOB_RUN = True
    print("Now we are on GLUE_JOB_RUN mode")
# in a Glue Jupyter Notebook session, $HOME is /home/spark
# (use .get() so that a missing HOME variable does not raise KeyError)
elif os.environ.get("HOME") == "/home/spark":
    IS_DEV_NOTEBOOK = True
    print("Now we are on DEV_NOTEBOOK mode")
# otherwise, assume a local development or CI runtime
else:
    IS_LOCAL = True
    print("Now we are on LOCAL mode")

# For debugging, you can force DEV_NOTEBOOK mode in a regular Glue job run,
# or force GLUE_JOB_RUN mode in a Jupyter notebook (for example, to test the
# production data connection). Make sure these overrides are commented out
# before committing the code.

# IS_GLUE_JOB_RUN = True
# IS_DEV_NOTEBOOK = True

Now we are on DEV_NOTEBOOK mode
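
The same checks can also be written as a small function that takes `argv` and the environment as explicit arguments, which makes the detection logic easy to unit test. The following is only a sketch of that idea: the `RunMode` enum and the `detect_run_mode` function are hypothetical names that do not exist in the original script, and the sample run ID is made up.

```python
import enum
import typing as T


class RunMode(enum.Enum):
    """Hypothetical enum mirroring the three boolean flags used in this notebook."""

    GLUE_JOB_RUN = "GLUE_JOB_RUN"
    DEV_NOTEBOOK = "DEV_NOTEBOOK"
    LOCAL = "LOCAL"


def detect_run_mode(
    argv: T.Sequence[str],
    environ: T.Mapping[str, str],
) -> RunMode:
    """Apply the same checks as the cell above, but on explicit inputs."""
    # a real Glue job run always receives the --JOB_RUN_ID argument
    if "--JOB_RUN_ID" in argv:
        return RunMode.GLUE_JOB_RUN
    # in a Glue Jupyter Notebook session, $HOME is /home/spark
    if environ.get("HOME") == "/home/spark":
        return RunMode.DEV_NOTEBOOK
    # otherwise, assume a local development or CI runtime
    return RunMode.LOCAL


# made-up inputs, one per run mode
print(detect_run_mode(["script.py", "--JOB_RUN_ID", "jr_example"], {}).name)
print(detect_run_mode(["script.py"], {"HOME": "/home/spark"}).name)
print(detect_run_mode(["script.py"], {"HOME": "/home/alice"}).name)
```

In the real script you would call it as `detect_run_mode(sys.argv, os.environ)`; passing the inputs in explicitly is what lets a test exercise every branch without touching the process environment.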


## Parameter Management

The AWS Glue official documentation gives an example of loading parameters from the Spark submit command line. 

```python
# only the option names listed here are resolved from sys.argv
args = getResolvedOptions(sys.argv, ["JOB_NAME", "param1", "param2", "param3"])
param1 = args["param1"]
param2 = args["param2"]
param3 = args["param3"]
```

This method works, but it is not the most elegant solution. You end up with a bunch of variables in your code, and it's hard to tell which ones come from the input parameters and which ones are derived from them. It also makes the input/output interface of your Glue script harder to see.

To solve this, we add a thin "parameter management" layer to the script: a data class that acts as a hub for all of the ETL script parameters. Its fields hold the Glue job parameters, while its properties compute the derived values. You can also define your own logic to load parameters from different places depending on the situation. Want to hard-code some parameters while developing in a Jupyter Notebook? Go for it! Need to load parameters from AWS Systems Manager Parameter Store in production? No problem!

By using this approach, you'll notice a huge improvement in the readability and maintainability of your ETL scripts. Plus, it takes the burden of parameter loading off your main ETL logic and keeps everything nice and organized in one place.

In [4]:
@dataclasses.dataclass
class Param:
    s3uri_input: str
    s3uri_output: str

    @classmethod
    def load(cls, args: T.Optional[T.Dict[str, str]]):
        if IS_GLUE_JOB_RUN:
            return cls(
                s3uri_input=args.get("s3uri_input"),
                s3uri_output=args.get("s3uri_output"),
            )
        else:
            return cls(
                s3uri_input="s3://807388292768-us-east-1-data/projects/simple_glue/dev/unittest/unnest/input/",
                s3uri_output="s3://807388292768-us-east-1-data/projects/simple_glue/dev/unittest/unnest/output/",
            )

    @property
    def s3dir_input(self) -> S3Path:
        """
        The S3Path object version of the input S3 folder.
        """
        return S3Path(self.s3uri_input)

    @property
    def s3dir_output(self) -> S3Path:
        """
        The S3Path object version of the output S3 folder.
        """
        return S3Path(self.s3uri_output)
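
To see the pattern in isolation, here is a minimal sketch that runs without Glue or `s3pathlib`. It is not the `Param` class above: `JobParam`, `from_args`, `input_bucket`, and `output_key_prefix` are hypothetical names, the bucket and job names are placeholders, and the standard library `urlparse` stands in for `S3Path` purely for illustration.

```python
import dataclasses
import typing as T
from urllib.parse import urlparse


@dataclasses.dataclass
class JobParam:
    # raw job parameters, exactly as passed to the job
    s3uri_input: str
    s3uri_output: str

    @classmethod
    def from_args(cls, args: T.Mapping[str, str]) -> "JobParam":
        """Build the parameter object from a dict of resolved job arguments."""
        names = [field.name for field in dataclasses.fields(cls)]
        missing = [name for name in names if name not in args]
        if missing:
            raise ValueError(f"missing job parameters: {missing}")
        # extra keys such as JOB_NAME are ignored
        return cls(**{name: args[name] for name in names})

    @property
    def input_bucket(self) -> str:
        """Derived value: the bucket name of the input S3 URI."""
        return urlparse(self.s3uri_input).netloc

    @property
    def output_key_prefix(self) -> str:
        """Derived value: the key prefix of the output S3 URI."""
        return urlparse(self.s3uri_output).path.lstrip("/")


# placeholder arguments, shaped like the dict a resolved-options call returns
param = JobParam.from_args(
    {
        "JOB_NAME": "example-job",
        "s3uri_input": "s3://example-bucket/input/",
        "s3uri_output": "s3://example-bucket/output/",
    }
)
print(param.input_bucket)
print(param.output_key_prefix)
```

The point of the design is that the fields document the job's input interface in one place, while everything computed from them lives in properties, so the ETL code never has to guess where a value came from.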




## Utility Functions

To keep your ETL logic maintainable, break the intermediate transformation steps into small Python functions with clearly defined inputs and outputs and good documentation. These functions are the building blocks of the job: each one can be imported and tested in isolation during unit testing. Once you trust them individually, you can orchestrate them into the full ETL logic. The result is a codebase that is organized and easier to manage.

In [5]:
def transform_data(gdf: DynamicFrame) -> DynamicFrame:
    """
    Unnest / flatten complex objects and double the value of the column ``details.value``.
    """
    gdf_transformed = gdf.unnest(transformation_ctx="gdf_unnested")
    gdf_transformed = double_a_column(
        gdf_transformed,
        col_name="details.value",
        trans_ctx="double_a_column"
    )
    return gdf_transformed
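
The "small function plus isolated test" idea can be illustrated without Spark at all. The sketch below is a plain-Python stand-in that works on dictionaries; it is not the implementation of `DynamicFrame.unnest` or of `double_a_column`, and the names `flatten`, `double_value`, `transform_record`, and the sample record are all hypothetical.

```python
import typing as T


def flatten(record: T.Mapping[str, T.Any], prefix: str = "") -> T.Dict[str, T.Any]:
    """Flatten nested dicts into dotted keys, e.g. {"a": {"b": 1}} -> {"a.b": 1}."""
    flat: T.Dict[str, T.Any] = {}
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            # recurse into the nested object, extending the dotted prefix
            flat.update(flatten(value, prefix=f"{name}."))
        else:
            flat[name] = value
    return flat


def double_value(record: T.Mapping[str, T.Any], col_name: str) -> T.Dict[str, T.Any]:
    """Return a copy of the record with the given column multiplied by two."""
    doubled = dict(record)
    doubled[col_name] = doubled[col_name] * 2
    return doubled


def transform_record(record: T.Mapping[str, T.Any]) -> T.Dict[str, T.Any]:
    """Compose the two building blocks, mirroring the shape of transform_data."""
    return double_value(flatten(record), col_name="details.value")


def test_transform_record() -> None:
    # made-up sample record with one nested object
    record = {"event_id": "e-1", "details": {"name": "measurement", "value": 1}}
    assert transform_record(record) == {
        "event_id": "e-1",
        "details.name": "measurement",
        "details.value": 2,
    }
    # the input record must not be modified
    assert record["details"]["value"] == 1


test_transform_record()
```

Each building block has one job and a clear input and output, so a failure in the test points directly at the function that broke.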




## Glue ETL Logic

In [10]:
class GlueETL:
    def run(self):
        self.step0_preprocess()
        self.step1_read_data()
        self.step2_transform_data()
        self.step3_write_data()
        self.step4_post_process()
        return self.gdf_transformed

    def step0_preprocess(self):
        # print("--- sys.argv ---")
        # pprint(sys.argv)
        # print("--- env vars ---")
        # for k, v in os.environ.items():
        #     print(f"{k}={v}")

        self.spark_ctx = SparkContext.getOrCreate()
        self.glue_ctx = GlueContext(self.spark_ctx)
        self.spark_ses = self.glue_ctx.spark_session

        if IS_GLUE_JOB_RUN:
            self.args = getResolvedOptions(
                sys.argv,
                [
                    "JOB_NAME",
                    "s3uri_input",
                    "s3uri_output",
                ]
            )
            self.job = Job(self.glue_ctx)
            self.job.init(self.args["JOB_NAME"], self.args)
        else:
            self.args = None
            self.job = None

        self.param = Param.load(self.args)

    def step1_read_data(self):
        self.gdf = self.glue_ctx.create_dynamic_frame.from_options(
            connection_type="s3",
            connection_options=dict(
                paths=[
                    self.param.s3dir_input.uri,
                ],
                recurse=True,
            ),
            format="json",
            format_options=dict(multiLine="true"),
            transformation_ctx="read_gdf",
        )

    def step2_transform_data(self):
        self.gdf_transformed = transform_data(self.gdf)

    def step3_write_data(self):
        self.glue_ctx.write_dynamic_frame.from_options(
            frame=self.gdf_transformed,
            connection_type="s3",
            connection_options=dict(
                path=self.param.s3dir_output.uri,
            ),
            format="parquet",
            transformation_ctx="write_gdf",
        )

    def step4_post_process(self):
        # only commit job in real Glue job run
        if IS_GLUE_JOB_RUN:
            self.job.commit()




## Entry Point API Call

In [11]:
# If this is not the LOCAL runtime, run the ETL logic
if not IS_LOCAL:
    gdf_transformed = GlueETL().run()
    if IS_DEV_NOTEBOOK:
        gdf_transformed.toDF().show()

+-------------+--------+------------+
|details.value|event_id|details.name|
+-------------+--------+------------+
|            2|     e-1| measurement|
|            4|     e-2| measurement|
|           20|    e-10| measurement|
|            6|     e-3| measurement|
|           22|    e-11| measurement|
|           24|    e-12| measurement|
|           14|     e-7| measurement|
|            8|     e-4| measurement|
|           16|     e-8| measurement|
|           10|     e-5| measurement|
|           18|     e-9| measurement|
|           12|     e-6| measurement|
+-------------+--------+------------+
