
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A json-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your aws configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma separated list of pip packages, s3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma separated list of additional Jars to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (e.g. %glue_version 2.0).                              |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %spark_conf                 |  String      |  Specify custom spark configurations for your session. E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer.                      |

## STEPS

### 1- Upload data to S3
### 2- Create a Glue connection pointing to the Redshift cluster
### 3- Grant Glue a role with the required policies (PassRole, GlueServiceRole, S3 access)
### 4- Add job bookmarks and schedule the script
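
Step 3 above can be made more concrete with a minimal sketch of two of the IAM policy documents involved, written as Python dictionaries. The account ID, role name, and helper function names are hypothetical placeholders, and the S3 permissions are left out because they depend on your own buckets.

```python
import json

# Hypothetical placeholders: replace with your own account ID and role name.
ACCOUNT_ID = "123456789012"
ROLE_NAME = "GlueRedshiftTutorialRole"


def glue_trust_policy():
    """Trust policy that lets the AWS Glue service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "glue.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def pass_role_policy(account_id, role_name):
    """Policy that allows passing the role, restricted to the Glue service."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "iam:PassRole",
                "Resource": f"arn:aws:iam::{account_id}:role/{role_name}",
                "Condition": {
                    "StringEquals": {"iam:PassedToService": "glue.amazonaws.com"}
                },
            }
        ],
    }


# Serialize both documents so they can be pasted into the IAM console or a CLI call.
trust_json = json.dumps(glue_trust_policy(), indent=2)
pass_role_json = json.dumps(pass_role_policy(ACCOUNT_ID, ROLE_NAME), indent=2)
print(trust_json)
print(pass_role_json)
```

Restricting `iam:PassRole` with the `iam:PassedToService` condition is one common way to keep the permission narrow; adjust it to your own security requirements.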



In [7]:
%glue_version 3.0
%number_of_workers 3
%worker_type G.1X
%idle_timeout 60
%connections redshift_tutorial_connector
%%configure
{
    "--job-bookmark-option": "job-bookmark-enable"
}

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 
Setting Glue version to: 3.0
Previous number of workers: 5
Setting new number of workers to: 3
Previous worker type: G.1X
Setting new worker type to: G.1X
Current idle_timeout is 2880 minutes.
idle_timeout has been set to 60 minutes.
Connections to be included:
redshift_tutorial_connector
The following configurations have been updated: {'--job-bookmark-option': 'job-bookmark-enable'}


In [2]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)




In [3]:
params = []
if '--JOB_NAME' in sys.argv:
    params.append('JOB_NAME')
args = getResolvedOptions(sys.argv, params)
if 'JOB_NAME' in args:
    jobname = args['JOB_NAME']
else:
    jobname = "glue_redshift_tutorial_job"
job.init(jobname, args)
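
The cell above falls back to a fixed job name when the notebook runs interactively and no `--JOB_NAME` argument is present. A minimal pure-Python sketch of that fallback logic follows. `resolve_job_name` is a hypothetical stand-in written for illustration, not part of `awsglue`, and it only handles the space-separated `--JOB_NAME value` form.

```python
DEFAULT_JOB_NAME = "glue_redshift_tutorial_job"


def resolve_job_name(argv, default=DEFAULT_JOB_NAME):
    """Return the value that follows '--JOB_NAME' in argv, or the default."""
    if "--JOB_NAME" in argv:
        idx = argv.index("--JOB_NAME")
        # Only use the argument if a value actually follows the flag.
        if idx + 1 < len(argv):
            return argv[idx + 1]
    return default


# Interactive session: no job name on the command line, so the default is used.
interactive_name = resolve_job_name(["kernel.py"])
# Scheduled job: the name arrives as an argument (the value here is made up).
scheduled_name = resolve_job_name(["script.py", "--JOB_NAME", "nightly_taxi_load"])
print(interactive_name)
print(scheduled_name)
```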




In [4]:
nyc_taxi_trip_input_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type = "s3", 
    connection_options = {
        "paths": ["s3://redshift-tutorial-datasets/Glue_data/taxi/yellow_tripdata_2022-01.parquet"]
    }, 
    format = "parquet",
    transformation_ctx = "nyc_taxi_trip_input_dyf"
)

nyc_taxi_zone_lookup_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type = "s3", 
    connection_options = {
        "paths": ["s3://redshift-tutorial-datasets/Glue_data/lookup/taxi+_zone_lookup.csv"]
    }, 
    format = "csv",
    format_options= {
        'withHeader': True
    },
    transformation_ctx = "nyc_taxi_zone_lookup_dyf"
)




In [5]:
nyc_taxi_trip_apply_mapping_dyf = ApplyMapping.apply(
    frame = nyc_taxi_trip_input_dyf, 
    mappings = [
        ("VendorID","Long","VendorID","Integer"), 
        ("tpep_pickup_datetime","Timestamp","tpep_pickup_datetime","Timestamp"), 
        ("tpep_dropoff_datetime","Timestamp","tpep_dropoff_datetime","Timestamp"), 
        ("passenger_count","Double","passenger_count","Integer"), 
        ("trip_distance","Double","trip_distance","Double"),
        ("RatecodeID","Double","RatecodeID","Integer"), 
        ("store_and_fwd_flag","String","store_and_fwd_flag","String"), 
        ("PULocationID","Long","PULocationID","Integer"), 
        ("DOLocationID","Long","DOLocationID","Integer"),
        ("payment_type","Long","payment_type","Integer"), 
        ("fare_amount","Double","fare_amount","Double"),
        ("extra","Double","extra","Double"), 
        ("mta_tax","Double","mta_tax","Double"),
        ("tip_amount","Double","tip_amount","Double"), 
        ("tolls_amount","Double","tolls_amount","Double"), 
        ("improvement_surcharge","Double","improvement_surcharge","Double"), 
        ("total_amount","Double","total_amount","Double"), 
        ("congestion_surcharge","Double","congestion_surcharge","Double"), 
        ("airport_fee","Double","airport_fee","Double")
    ],
    transformation_ctx = "nyc_taxi_trip_apply_mapping_dyf"
)

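
Each `ApplyMapping` entry above is a 4-tuple of source name, source type, target name, and target type. The pure-Python sketch below illustrates that idea on a plain dictionary. It is a simplified illustration rather than Glue's implementation: `apply_mapping`, `CASTS`, and the sample record are hypothetical, and only a few of the type names used in this notebook are covered.

```python
# Casts keyed by target type name; types not listed here pass through unchanged.
CASTS = {
    "Integer": int,
    "Long": int,
    "Double": float,
    "String": str,
}


def apply_mapping(record, mappings):
    """Rename and cast fields of a dict according to 4-tuples of
    (source_name, source_type, target_name, target_type)."""
    out = {}
    for source_name, _source_type, target_name, target_type in mappings:
        if source_name not in record:
            continue  # fields missing from the record are skipped
        value = record[source_name]
        cast = CASTS.get(target_type)
        # Keep None values and unknown target types as they are.
        out[target_name] = value if value is None or cast is None else cast(value)
    return out


sample = {"VendorID": 1, "passenger_count": 2.0, "trip_distance": 3.8, "unmapped": "x"}
mappings = [
    ("VendorID", "Long", "VendorID", "Integer"),
    ("passenger_count", "Double", "passenger_count", "Integer"),
    ("trip_distance", "Double", "trip_distance", "Double"),
]
mapped = apply_mapping(sample, mappings)
print(mapped)
```

In this sketch, fields that do not appear in the mapping list (here `unmapped`) are absent from the result, so the list doubles as a column selection.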




In [6]:
def trip_duration(start_timestamp, end_timestamp):
    # Trip length in minutes, computed from two datetime values
    minutes_diff = (end_timestamp - start_timestamp).total_seconds() / 60.0
    return minutes_diff

def transformRecord(rec):
    # Add a derived trip_duration field (in minutes) to the record
    rec["trip_duration"] = trip_duration(rec["tpep_pickup_datetime"], rec["tpep_dropoff_datetime"])
    return rec

nyc_taxi_trip_final_dyf = Map.apply(
    frame = nyc_taxi_trip_apply_mapping_dyf, 
    f = transformRecord, 
    transformation_ctx = "nyc_taxi_trip_final_dyf"
)
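
The per-record function passed to `Map.apply` can be tried locally on a plain dictionary before running it in the session. The sketch below is a minimal example under assumptions: the sample record is made up, and the `None` guard in `transform_record` is a hypothetical addition that is not present in the notebook cell above.

```python
from datetime import datetime


def trip_duration(start_timestamp, end_timestamp):
    """Return the trip length in minutes."""
    return (end_timestamp - start_timestamp).total_seconds() / 60.0


def transform_record(rec):
    """Add a trip_duration field; leave it as None if a timestamp is missing."""
    pickup = rec.get("tpep_pickup_datetime")
    dropoff = rec.get("tpep_dropoff_datetime")
    if pickup is None or dropoff is None:
        rec["trip_duration"] = None
    else:
        rec["trip_duration"] = trip_duration(pickup, dropoff)
    return rec


# Made-up sample record with a 17 minute 30 second trip.
record = {
    "tpep_pickup_datetime": datetime(2022, 1, 1, 0, 30, 0),
    "tpep_dropoff_datetime": datetime(2022, 1, 1, 0, 47, 30),
}
result = transform_record(record)
print(result["trip_duration"])  # 17.5
```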




In [9]:
nyc_taxi_trip_sink_dyf = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = nyc_taxi_trip_final_dyf, 
    catalog_connection = "redshift_tutorial_connector", 
    connection_options =  {"dbtable": "public.f_nyc_yellow_taxi_trip","database": "dev"}, 
    redshift_tmp_dir = "s3://redshift-tutorial-datasets/Glue_data/temporary/", 
    transformation_ctx = "nyc_taxi_trip_sink_dyf"
)




In [10]:
job.commit()
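
Job bookmarks, enabled at the top of this notebook through `--job-bookmark-option`, rely on the `transformation_ctx` names and on `job.commit()` to record progress between runs. The toy pure-Python sketch below illustrates that idea only. `ToyBookmark` and the February file name are hypothetical, and the real service tracks its state differently.

```python
class ToyBookmark:
    """Toy stand-in for a job bookmark: remembers which inputs were committed."""

    def __init__(self):
        self._committed = {}  # transformation_ctx -> set of processed paths
        self._pending = {}    # paths read during the current, uncommitted run

    def read_new(self, transformation_ctx, paths):
        """Return only the paths not yet committed for this context."""
        seen = self._committed.get(transformation_ctx, set())
        new_paths = [p for p in paths if p not in seen]
        self._pending.setdefault(transformation_ctx, set()).update(new_paths)
        return new_paths

    def commit(self):
        """Persist the pending state, as job.commit() does at the end of a run."""
        for ctx, paths in self._pending.items():
            self._committed.setdefault(ctx, set()).update(paths)
        self._pending = {}


bookmark = ToyBookmark()
ctx = "nyc_taxi_trip_input_dyf"

# First run: the January file is new, and the run is committed.
first_run = bookmark.read_new(ctx, ["yellow_tripdata_2022-01.parquet"])
bookmark.commit()

# Second run: only the (hypothetical) February file is picked up.
second_run = bookmark.read_new(
    ctx, ["yellow_tripdata_2022-01.parquet", "yellow_tripdata_2022-02.parquet"]
)
print(first_run)
print(second_run)
```

Because `second_run` is not committed in this sketch, a third call with the same paths would return the February file again, which is why the notebook ends with `job.commit()`.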


