# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Obtain the SparkContext for this session (reusing one if it already exists)
# and wrap it in a GlueContext, the entry point for Glue's DynamicFrame API.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Job handle bound to the Glue context.
job = Job(glueContext)

# SparkSession exposed by the Glue context, for plain Spark DataFrame work.
spark = glueContext.spark_session

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 
Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 5
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::719386081370:role/role_glue_s3
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: f93ecaa1-fa5f-4dba-b643-edcdca2889fa
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
Waiting for session f93ecaa1-fa5f-4dba

#### Example: Read the job parameters, list the day's JSON objects in Amazon S3, create a DynamicFrame from them and display its schema


In [None]:
import sys
from awsglue.utils import getResolvedOptions

# Resolve the two required job parameters from the command line:
# 'JOB_NAME' and 'day' (the latter is used to build the S3 prefix to read).
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'day'])

In [18]:
import boto3

s3 = boto3.client('s3')
bucket_name = 'door2door-de'
prefix = 'data/' + args['day']

# Walk every result page of list_objects_v2 for the prefix and collect the
# full s3:// URI of each key. Pages without a 'Contents' entry contribute
# nothing, and a key identical to the prefix itself is skipped.
paginator = s3.get_paginator('list_objects_v2')
objects = [
    f"s3://{bucket_name}/{obj['Key']}"
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix)
    for obj in page.get('Contents', [])
    if obj['Key'] != prefix
]




In [35]:
# Load every listed S3 object as JSON into a single DynamicFrame,
# then print the schema of the result.
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type='s3',
    connection_options={'paths': objects},
    format='json',
)
dyf.printSchema()

root
|-- event: string
|-- on: string
|-- at: string
|-- data: struct
|    |-- id: string
|    |-- location: struct
|    |    |-- lat: double
|    |    |-- lng: double
|    |    |-- at: string
|    |-- start: string
|    |-- finish: string
|-- organization_id: string


In [2]:
# dyf = glueContext.create_dynamic_frame.from_catalog(database='door2door', table_name='data')
# dyf.printSchema()

root
|-- event: string
|-- on: string
|-- at: string
|-- data: struct
|    |-- id: string
|    |-- location: struct
|    |    |-- lat: double
|    |    |-- lng: double
|    |    |-- at: string
|    |-- start: string
|    |-- finish: string
|-- organization_id: string


### 1. Separate the DynamicFrame according to the entity in the "on" field


In [3]:
def _is_on(entity):
    """Return a predicate that keeps records whose "on" field equals ``entity``."""
    return lambda record: record["on"] == entity


# Split the input DynamicFrame into one frame per value of the "on" column.
vehicle_dyf = dyf.filter(_is_on("vehicle"))
period_dyf = dyf.filter(_is_on("operating_period"))




#### a. Mapping vehicle

In [4]:
# Flatten the nested "data" struct of the vehicle records into top-level columns.
# Each tuple is (source path, source type, target name, target type).
mapping_vehicle = [
    ("data.id", "string", "id", "string"),
    ("data.location.lat", "double", "lat", "double"),
    ("data.location.lng", "double", "lng", "double"),
    ("data.location.at", "string", "location_at", "timestamp"),
    ("event", "string", "event", "string"),
]

# Project the vehicle records through the mapping; this yields a new DynamicFrame.
dyf_vehicle = vehicle_dyf.apply_mapping(mapping_vehicle)




In [5]:
# Preview the mapped vehicle rows as a Spark DataFrame (first 20 rows, long values truncated).
dyf_vehicle.toDF().show(n=20, truncate=True)

+--------------------+--------+--------+--------------------+------+
|                  id|     lat|     lng|         location_at| event|
+--------------------+--------+--------+--------------------+------+
|bac5188f-67c6-496...|52.45133|13.46045|2019-06-01 18:17:...|update|
|3a3eb23a-f22e-4fe...|52.45848|13.52647|2019-06-01 18:17:...|update|
|f0b87796-b25c-40b...|52.50309|13.33435|2019-06-01 18:17:...|update|
|9152c5d8-79cf-4fe...|52.50536|13.51655|2019-06-01 18:17:...|update|
|f06eb89c-ada0-41c...|52.49697|13.44936|2019-06-01 18:17:...|update|
|9d6a8840-def2-42b...|52.46324|13.34227|2019-06-01 18:17:...|update|
|3b0640d6-502d-462...|52.57786|13.26756|2019-06-01 18:17:...|update|
|98c8b8cb-7c2b-415...|52.50036|13.25032|2019-06-01 18:17:...|update|
|d6880741-ae7f-474...|52.47874|13.32032|2019-06-01 18:17:...|update|
|d759fc35-b25c-487...|52.44846|13.46759|2019-06-01 18:17:...|update|
|49c02de6-2bc6-46c...|52.44493|13.24099|2019-06-01 18:17:...|update|
|949798fc-50aa-47a...|52.55031|13.

#### b. Mapping period

In [6]:
# Flatten the nested "data" struct of the operating-period records into top-level columns.
# Each tuple is (source path, source type, target name, target type).
mapping_period = [
    ("data.id", "string", "id", "string"),
    ("data.start", "string", "start", "timestamp"),
    ("data.finish", "string", "finish", "timestamp"),
    ("event", "string", "event", "string"),
]

# Project the period records through the mapping; this yields a new DynamicFrame.
dyf_period = period_dyf.apply_mapping(mapping_period)




In [7]:
# Preview the mapped operating-period rows as a Spark DataFrame (first 20 rows, long values truncated).
dyf_period.toDF().show(n=20, truncate=True)

+----+--------------------+--------------------+------+
|  id|               start|              finish| event|
+----+--------------------+--------------------+------+
|op_2|2019-06-01 18:17:...|2019-06-01 18:22:...|create|
|op_1|2019-06-01 18:23:...|2019-06-01 18:28:...|create|
+----+--------------------+--------------------+------+


### 2. Define the schema for the fact table

In [8]:
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import monotonically_increasing_id


# Columns of the fact table, taken from every input record.
# Each tuple is (source path, source type, target name, target type).
mapping = [
    ("on", "string", "on", "string"),
    ("at", "string", "date", "timestamp"),
    ("data.id", "string", "data_id", "string"),
    ("organization_id", "string", "organization_id", "string"),
]

# Use the ApplyMapping transform to select and rename the fact-table columns.
new_dyf = ApplyMapping.apply(
    frame=dyf,
    mappings=mapping
)

# DynamicFrame has no withColumn(); the surrogate "id" column has to be added
# on the underlying Spark DataFrame, which is then converted back so the
# downstream Glue writer still receives a DynamicFrame.
# Note: monotonically_increasing_id() guarantees unique, increasing values,
# not consecutive ones.
fact_df = new_dyf.toDF().withColumn("id", monotonically_increasing_id())
new_dyf = DynamicFrame.fromDF(fact_df, glueContext, "new_dyf")




In [9]:
# Preview the fact-table rows as a Spark DataFrame (first 20 rows, long values truncated).
new_dyf.toDF().show(n=20, truncate=True)

+-------+--------------------+--------------------+---------------+---+
|     on|                date|             data_id|organization_id| id|
+-------+--------------------+--------------------+---------------+---+
|vehicle|2019-06-01 18:17:...|bac5188f-67c6-496...|         org-id|  0|
|vehicle|2019-06-01 18:17:...|3a3eb23a-f22e-4fe...|         org-id|  1|
|vehicle|2019-06-01 18:17:...|f0b87796-b25c-40b...|         org-id|  2|
|vehicle|2019-06-01 18:17:...|9152c5d8-79cf-4fe...|         org-id|  3|
|vehicle|2019-06-01 18:17:...|f06eb89c-ada0-41c...|         org-id|  4|
|vehicle|2019-06-01 18:17:...|9d6a8840-def2-42b...|         org-id|  5|
|vehicle|2019-06-01 18:17:...|3b0640d6-502d-462...|         org-id|  6|
|vehicle|2019-06-01 18:17:...|98c8b8cb-7c2b-415...|         org-id|  7|
|vehicle|2019-06-01 18:17:...|d6880741-ae7f-474...|         org-id|  8|
|vehicle|2019-06-01 18:17:...|d759fc35-b25c-487...|         org-id|  9|
|vehicle|2019-06-01 18:17:...|49c02de6-2bc6-46c...|         org-

#### Example: Write the data in the DynamicFrame to a location in Amazon S3 and a table for it in the AWS Glue Data Catalog


In [10]:
# Write each DynamicFrame to its table in the "door2door" Data Catalog database.
# transformation_ctx identifies a transform's state, so every sink gets its own
# value instead of all three sharing "datasink".
for sink_frame, sink_table in (
    (new_dyf, "fact"),
    (dyf_vehicle, "vehicle"),
    (dyf_period, "period"),
):
    glueContext.write_dynamic_frame.from_catalog(
        frame=sink_frame,
        database="door2door",
        table_name=sink_table,
        transformation_ctx=f"datasink_{sink_table}",
    )

<awsglue.dynamicframe.DynamicFrame object at 0x7fcc689a00d0>
