# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
# Session configuration magics for the Glue Interactive Sessions kernel.
# Idle timeout, expressed in minutes (2880 min = 48 h).
%idle_timeout 2880
# AWS Glue version the session is created with.
%glue_version 3.0
# Worker type used by the session.
%worker_type G.1X
# Number of workers allocated to the session.
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.sql import functions 
from pyspark.sql.functions import when
from awsglue.dynamicframe import DynamicFrame
  
# Reuse the already-running SparkContext if there is one, otherwise create it.
sc = SparkContext.getOrCreate()
# Glue wrapper around the SparkContext; the cells below use it for the S3 reads
# and for the catalog-aware S3 sink.
glueContext = GlueContext(sc)
# Spark session exposed by the GlueContext.
spark = glueContext.spark_session
# Job handle bound to this GlueContext.
# NOTE(review): `job` is not referenced again in the visible cells (no
# job.init()/job.commit()) — confirm whether it is still needed.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.3 
Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 5
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::275865128547:role/s3-glue-role
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 3201b870-087c-462d-bd21-8bffc3c22322
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.3
--enable-glue-datacatalog true
Waiting for session 3201b870-087c-462d

## Extract Data

In [2]:
# Root S3 prefix holding one sub-folder of raw CSV files per source table.
RAW_DATA_ROOT = "s3://car-sharing-bucket/data"


def read_raw_csv(table_name):
    """Read one raw source table from S3 and return it as a Spark DataFrame.

    Args:
        table_name: Name of the sub-folder under ``RAW_DATA_ROOT`` that holds
            the table's CSV files (e.g. ``"distance"``).

    Returns:
        The Spark DataFrame produced by ``toDF()`` on the Glue DynamicFrame
        that was read with ``withHeader`` and ``optimizePerformance`` enabled.
    """
    return glueContext.create_dynamic_frame_from_options(
        connection_type='s3',
        connection_options={'paths': [f"{RAW_DATA_ROOT}/{table_name}/"]},
        format='csv',
        format_options={"withHeader": True, "optimizePerformance": True},
    ).toDF()


# One DataFrame per source table; the variable names are what the transform
# cells below rely on.
distance_table = read_raw_csv("distance")
group_table = read_raw_csv("group")
promotion_table = read_raw_csv("promotion")
reservation_table = read_raw_csv("reservation")
station_table = read_raw_csv("station")
user_table = read_raw_csv("user")
vehicle_table = read_raw_csv("vehicle")
vehiclesize_table = read_raw_csv("vehiclesize")




## Transform Data

### group

In [3]:
# The group dimension is the raw group extract, taken over without dropping or
# renaming any column.
dim_group = group_table
dim_group.printSchema()

root
 |-- groupid: string (nullable = true)
 |-- groupcode: string (nullable = true)
 |-- name: string (nullable = true)


In [4]:
# The key is read from CSV as a string; store it as an integer instead.
dim_group = dim_group.withColumn(
    "groupid",
    functions.col("groupid").cast("int"),
)
dim_group.printSchema()

root
 |-- groupid: integer (nullable = true)
 |-- groupcode: string (nullable = true)
 |-- name: string (nullable = true)


### promotion

In [5]:
# Drop the two promotion-link columns, then discard rows that have no
# promotion code.
dim_promotion = (
    promotion_table
    .drop('master_promotionid', 'require_promotionid')
    .dropna(how='any', subset=['promotioncode'])
)
dim_promotion.printSchema()

root
 |-- promotionid: string (nullable = true)
 |-- promotiontype: string (nullable = true)
 |-- promotioncode: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- effectedtime: string (nullable = true)
 |-- expiredtime: string (nullable = true)


In [6]:
# Convert the string columns to their target types: integer key and two
# timestamps parsed with the 'yyyy-MM-dd HH:mm:ss' pattern.
dim_promotion = (
    dim_promotion
    .withColumn("promotionid", functions.col("promotionid").cast("int"))
    .withColumn(
        "effectedtime",
        functions.to_timestamp(functions.col("effectedtime"), 'yyyy-MM-dd HH:mm:ss'),
    )
    .withColumn(
        "expiredtime",
        functions.to_timestamp(functions.col("expiredtime"), 'yyyy-MM-dd HH:mm:ss'),
    )
)
dim_promotion.printSchema()

root
 |-- promotionid: integer (nullable = true)
 |-- promotiontype: string (nullable = true)
 |-- promotioncode: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- effectedtime: timestamp (nullable = true)
 |-- expiredtime: timestamp (nullable = true)


### station

In [7]:
# Drop the columns that are not carried into the station dimension.
dim_station = station_table.drop('zone','availableparking','district_th','province_th','address')
# The header of this column arrives as 'province_en\r' (stray carriage return);
# give it a clean name.
dim_station = dim_station.withColumnRenamed('province_en\r','province_en')
# A '\r' in the header means the file uses CRLF line endings, so the values of
# this column presumably end in '\r' as well. Strip it so that later
# comparisons and group-bys on province_en match; this is a no-op when no
# trailing carriage return is present.
dim_station = dim_station.withColumn(
    'province_en',
    functions.regexp_replace('province_en', r'\r$', ''),
)
dim_station.printSchema()

root
 |-- stationid: string (nullable = true)
 |-- stationcode: string (nullable = true)
 |-- stationstatus: string (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- district_en: string (nullable = true)
 |-- province_en: string (nullable = true)


In [8]:
# The key is read from CSV as a string; store it as an integer instead.
dim_station = dim_station.withColumn(
    "stationid",
    functions.col("stationid").cast("int"),
)
dim_station.printSchema()

root
 |-- stationid: integer (nullable = true)
 |-- stationcode: string (nullable = true)
 |-- stationstatus: string (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- district_en: string (nullable = true)
 |-- province_en: string (nullable = true)


### user

In [9]:
# Drop the column whose header is 'last_login\r' (the name carries a stray
# carriage return); every other user column is kept as-is.
dim_user = user_table.drop('last_login\r')
dim_user.printSchema()

root
 |-- userid: string (nullable = true)
 |-- activated: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- foreigner: string (nullable = true)
 |-- birthdate: string (nullable = true)
 |-- registedtime: string (nullable = true)


In [10]:
# The key is read from CSV as a string; store it as an integer instead.
dim_user = dim_user.withColumn(
    "userid",
    functions.col("userid").cast("int"),
)
dim_user.printSchema()

root
 |-- userid: integer (nullable = true)
 |-- activated: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- foreigner: string (nullable = true)
 |-- birthdate: string (nullable = true)
 |-- registedtime: string (nullable = true)


### vehicle

In [13]:
# Descriptive attributes from the vehicle-size extract. The key is exposed as
# '_vehicleid' so it does not clash with vehicle_table's own 'vehicleid'.
vehiclesize = vehiclesize_table.select(
    functions.col('vehicleid').alias('_vehicleid'),
    'brand',
    'model',
    'size',
    'type',
)
# Left join: every vehicle row is kept, with size attributes attached where a
# matching key exists; the helper key is dropped afterwards.
dim_vehicle = (
    vehicle_table
    .join(vehiclesize, vehicle_table['vehicleid'] == vehiclesize['_vehicleid'], 'left')
    .drop('_vehicleid')
)
dim_vehicle.printSchema()

root
 |-- vehicleid: string (nullable = true)
 |-- vehiclecode: string (nullable = true)
 |-- vehicletype: string (nullable = true)
 |-- vehiclesystem: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- model: string (nullable = true)
 |-- size: string (nullable = true)
 |-- type: string (nullable = true)


In [17]:
# The key is read from CSV as a string; store it as an integer instead.
dim_vehicle = dim_vehicle.withColumn(
    "vehicleid",
    functions.col("vehicleid").cast("int"),
)
dim_vehicle.printSchema()

root
 |-- vehicleid: integer (nullable = true)
 |-- vehiclecode: string (nullable = true)
 |-- vehicletype: string (nullable = true)
 |-- vehiclesystem: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- model: string (nullable = true)
 |-- size: string (nullable = true)
 |-- type: string (nullable = true)


### reservation

In [18]:
# Distance extract: the key is renamed to '_reservationno' so it does not clash
# with the reservation key in the join, and the 'distance\r' header (stray
# carriage return) gets a clean name.
distance = (
    distance_table
    .withColumnRenamed('reservationno', '_reservationno')
    .withColumnRenamed('distance\r', 'distance')
)

# Columns of the reservation extract that make up the fact table.
reservation_columns = [
    'reservationno', 'reservationstate', 'userid', 'groupid', 'vehicleid',
    'stationid', 'promotionid', 'reservestarttime', 'reservestoptime',
    'reservehours', 'discount', 'totalprice', 'chargetotal', 'category',
]

# Keep only reservations whose group id and station id are both non-zero.
reservations_valid = (
    reservation_table
    .select(*reservation_columns)
    .where((functions.col('groupid') != 0) & (functions.col('stationid') != 0))
)

# Left join: every remaining reservation is kept, with its distance attached
# where one exists; the helper key is dropped afterwards.
fac_reservation = (
    reservations_valid
    .join(distance, reservations_valid['reservationno'] == distance['_reservationno'], 'left')
    .drop('_reservationno')
)
fac_reservation.printSchema()

root
 |-- reservationno: string (nullable = true)
 |-- reservationstate: string (nullable = true)
 |-- userid: string (nullable = true)
 |-- groupid: string (nullable = true)
 |-- vehicleid: string (nullable = true)
 |-- stationid: string (nullable = true)
 |-- promotionid: string (nullable = true)
 |-- reservestarttime: string (nullable = true)
 |-- reservestoptime: string (nullable = true)
 |-- reservehours: string (nullable = true)
 |-- discount: string (nullable = true)
 |-- totalprice: string (nullable = true)
 |-- chargetotal: string (nullable = true)
 |-- category: string (nullable = true)
 |-- distance: string (nullable = true)


In [19]:
fac_reservation = fac_reservation.withColumn("reservationno",fac_reservation['reservationno'].cast("int"))
fac_reservation = fac_reservation.withColumn("userid",fac_reservation['userid'].cast("int"))
fac_reservation = fac_reservation.withColumn("groupid",fac_reservation['groupid'].cast("int"))
fac_reservation = fac_reservation.withColumn("vehicleid",fac_reservation['vehicleid'].cast("int"))
fac_reservation = fac_reservation.withColumn("stationid",fac_reservation['stationid'].cast("int"))
fac_reservation = fac_reservation.withColumn("promotionid",fac_reservation['promotionid'].cast("int"))
fac_reservation = fac_reservation.withColumn("reservestarttime",functions.to_timestamp(fac_reservation['reservestarttime'], 'yyyy-MM-dd HH:mm:ss'))
fac_reservation = fac_reservation.withColumn("reservestoptime",functions.to_timestamp(fac_reservation['reservestoptime'], 'yyyy-MM-dd HH:mm:ss'))
fac_reservation = fac_reservation.withColumn("reservehours",fac_reservation['reservehours'].cast("int"))
fac_reservation = fac_reservation.withColumn("totalprice",fac_reservation['totalprice'].cast("double"))
fac_reservation = fac_reservation.withColumn("discount",fac_reservation['discount'].cast("double"))
fac_reservation = fac_reservation.withColumn("chargetotal",fac_reservation['chargetotal'].cast("double"))
fac_reservation = fac_reservation.withColumn("distance",fac_reservation['distance'].cast("double"))
fac_reservation.printSchema()

root
 |-- reservationno: integer (nullable = true)
 |-- reservationstate: string (nullable = true)
 |-- userid: integer (nullable = true)
 |-- groupid: integer (nullable = true)
 |-- vehicleid: integer (nullable = true)
 |-- stationid: integer (nullable = true)
 |-- promotionid: integer (nullable = true)
 |-- reservestarttime: timestamp (nullable = true)
 |-- reservestoptime: timestamp (nullable = true)
 |-- reservehours: integer (nullable = true)
 |-- discount: double (nullable = true)
 |-- totalprice: double (nullable = true)
 |-- chargetotal: double (nullable = true)
 |-- category: string (nullable = true)
 |-- distance: double (nullable = true)


## Load Data

### Load to Amazon S3 and create tables in the Glue Data Catalog

In [21]:
def load_data_s3(df,path,database_name,table_name,df_name):
    """Write a Spark DataFrame to S3 and register it in the Glue Data Catalog.

    The frame is written through a Glue S3 sink in the "glueparquet" format
    with snappy compression and no partition keys. Catalog updates are enabled,
    so the sink maintains the table ``database_name.table_name``.

    Args:
        df: Spark DataFrame to persist.
        path: Destination S3 URI for the data files.
        database_name: Glue Data Catalog database that owns the table.
        table_name: Catalog table name to create or update.
        df_name: Name given to the DynamicFrame built from ``df``.

    Returns:
        None.
    """
    # NOTE(review): nothing here clears `path` first, so re-running the cell
    # presumably adds files next to the existing ones — confirm before re-runs.
    s3output = glueContext.getSink(
      path=path,
      connection_type="s3",
      updateBehavior="UPDATE_IN_DATABASE",
      partitionKeys=[],
      compression="snappy",
      enableUpdateCatalog=True,
      # One context identifier per target table instead of a single shared
      # "s3output" for every sink created by this function.
      transformation_ctx=f"s3output_{database_name}_{table_name}",
    )
    s3output.setCatalogInfo(catalogDatabase=database_name, catalogTableName=table_name)
    s3output.setFormat("glueparquet")
    s3output.writeFrame(DynamicFrame.fromDF(df, glueContext, df_name))




In [22]:
# Destination prefix; each table is written to its own sub-folder.
# NOTE(review): this bucket differs from the 'car-sharing-bucket' the raw data
# is read from — confirm that is intentional.
TRANSFORMED_DATA_ROOT = 's3://car-sharing-bucket-haup/transformed_data'
# Glue Data Catalog database that receives every table, including the fact table.
CATALOG_DATABASE = "dim_carsharing"

# Catalog table name -> DataFrame to persist. Insertion order is the load order.
tables_to_load = {
    "dim_group": dim_group,
    "dim_promotion": dim_promotion,
    "dim_station": dim_station,
    "dim_user": dim_user,
    "dim_vehicle": dim_vehicle,
    "fac_reservation": fac_reservation,
}

# The table name doubles as S3 sub-folder, catalog table name and
# DynamicFrame name.
for table_name, table_df in tables_to_load.items():
    load_data_s3(
        table_df,
        f"{TRANSFORMED_DATA_ROOT}/{table_name}",
        CATALOG_DATABASE,
        table_name,
        table_name,
    )


