
# Glue Studio Notebook
You are now running a **Glue Studio** notebook. Before you can use it, you *must* start an interactive session. The second cell in this notebook contains all the magics needed to start your session, so all you need to do is execute it.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A JSON-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your AWS configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma separated list of pip packages, s3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma separated list of additional Jars to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0.                                                       |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %delete_session             |              |  Deletes the current session and kills the cluster. User stops being charged.                                                                             |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |

### Set the type and number of worker nodes used to run the Spark jobs

In [2]:
%worker_type G.2X

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

It looks like there is a newer version of the kernel available. The latest version is stderr='ERROR:Exception:\nTraceback(mostrecentcalllast and you have 0.24 installed.
Please run `pip install --upgrade aws-glue-sessions` to upgrade your kernel
Previous worker type: G.1X
Setting new worker type to: G.2X


In [4]:
%number_of_workers 20

Previous number of workers: 5
Setting new number of workers to: 20


In [8]:
# Execute this cell to configure and start your interactive session.
%session_id_prefix my-session-yez3d
%idle_timeout 60
%%configure 
{
  "region": "us-east-1",
  "iam_role": "<arn of the iam role>"
}

Setting session ID prefix to my-session-yez3d
Current idle_timeout is None minutes.
idle_timeout has been set to 60 minutes.
The following configurations have been updated: {'region': 'us-east-1', 'iam_role': 'arn:aws:iam::960351580303:role/jupyter-test-glue-policy'}


In [None]:
# Remember to execute the config cell first
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.sql.functions import *
from pyspark.sql.types import *
  
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
job = Job(glueContext)

Authenticating with profile=default
glue_role_arn defined by user: arn:aws:iam::960351580303:role/jupyter-test-glue-policy
Attempting to use existing AssumeRole session credentials.
Trying to create a Glue session for the kernel.
Worker Type: G.2X
Number of Workers: 20
Session ID: my-session-yez3d-03674402-5808-417d-a553-278c98588cc9
Applying the following default arguments:
--glue_kernel_version 0.24
--enable-glue-datacatalog true
Waiting for session my-session-yez3d-03674402-5808-417d-a553-278c98588cc9 to get into ready status...
Session my-session-yez3d-03674402-5808-417d-a553-278c98588cc9 has been created




### Read the data from the CSV source files

In [1]:
source = "<source data location in s3>"

dbName = "webinar_ingestion_db"

tableName = "yellow_taxi_data"




### Define the schema of the source

In [4]:
dfSchema = StructType([
      StructField('VendorID', StringType(), True),
      StructField('tpep_pickup_datetime', StringType(), True),
      StructField('tpep_dropoff_datetime', StringType(), True),
      StructField('passenger_count', StringType(), True),
      StructField('trip_distance', StringType(), True),
      StructField('pickup_longitude', StringType(), True),
      StructField('pickup_latitude', StringType(), True),
      StructField('RateCodeID', StringType(), True),
      StructField('store_and_fwd_flag', StringType(), True),
      StructField('dropoff_longitude', StringType(), True),
      StructField('dropoff_latitude', StringType(), True),
      StructField('payment_type', StringType(), True),
      StructField('fare_amount', StringType(), True),
      StructField('extra', StringType(), True),
      StructField('mta_tax', StringType(), True),
      StructField('tip_amount', StringType(), True),
      StructField('tolls_amount', StringType(), True),
      StructField('improvement_surcharge', StringType(), True),
      StructField('total_amount', StringType(), True)
])




In [5]:

df = (spark.read
      .schema(dfSchema)
      .option("header",True)
      .option("sep",",")
      .csv(source))

df.show()

+--------+--------------------+---------------------+---------------+-------------+-------------------+------------------+----------+------------------+-------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|   pickup_longitude|   pickup_latitude|RateCodeID|store_and_fwd_flag|  dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+-------------------+------------------+----------+------------------+-------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       2| 2015-01-15 19:05:39|  2015-01-15 19:23:42|              1|         1.59|   -73.993896484375|40.750110626220703|        

In [6]:
df.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- RateCodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
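
Every column above is read as a string, so numeric comparisons and aggregations would need explicit casts first. The following is a minimal sketch of one way to generate those casts. The `NUMERIC_CASTS` mapping and the `build_cast_exprs` helper are hypothetical, and the target types are assumptions rather than anything stated in this notebook.

```python
# Hypothetical mapping from source column to a Spark SQL type.
# The target types are assumptions, not taken from the notebook.
NUMERIC_CASTS = {
    "passenger_count": "INT",
    "trip_distance": "DOUBLE",
    "fare_amount": "DOUBLE",
    "tip_amount": "DOUBLE",
    "total_amount": "DOUBLE",
}


def build_cast_exprs(columns, casts):
    """Return one SQL expression per column, casting those listed in `casts`."""
    exprs = []
    for name in columns:
        if name in casts:
            # Cast in place and keep the original column name.
            exprs.append(f"CAST(`{name}` AS {casts[name]}) AS `{name}`")
        else:
            # Pass the column through unchanged.
            exprs.append(f"`{name}`")
    return exprs


exprs = build_cast_exprs(["VendorID", "passenger_count", "fare_amount"], NUMERIC_CASTS)
print(exprs)
```

In a running session the expressions could be applied with `df.selectExpr(*build_cast_exprs(df.columns, NUMERIC_CASTS))`. Values that cannot be parsed would typically become null rather than raise an error, so a null check after casting is worth considering.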


### Do some transformations on the data

In [16]:
df = (df
    # Parse the pickup and dropoff strings into timestamps.
    .withColumn('tpep_pickup_datetime', to_timestamp(col('tpep_pickup_datetime'), "yyyy-MM-dd HH:mm:ss"))
    .withColumn('tpep_dropoff_datetime', to_timestamp(col('tpep_dropoff_datetime'), "yyyy-MM-dd HH:mm:ss"))
    # Trip duration in whole minutes (the column keeps its original name, tpep_datetime).
    .withColumn("tpep_datetime", ((unix_timestamp(col('tpep_dropoff_datetime')) - unix_timestamp(col('tpep_pickup_datetime'))) / 60).cast("int"))
    # Record the source file of each row and the load time in India Standard Time.
    .withColumn("file_name", input_file_name())
    .withColumn("insert_timestamp", from_utc_timestamp(current_timestamp(), 'Asia/Kolkata')))
    
df.show()

+--------+--------------------+---------------------+---------------+-------------+-------------------+------------------+----------+------------------+-------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-------------+--------------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|   pickup_longitude|   pickup_latitude|RateCodeID|store_and_fwd_flag|  dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|tpep_datetime|           file_name|    insert_timestamp|
+--------+--------------------+---------------------+---------------+-------------+-------------------+------------------+----------+------------------+-------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-------------+-----
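
The `tpep_datetime` column holds the trip duration in whole minutes: the difference between the dropoff and pickup Unix timestamps, divided by 60 and cast to an integer. As a rough illustration, the same arithmetic can be worked through in plain Python for the first row shown earlier. The `trip_minutes` helper is hypothetical, and it ignores time zone and daylight-saving effects that a Spark session time zone could introduce.

```python
from datetime import datetime

# Python equivalent of the Spark pattern yyyy-MM-dd HH:mm:ss
FMT = "%Y-%m-%d %H:%M:%S"


def trip_minutes(pickup, dropoff):
    """Whole minutes between two timestamp strings, truncated toward zero."""
    delta = datetime.strptime(dropoff, FMT) - datetime.strptime(pickup, FMT)
    # Divide seconds by 60, then truncate, mirroring the cast to int.
    return int(delta.total_seconds() / 60)


minutes = trip_minutes("2015-01-15 19:05:39", "2015-01-15 19:23:42")
print(minutes)  # 1083 seconds / 60 = 18.05, truncated to 18
```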

In [15]:
df.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- RateCodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- tpep_datetime: integer (nullable = true)
 |-- file_name: string (nullable = false)
 |-- insert_timestamp: timestamp (

In [8]:
rowCount = df.count()

print(f"count: {rowCount}")

count: 23655844


### Save the DataFrame as a table

In [None]:
df.write.format("parquet").mode("overwrite").saveAsTable(f"{dbName}.{tableName}")

### Send ingestion alert

In [11]:
import boto3
from botocore.exceptions import BotoCoreError, ClientError

client = boto3.client('sns')

message = f"""

Data ingestion to: {dbName}.{tableName}

Source: {source}

Number of rows inserted: {rowCount}

"""

print(message)

try:
    response = client.publish(
        TopicArn='<arn of the sns topic>',
        Message=message,
        Subject='Data ingestion alert'
    )
except (BotoCoreError, ClientError) as err:
    # boto3 reports AWS and client-side failures through these exception types, not ValueError.
    print(f"Error while publishing message: {err}")
    



Data ingestion to: webinar_ingestion_db.yellow_taxi_data

Source: s3://webinaringestionbucket/source/taxi/yellow_tripdata_*.csv

Number of rows inserted: 23655844
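
The alert body above is built inline, with blank lines around every field. If the same alert is sent from several notebooks, it may be easier to build the text in a small function. This is only a sketch: `build_ingestion_message` is a hypothetical helper, and the compact layout and thousands separator are assumptions about what reads well in an email.

```python
def build_ingestion_message(db_name, table_name, source, row_count):
    """Format the alert body as three lines with no surrounding blank lines."""
    lines = [
        f"Data ingestion to: {db_name}.{table_name}",
        f"Source: {source}",
        # The "," format spec adds thousands separators to the row count.
        f"Number of rows inserted: {row_count:,}",
    ]
    return "\n".join(lines)


alert = build_ingestion_message(
    "webinar_ingestion_db",
    "yellow_taxi_data",
    "s3://webinaringestionbucket/source/taxi/yellow_tripdata_*.csv",
    23655844,
)
print(alert)
```

The returned string can be passed as the `Message` argument of `client.publish` in place of the inline f-string.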
