# AWS Glue Notebook for Serverless Data Lake Workshop
This notebook contains the PySpark scripts run in AWS Glue to transform the data in the data lake. Each section corresponds to a section in the lab.

## Initialization
The first two sections initialize the Spark environment and only need to be run once. The first block may take a few seconds as it negotiates the Spark session.

In [None]:
## @ Import the AWS Glue and PySpark libraries we'll need
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame

In [None]:
## @ set up a single GlueContext.
sc = SparkContext.getOrCreate()

glueContext = GlueContext(sc)

## Lab - Transform / Decode data with AWS Glue
The first script is a straightforward transformation of the user activity table. The request column contains a number of fields embedded within it, for example:

```
GET /petstore/Cats/Treats
```
The column is split on `/` to extract the request type, top-level domain, top page, and subpage.
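As a rough illustration of that split, here is the same idea in plain Python on the sample request shown above. The lab itself uses the PySpark `split` function. The variable names are illustrative only, and the `strip()` call is an assumption about how you may want to handle the space that follows the HTTP method.

```python
# Plain-Python sketch of splitting the request on '/'
request = "GET /petstore/Cats/Treats"

parts = request.split("/")        # the first element keeps the space after the method
request_type = parts[0].strip()   # remove that trailing space
top_domain = parts[1]
top_page = parts[2]
sub_page = parts[3]

print(request_type, top_domain, top_page, sub_page)
```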

The timestamp field is also parsed out to extract the date components: date, time, month, and year.


For example, the timestamp `17/Jan/2018:10:43:54` yields:

- Date: 01/17/2018
- Time: 10:43:54
- Year: 2018
- Month: 1
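The timestamp parsing can be checked locally with the standard library before running it in Glue. This is a minimal sketch, not the lab's code: it uses `datetime.strptime` instead of the PySpark functions, and it assumes an English locale so that `%b` matches month abbreviations such as `Jan`.

```python
from datetime import datetime

raw = "17/Jan/2018:10:43:54"

# %d/%b/%Y:%H:%M:%S mirrors the day/month-abbreviation/year:time layout of the log
parsed = datetime.strptime(raw, "%d/%b/%Y:%H:%M:%S")

date_part = parsed.strftime("%m/%d/%Y")
time_part = parsed.strftime("%H:%M:%S")
year_part = parsed.year
month_part = parsed.month

print(date_part, time_part, year_part, month_part)
```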

| ip_address | username | timestamp | request | http | bytes | requesttype | topdomain | subpage | date | time | year | month | toppage |
|------|------|------|------|------|------|------|------|------|------|------|------|------|------|
| 0.32.193.205 | grldmccfdm8 | 11/Oct/2018:23:36:54 | DELETE /petstore/Bird/Treats | 500 | 927 | DELETE | petstore | Treats | 10/11/2018 | 23:36:54 | 2018 | 10 | Bird |
| 0.32.193.205 | grldmccfdm8 | 6/Jun/2017:13:03:54 | PUT /petstore/Bird/Food | 500 | 927 | PUT | petstore | Food | 06/06/2017 | 13:03:54 | 2017 | 6 | Bird |

The data is then written out in Parquet format, partitioned on the `toppage` column.
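Partitioned output is typically laid out with one `key=value` folder per partition value (Hive-style). The sketch below only illustrates that naming convention. `partition_prefix` is a hypothetical helper, not a Glue API, and `example-bucket` is a placeholder bucket name.

```python
def partition_prefix(base_path, partition_keys, record):
    """Build the Hive-style prefix (key=value/...) a record would fall under."""
    segments = ["{}={}".format(key, record[key]) for key in partition_keys]
    return "/".join([base_path.rstrip("/")] + segments) + "/"

# Made-up record using column names from the table above
record = {"username": "grldmccfdm8", "toppage": "Bird", "subpage": "Treats"}

prefix = partition_prefix(
    "s3://example-bucket/weblogs/useractivityconverted",  # placeholder path
    ["toppage"],
    record,
)
print(prefix)
```

Queries that filter on `toppage` can then read only the matching folders rather than the whole dataset.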

*This process takes 3-5 minutes.*


In [None]:
spark = glueContext.spark_session
job = Job(glueContext)
job.init('^stackname^-exercise1')

## @ create the Glue DynamicFrame from table schema. A DynamicFrame is similar to a DataFrame, except that each record is 
## @ self-describing, so no schema is required initially.
useractivity = glueContext.create_dynamic_frame.from_catalog(database = "weblogs", table_name = "useractivity", transformation_ctx = "useractivity")

## @ ApplyMapping is one of the built in transforms that maps source columns and data types from a DynamicFrame to target columns 
## @ and data types in a returned DynamicFrame. You specify the mapping argument, which is a list of tuples that contain source column,
## @ source type, target column, and target type.
useractivityApplyMapping = ApplyMapping.apply(frame = useractivity, mappings = [("ip_address", "string", "ip_address", "string"), ("username", "string", "username", "string"), ("timestamp", "string", "timestamp", "string"), ("request", "string", "request", "string"), ("http", "long", "http", "long"), ("bytes", "long", "bytes", "long")], transformation_ctx = "applymapping1")

## @ ResolveChoice is another built in transform that you can use to specify how a column should be handled when it contains values of 
## @ multiple types. You can choose to either cast the column to a single data type, discard one or more of the types, or retain all 
## @ types in either separate columns or a structure. You can select a different resolution policy for each column or specify a global 
## @ policy that is applied to all columns.
resolvechoice2 = ResolveChoice.apply(frame = useractivityApplyMapping, choice = "make_struct", transformation_ctx = "resolvechoice2")

## @ DropNullFields transform removes null fields from a DynamicFrame. The output DynamicFrame does not contain fields of the null type
## @ in the schema.
useractivity = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

## @ We will leverage PySpark functions to manipulate our data, starting with converting glue DynamicFrame to DataFrame
dataframe0 = DynamicFrame.toDF(useractivity)

## @ Use PySpark functions to split the request column on '/'
split_column = split(dataframe0['request'], '/')

## @ the first element is the HTTP method followed by a space (e.g. 'GET '), so trim it
dataframe0 = dataframe0.withColumn('requesttype', trim(split_column.getItem(0)))

dataframe0 = dataframe0.withColumn('topdomain', split_column.getItem(1))
dataframe0 = dataframe0.withColumn('toppage', split_column.getItem(2))
dataframe0 = dataframe0.withColumn('subpage', split_column.getItem(3))

## @ split timestamp column into date, time, year and month
dataframe0 = dataframe0.withColumn('date',date_format(from_unixtime(unix_timestamp('timestamp', 'd/MMM/yyyy:HH:mm:ss')), 'MM/dd/yyyy'))
dataframe0 = dataframe0.withColumn('time',date_format(from_unixtime(unix_timestamp('timestamp', 'd/MMM/yyyy:HH:mm:ss')), 'HH:mm:ss'))

dataframe0 = dataframe0.withColumn('year', year(from_unixtime(unix_timestamp('timestamp', 'd/MMM/yyyy:HH:mm:ss'))))
dataframe0 = dataframe0.withColumn('month', month(from_unixtime(unix_timestamp('timestamp', 'd/MMM/yyyy:HH:mm:ss'))))


## @ convert dataframe to glue DynamicFrame and write the output in Parquet format partitioned on toppage column
useractivity = DynamicFrame.fromDF(dataframe0, glueContext, "name1")

writeUseractivityToS3 = glueContext.write_dynamic_frame.from_options(frame = useractivity, connection_type = "s3", connection_options = {"path": 's3://^ingestionbucket^/weblogs/useractivityconverted', "partitionKeys" :["toppage"]}, format = "parquet", transformation_ctx = "writeUseractivityToS3")

job.commit()

dataframe0.show()


## Results
Go to the S3 bucket to view the results of the transformation and continue with the lab instructions.

## Lab - Join and relationalize data with AWS Glue
In this lab we will take two datasets from different source systems and merge them into a single table that combines the useractivity and userprofile datasets.
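Conceptually, the merge is an equality join on the username. The sketch below shows the idea in plain Python with made-up rows. It assumes inner-join behavior (activity rows without a matching profile are dropped) and is not the Glue `Join` transform itself.

```python
# Made-up sample rows; the column names follow the lab's datasets
profiles = [
    {"dy_username": "alice", "firstname": "Alice", "age": "34"},
    {"dy_username": "bob", "firstname": "Bob", "age": "41"},
]
activity = [
    {"username": "alice", "request": "GET /petstore/Cats/Treats"},
    {"username": "alice", "request": "PUT /petstore/Bird/Food"},
    {"username": "carol", "request": "GET /petstore/Fish/Food"},
]

# Index profiles by username so each activity row can be matched in one lookup
profiles_by_user = {p["dy_username"]: p for p in profiles}

joined = []
for row in activity:
    profile = profiles_by_user.get(row["username"])
    if profile is None:
        continue                      # no matching profile: row is left out
    merged = dict(profile)
    merged.update(row)
    del merged["dy_username"]         # drop the duplicate join column
    joined.append(merged)

for row in joined:
    print(row)
```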

In [None]:

job = Job(glueContext)
job.init('^stackname^-exercise2')

## @ useractivity dynamicframe
useractivity = glueContext.create_dynamic_frame.from_catalog(database = "weblogs", table_name = "useractivity", transformation_ctx = "useractivity")

## @ applymappings to the dynamicframe to make sure we have the correct data types and column names
applymapping1 = ApplyMapping.apply(frame = useractivity, mappings = [("ip_address", "string", "ip_address", "string"), ("username", "string", "username", "string"), ("timestamp", "string", "timestamp", "string"), ("request", "string", "request", "string"), ("http", "long", "http", "long"), ("bytes", "long", "bytes", "long")], transformation_ctx = "applymapping1")

## @ resolve any issues with column data types
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")

## @ drop any null fields
useractivity = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

## @ create userprofile dynamicframe
userprofile = glueContext.create_dynamic_frame.from_catalog(database="weblogs", table_name="userprofile")

## @ we will only keep the fields that we want and drop the rest and rename username to dy_username
userprofile = userprofile.drop_fields(['cc', 'password', 'ssn', 'email', 'phone','ip_address'])
userprofile = userprofile.rename_field('username','dy_username')

## @ the data types differ between the two datasets, so we convert all columns to string
## @ The Glue built-in transform ApplyMapping maps source columns and data types from a DynamicFrame to target columns and data types 
## @ in a returned DynamicFrame. You specify the mapping argument, which is a list of tuples that contain source column, source type, 
## @ target column, and target type. In the case below we convert the data types for zip and age to string and update the column
## @ names for first_name and last_name
userprofile = ApplyMapping.apply(frame = userprofile, 
mappings = [("first_name", "string", "firstname", "string"), 
("dy_username", "string", "dy_username", "string"), 
("zip", "bigint", "zip", "string"), 
("age", "bigint", "age", "string"), 
("gender", "string", "gender", "string"),
("last_name", "string", "lastname", "string")
], transformation_ctx = "userprofile")

## @ join the useractivity and userprofile datasets into one dataset and drop the duplicate column dy_username
joined = Join.apply(userprofile, useractivity, 'dy_username', 'username').drop_fields(['dy_username'])

glueContext.write_dynamic_frame.from_options(frame = joined,
          connection_type = "s3",
          connection_options = {"path": 's3://^ingestionbucket^/weblogs/joindatasets'},
          format = "parquet")

job.commit()

print('Job Complete')



In [None]:
df = DynamicFrame.toDF(joined)

df.show()

## Results
Continue with the lab to explore the resulting table in the data lake.

## Create a UDF to simplify applying a hash function to columns
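Before wrapping a hash in a Spark UDF, its behavior can be checked with the standard library alone. This is a minimal sketch: `sha256_hex` is an illustrative name and the input value is made up. It shows that SHA-256 returns a fixed-length hex digest and that the same input always gives the same digest.

```python
import hashlib

def sha256_hex(value):
    # hashlib works on bytes, so encode the string form of the value first
    return hashlib.sha256(str(value).encode("utf-8")).hexdigest()

digest = sha256_hex("1234-5678-9012")   # made-up sample value
print(len(digest))                       # length of the hex digest
print(digest == sha256_hex("1234-5678-9012"))
```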

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import hashlib
from dateutil.parser import parse

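def hash_cc(s):
    # Pass missing values through; hashlib needs bytes, so encode the string form first
    if s is None:
        return None
    return hashlib.sha256(str(s).encode('utf-8')).hexdigest()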

In [None]:
job = Job(glueContext)
job.init('^stackname^-exercise3')

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "weblogs", table_name = "userprofile", transformation_ctx = "datasource0")


## @convert glue DynamicFrame to DataFrame to manipulate the columns
dataframe0 = DynamicFrame.toDF(datasource0)

hash_cc_f = udf(lambda x: hash_cc(x), StringType())

dataframe0 = dataframe0.withColumn("hash_cc", hash_cc_f(dataframe0["cc"])).withColumn("hash_ssn", hash_cc_f(dataframe0["ssn"]))
dataframe0 = dataframe0.drop('cc').drop('ssn').drop('password')

## @convert dataframe to glue DynamicFrame and write the output in Parquet format
datasource1 = DynamicFrame.fromDF(dataframe0, glueContext, "name1")


datasink4 = glueContext.write_dynamic_frame.from_options(frame = datasource1, connection_type = "s3", connection_options = {"path": 's3://^ingestionbucket^/weblogs/userprofile-secure'}, format = "parquet", transformation_ctx = "datasink4")

job.commit()

print('Job Complete')

In [None]:
dataframe0.show()

## Exercise 4 - Lookup

You are on your own now! Using your PySpark skills, create a simple UDF that performs a lookup on the data.

In this case, we're going to create a lookup for the geocode of the IP address. Instead of calling out to a geocoding service, we'll just return US if the first octet is less than 100 and UK if it is 100 or greater.

Using this function, create a copy of the useractivity table that includes the country.
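It can help to check the lookup rule on plain Python values before turning it into a UDF. The sketch below is not the exercise solution. `country_for_ip` is a hypothetical stand-in for the rule described above, and the rows are made up.

```python
def country_for_ip(ip_address):
    # Rule from the exercise: first octet below 100 is "US", otherwise "UK"
    first_octet = int(ip_address.split(".")[0])
    return "US" if first_octet < 100 else "UK"

# Made-up rows, including both sides of the 100 boundary
rows = [
    {"ip_address": "0.32.193.205"},
    {"ip_address": "99.1.1.1"},
    {"ip_address": "100.2.3.4"},
]

# Copy each row and add the looked-up country
with_country = [dict(row, country=country_for_ip(row["ip_address"])) for row in rows]

for row in with_country:
    print(row)
```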

In [None]:
def geocode(ip):
    if int(ip.split('.')[0]) < 100:
        return "US"
    else:
        return "UK"
    

In [None]:
job = Job(glueContext)
job.init('Job4')

# Write Transformation Code here

job.commit()