# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [2]:
%help

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.3 



# Available Magic Commands

## Sessions Magic

----
    %help                             Return a list of descriptions and input types for all magic commands. 
    %profile            String        Specify a profile in your aws configuration to use as the credentials provider.
    %region             String        Specify the AWS region in which to initialize a session. 
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %idle_timeout       Int           The number of minutes of inactivity after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %session_id_prefix  String        Define a String that will precede all session IDs in the format 
                                      [session_id_prefix]-[session_id]. If a session ID is not provided,
                                      a random UUID will be generated.
    %status                           Returns the status of the current Glue session including its duration, 
                                      configuration and executing user / role.
    %session_id                       Returns the session ID for the running session. 
    %list_sessions                    Lists all currently running sessions by ID.
    %stop_session                     Stops the current session.
    %glue_version       String        The version of Glue to be used by this session. 
                                      Currently, the only valid options are 2.0 and 3.0. 
                                      Default: 2.0.
----

## Selecting Job Types

----
    %streaming          String        Sets the session type to Glue Streaming.
    %etl                String        Sets the session type to Glue ETL.
    %glue_ray           String        Sets the session type to Glue Ray.
----

## Glue Config Magic 
*(common across all job types)*

----

    %%configure         Dictionary    A json-formatted dictionary consisting of all configuration parameters for 
                                      a session. Each parameter can be specified here or through individual magics.
    %iam_role           String        Specify an IAM role ARN to execute your session with.
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %number_of_workers  int           The number of workers of a defined worker_type that are allocated 
                                      when a session runs.
                                      Default: 5.
    %additional_python_modules  List  Comma separated list of additional Python modules to include in your cluster 
                                      (can be from Pypi or S3).
----

                                      
## Magic for Spark Jobs (ETL & Streaming)

----
    %worker_type        String        Set the type of instances the session will use as workers. 
                                      ETL and Streaming support G.1X, G.2X, G.4X and G.8X. 
                                      Default: G.1X.
    %connections        List          Specify a comma separated list of connections to use in the session.
    %extra_py_files     List          Comma separated list of additional Python files From S3.
    %extra_jars         List          Comma separated list of additional Jars to include in the cluster.
    %spark_conf         String        Specify custom spark configurations for your session. 
                                      E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
----
                                      
## Magic for Ray Job

----
    %min_workers        Int           The minimum number of workers that are allocated to a Ray job. 
                                      Default: 1.
    %object_memory_head Int           The percentage of free memory on the instance head node after a warm start. 
                                      Minimum: 0. Maximum: 100.
    %object_memory_worker Int         The percentage of free memory on the instance worker nodes after a warm start. 
                                      Minimum: 0. Maximum: 100.
----

## Action Magic

----

    %%sql               String        Run SQL code. All lines after the initial %%sql magic will be passed
                                      as part of the SQL code.  
----



#### Run this cell to set up and start your interactive session.


In [5]:
%idle_timeout 2880
%number_of_workers 5

Current idle_timeout is 60 minutes.
idle_timeout has been set to 2880 minutes.
Previous number of workers: 5
Setting new number of workers to: 5


# Importing the `libraries`

In [1]:
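%glue_ray

# Standard library: used to time the read step
import time

# Ray core and Ray Data (Datasets API)
import ray
from ray import data
from ray.data import ActorPoolStrategy

# Data libraries that back Ray Data blocks
import pandas
import pyarrow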


Previous Job type: glueray
Setting new Job type to glueray
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::507922848584:role/AWSGlueServiceRole-glueworkshop
Trying to create a Glue session for the kernel.
Worker Type: Z.2X
Number of Workers: 5
Session ID: c9cfedfb-3a7f-4aaa-8742-54f71201ce57
Job Type: glueray
Applying the following default arguments:
--glue_kernel_version 0.37.3
--enable-glue-datacatalog true
--auto-scaling-ray-min-workers 1
Waiting for session c9cfedfb-3a7f-4aaa-8742-54f71201ce57 to get into ready status...
Session c9cfedfb-3a7f-4aaa-8742-54f71201ce57 has been created.


# Connect to the `Ray` cluster provided by AWS Glue

In [2]:
ray.init('auto')


RayContext(dashboard_url='127.0.0.1:8265', python_version='3.9.16', ray_version='2.0.0', ray_commit='{{RAY_COMMIT_SHA}}', address_info={'node_ip_address': '2600:1f13:4de:1a21:e7c1:8c9:c8a0:a1f3', 'raylet_ip_address': '2600:1f13:4de:1a21:e7c1:8c9:c8a0:a1f3', 'redis_address': None, 'object_store_address': '/tmp/ray/session_2023-05-15_15-44-22_009317_1668/sockets/plasma_store', 'raylet_socket_name': '/tmp/ray/session_2023-05-15_15-44-22_009317_1668/sockets/raylet', 'webui_url': '127.0.0.1:8265', 'session_dir': '/tmp/ray/session_2023-05-15_15-44-22_009317_1668', 'metrics_export_port': 8080, 'gcs_address': '2600:1f13:4de:1a21:e7c1:8c9:c8a0:a1f3:6379', 'address': '2600:1f13:4de:1a21:e7c1:8c9:c8a0:a1f3:6379', 'dashboard_agent_listen_port': 52365, 'node_id': '6273d5cf59d7337eca8aa808b919553cfb5fd4a5545f5636b254fc6e'})


2023-05-15 15:46:05,827	INFO worker.py:1329 -- Connecting to existing Ray cluster at address: 2600:1f13:4de:1a21:e7c1:8c9:c8a0:a1f3:6379...
2023-05-15 15:46:05,834	INFO worker.py:1511 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265


In [3]:
ray.cluster_resources()

{'node:2600:1f13:4de:1a21:e7c1:8c9:c8a0:a1f3': 1.0, 'CPU': 8.0, 'memory': 40241570612.0, 'object_store_memory': 20120785305.0}
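The values returned by `ray.cluster_resources()` are raw CPU counts and byte counts. The minimal sketch below (plain Python, no Ray needed) turns that dictionary into readable GiB figures. `summarize_resources` is a hypothetical helper, and its input is simply the literal dictionary copied from this run's output.

```python
def summarize_resources(resources: dict) -> dict:
    """Hypothetical helper: convert Ray resource byte counts to GiB."""
    gib = 1024 ** 3
    return {
        "cpus": resources.get("CPU", 0.0),
        # Logical memory resource Ray uses when scheduling tasks and actors, in GiB
        "memory_gib": round(resources.get("memory", 0.0) / gib, 2),
        # Shared-memory object store, in GiB
        "object_store_gib": round(resources.get("object_store_memory", 0.0) / gib, 2),
    }

# Literal copied from the ray.cluster_resources() output above
resources = {
    "node:2600:1f13:4de:1a21:e7c1:8c9:c8a0:a1f3": 1.0,
    "CPU": 8.0,
    "memory": 40241570612.0,
    "object_store_memory": 20120785305.0,
}
print(summarize_resources(resources))
# {'cpus': 8.0, 'memory_gib': 37.48, 'object_store_gib': 18.74}
```

In this run, `object_store_memory` is almost exactly half of `memory`.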


In [4]:
ray.nodes()

[{'NodeID': '6273d5cf59d7337eca8aa808b919553cfb5fd4a5545f5636b254fc6e', 'Alive': True, 'NodeManagerAddress': '2600:1f13:4de:1a21:e7c1:8c9:c8a0:a1f3', 'NodeManagerHostname': 'localhost', 'NodeManagerPort': 44481, 'ObjectManagerPort': 38363, 'ObjectStoreSocketName': '/tmp/ray/session_2023-05-15_15-44-22_009317_1668/sockets/plasma_store', 'RayletSocketName': '/tmp/ray/session_2023-05-15_15-44-22_009317_1668/sockets/raylet', 'MetricsExportPort': 8080, 'NodeName': '2600:1f13:4de:1a21:e7c1:8c9:c8a0:a1f3', 'alive': True, 'Resources': {'node:2600:1f13:4de:1a21:e7c1:8c9:c8a0:a1f3': 1.0, 'memory': 40241570612.0, 'CPU': 8.0, 'object_store_memory': 20120785305.0}}]
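`ray.nodes()` returns one dictionary per node. Only one node is listed at this point, which is consistent with the `--auto-scaling-ray-min-workers 1` argument applied when the session was created. The stdlib-only sketch below summarizes such a list. `summarize_nodes` is a hypothetical helper, and its input is a trimmed copy of the output above that keeps only the fields the helper reads.

```python
def summarize_nodes(nodes: list) -> dict:
    """Hypothetical helper: count live nodes and total their CPUs."""
    alive = [n for n in nodes if n.get("Alive")]
    return {
        "alive_nodes": len(alive),
        # Sum the logical CPU resource reported by each live node
        "total_cpus": sum(n.get("Resources", {}).get("CPU", 0.0) for n in alive),
        "addresses": [n.get("NodeManagerAddress") for n in alive],
    }

# Trimmed copy of the ray.nodes() output above
nodes = [{
    "NodeID": "6273d5cf59d7337eca8aa808b919553cfb5fd4a5545f5636b254fc6e",
    "Alive": True,
    "NodeManagerAddress": "2600:1f13:4de:1a21:e7c1:8c9:c8a0:a1f3",
    "Resources": {"CPU": 8.0, "memory": 40241570612.0},
}]
print(summarize_nodes(nodes))
```

If the cluster scales out later in the session, re-running `ray.nodes()` may list more entries, and the same helper would include them.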


# Read the dataset in `Parquet` file format

In [12]:
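# Time the creation of a Ray Dataset over the public Amazon Reviews
# Parquet files for the Toys product category
start = time.time()
ds = ray.data.read_parquet("s3://amazon-reviews-pds/parquet/product_category=Toys/")
end = time.time()

print(f"Reading the data to dataframe: {round(end - start, 2)} seconds")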

Reading the data to dataframe: 1.87 seconds
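Here `time.time()` brackets only the `read_parquet` call. The `Read->Map_Batches` progress bar that appears after `add_column` suggests that at least part of the read is deferred until the Dataset is consumed. The 1.87 seconds is therefore best read as Dataset creation time. For measuring intervals, `time.perf_counter()` is the monotonic, high-resolution clock intended for the job. The sketch below is a minimal reusable timer, where `timed` is a hypothetical helper name.

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(label: str):
    """Hypothetical helper: print how long the wrapped block took."""
    start = time.perf_counter()  # monotonic, high-resolution clock
    result = {}
    try:
        yield result
    finally:
        # Record and report the elapsed time even if the block raised
        result["seconds"] = time.perf_counter() - start
        print(f"{label}: {result['seconds']:.2f} seconds")

# In the notebook this could wrap the read (requires a Ray session):
# with timed("Creating the Dataset"):
#     ds = ray.data.read_parquet("s3://amazon-reviews-pds/parquet/product_category=Toys/")

with timed("Summing a range") as t:
    total = sum(range(1_000_000))
```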


In [13]:
ds.schema()

marketplace: string
customer_id: string
review_id: string
product_id: string
product_parent: string
product_title: string
star_rating: int32
helpful_votes: int32
total_votes: int32
vine: string
verified_purchase: string
review_headline: string
review_body: string
review_date: date32[day]
year: int32
-- schema metadata --
org.apache.spark.sql.parquet.row.metadata: '{"type":"struct","fields":[{"' + 1036


In [14]:
ds.show(1)

{'marketplace': 'DE', 'customer_id': '47664801', 'review_id': 'R1DLS0N7LMVO8U', 'product_id': 'B000VSAEYI', 'product_parent': '128046393', 'product_title': 'Intex 57454NP - Ozean Play Center, 254 x 196 x 79 cm', 'star_rating': 5, 'helpful_votes': 0, 'total_votes': 0, 'vine': 'N', 'verified_purchase': 'N', 'review_headline': 'Top!', 'review_body': 'Wir sind total begeistert und unser Sohn ebenfalls.<br />Der Artikel hat bisher gehalten, was er verspricht.<br />Wir haben viel Spaß und ich kann das Becken weiterempfehlen.', 'review_date': datetime.date(2012, 6, 30), 'year': 2012}


# Applying a few `transformations` with Ray

### 1) Add a `new column` to the Dataset and show a sample record


In [15]:

# helpful_votes / total_votes; yields NaN when total_votes is 0
ds = ds.add_column("helpful_votes_ratio", lambda df: df["helpful_votes"] / df["total_votes"])

ds.show(1)

{'marketplace': 'DE', 'customer_id': '47664801', 'review_id': 'R1DLS0N7LMVO8U', 'product_id': 'B000VSAEYI', 'product_parent': '128046393', 'product_title': 'Intex 57454NP - Ozean Play Center, 254 x 196 x 79 cm', 'star_rating': 5, 'helpful_votes': 0, 'total_votes': 0, 'vine': 'N', 'verified_purchase': 'N', 'review_headline': 'Top!', 'review_body': 'Wir sind total begeistert und unser Sohn ebenfalls.<br />Der Artikel hat bisher gehalten, was er verspricht.<br />Wir haben viel Spaß und ich kann das Becken weiterempfehlen.', 'review_date': datetime.date(2012, 6, 30), 'year': 2012, 'helpful_votes_ratio': nan}


Read->Map_Batches: 100%|##########| 10/10 [00:33<00:00,  3.30s/it]
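The sample record shows `'helpful_votes_ratio': nan` for a review whose `total_votes` is 0. The function passed to `add_column` receives batches in pandas format, and pandas turns 0/0 into NaN instead of raising an error. If a fixed value is preferable for reviews without votes, one option is to mask zero denominators before dividing. The sketch below runs on a plain pandas DataFrame. The helper `helpful_votes_ratio` and the choice of 0.0 as the fill value are assumptions and are not part of the original notebook.

```python
import pandas as pd

def helpful_votes_ratio(df: pd.DataFrame, fill_value: float = 0.0) -> pd.Series:
    """Hypothetical helper: helpful_votes / total_votes, with fill_value where total_votes == 0."""
    # Replace zero denominators with NaN so pandas never evaluates 0/0 or x/0
    total = df["total_votes"].where(df["total_votes"] != 0)
    return (df["helpful_votes"] / total).fillna(fill_value)

batch = pd.DataFrame({"helpful_votes": [0, 3], "total_votes": [0, 4]}, dtype="int32")
print((batch["helpful_votes"] / batch["total_votes"]).tolist())  # [nan, 0.75]
print(helpful_votes_ratio(batch).tolist())                       # [0.0, 0.75]
```

Because `fill_value` has a default, the helper could be passed straight to Ray, e.g. `ds.add_column("helpful_votes_ratio", helpful_votes_ratio)`.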


### 2) Drop a `few columns` from the underlying Dataset


In [17]:
ds = ds.drop_columns(["review_body", "vine", "product_parent", "verified_purchase", "review_headline"])

ds.schema()

PandasBlockSchema(names=['marketplace', 'customer_id', 'review_id', 'product_id', 'product_title', 'star_rating', 'helpful_votes', 'total_votes', 'review_date', 'year', 'helpful_votes_ratio'], types=[dtype('O'), dtype('O'), dtype('O'), dtype('O'), dtype('O'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('O'), dtype('int32'), dtype('float64')])


Map_Batches: 100%|##########| 10/10 [00:04<00:00,  2.26it/s]
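In the `PandasBlockSchema` above, the remaining columns keep their original order. The stdlib-only check below reproduces that column list from two inputs: the Parquet schema printed earlier, plus the `helpful_votes_ratio` column added in step 1. Both lists are copied from this notebook's output.

```python
# Column names from ds.schema() after read_parquet, plus the column added with add_column
original = [
    "marketplace", "customer_id", "review_id", "product_id", "product_parent",
    "product_title", "star_rating", "helpful_votes", "total_votes", "vine",
    "verified_purchase", "review_headline", "review_body", "review_date", "year",
    "helpful_votes_ratio",
]
dropped = ["review_body", "vine", "product_parent", "verified_purchase", "review_headline"]

# Keep every column that was not dropped, preserving the original order
dropped_set = set(dropped)
remaining = [c for c in original if c not in dropped_set]
print(remaining)
```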


# Write the `processed data` into the S3 bucket

In [18]:
my_bucket = 's3://data-engg-demo/dataset/ray_output/'

ds.write_parquet(my_bucket)

Write Progress: 100%|##########| 10/10 [00:12<00:00,  1.24s/it]
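The S3 URI passed to `write_parquet` combines a bucket (`data-engg-demo`) with a key prefix (`dataset/ray_output/`) under which the Parquet files are written. The small stdlib sketch below splits such a URI, which is useful when another tool needs the bucket and prefix separately. `split_s3_uri` is a hypothetical helper.

```python
from urllib.parse import urlparse

def split_s3_uri(uri: str) -> tuple:
    """Hypothetical helper: split 's3://bucket/prefix/' into (bucket, prefix)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {uri!r}")
    # netloc is the bucket; the path without its leading slash is the key prefix
    return parsed.netloc, parsed.path.lstrip("/")

print(split_s3_uri("s3://data-engg-demo/dataset/ray_output/"))
# ('data-engg-demo', 'dataset/ray_output/')
```

With boto3, for example, the two parts map onto the `Bucket` and `Prefix` arguments of `list_objects_v2`, which lets you inspect the files that were written.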
