# Importing GlueContext
reference: https://aws.amazon.com/blogs/big-data/building-an-aws-glue-etl-pipeline-locally-without-an-aws-account/

To get started, enter the following import statements in the PySpark shell. We import GlueContext, which wraps the Spark SQLContext, thereby providing mechanisms to interact with Apache Spark:

In [None]:
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.types import *
from pyspark.sql import Row
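glueContext = GlueContext(SparkContext.getOrCreate())
# The PySpark shell predefines `spark`; define it explicitly so the later cells also run elsewhere
spark = glueContext.spark_session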

## Dataset 1
We first generate a Spark DataFrame consisting of dummy data of an order list for a fictional company. We process the data using AWS Glue PySpark functions.

Enter the following code into the shell:

In [None]:
order_list = [
               ['1005', '623', 'YES', '1418901234', '75091'],
               ['1006', '547', 'NO', '1418901256', '75034'],
               ['1007', '823', 'YES', '1418901300', '75023'],
               ['1008', '912', 'NO', '1418901400', '82091'],
               ['1009', '321', 'YES', '1418902000', '90093']
             ]

# Define schema for the order_list
order_schema = StructType([  
                      StructField("order_id", StringType()),
                      StructField("customer_id", StringType()),
                      StructField("essential_item", StringType()),
                      StructField("timestamp", StringType()),
                      StructField("zipcode", StringType())
                    ])

# Create a Spark Dataframe from the python list and the schema
df_orders = spark.createDataFrame(order_list, schema = order_schema)

In [None]:
df_orders.show()

## DynamicFrame
A DynamicFrame is similar to a DataFrame, except that each record is self-describing, so no schema is required initially. Instead, AWS Glue computes a schema on-the-fly when required. We convert the df_orders DataFrame into a DynamicFrame.

Enter the following code in the shell:

In [None]:
dyf_orders = DynamicFrame.fromDF(df_orders, glueContext, "dyf")

Now that we have our DynamicFrame, we can start working with the dataset using AWS Glue transform functions.

## ApplyMapping
The columns in our data might be in different formats, and you may want to change their names. ApplyMapping is a convenient option for renaming and retyping all the columns in one step. For our dataset, we change some of the columns from String to Long to save storage space later. We also shorten the column name zipcode to zip. See the following code:

In [None]:
# Input 
dyf_applyMapping = ApplyMapping.apply( frame = dyf_orders, mappings = [ 
  ("order_id","String","order_id","Long"), 
  ("customer_id","String","customer_id","Long"),
  ("essential_item","String","essential_item","String"),
  ("timestamp","String","timestamp","Long"),
  ("zipcode","String","zip","Long")
])

dyf_applyMapping.printSchema()
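
To make the mapping tuples concrete, here is a minimal plain-Python sketch of the rename-and-cast idea. It does not use the AWS Glue API: dictionaries stand in for records, and `apply_mapping` and `CASTS` are hypothetical names introduced only for this illustration.

```python
# Plain-Python illustration only; this is not the AWS Glue API.
# Each mapping is (source_name, source_type, target_name, target_type).
CASTS = {"Long": int, "String": str}  # hypothetical type lookup for this sketch


def apply_mapping(records, mappings):
    """Rename and cast the mapped fields of every record."""
    out = []
    for rec in records:
        new_rec = {}
        for src, _src_type, dst, dst_type in mappings:
            new_rec[dst] = CASTS[dst_type](rec[src])
        out.append(new_rec)
    return out


orders = [
    {"order_id": "1005", "essential_item": "YES", "zipcode": "75091"},
    {"order_id": "1006", "essential_item": "NO", "zipcode": "75034"},
]
mappings = [
    ("order_id", "String", "order_id", "Long"),
    ("essential_item", "String", "essential_item", "String"),
    ("zipcode", "String", "zip", "Long"),
]
mapped_orders = apply_mapping(orders, mappings)
print(mapped_orders[0])
```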


## Filter
We now want to prioritize our order delivery for essential items. We can achieve that using the Filter function:

In [None]:
# Input 
dyf_filter = Filter.apply(frame = dyf_applyMapping, f = lambda x: x["essential_item"] == 'YES')

dyf_filter.toDF().show()

## Map
Map allows us to apply a transformation to each record of a DynamicFrame. For our case, we want to target a certain zip code for next-day air shipping. We implement a simple “next_day_air” function and pass it to the DynamicFrame:

In [None]:
# Input 

# This function takes a DynamicFrame record and checks whether its zip code
# is 75034. If so, it adds another column, "next_day_air", with the value True.

def next_day_air(rec):
  if rec["zip"] == 75034:
    rec["next_day_air"] = True
  return rec

mapped_dyF =  Map.apply(frame = dyf_applyMapping, f = next_day_air)

mapped_dyF.toDF().show()
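
Note that the function adds the new field only to matching records. The plain-Python sketch below, which uses dictionaries in place of DynamicFrame records and is not the AWS Glue API, shows the consequence: records with other zip codes simply have no `next_day_air` value.

```python
# Plain-Python illustration only; a dict stands in for a DynamicFrame record.
def next_day_air(rec):
    if rec["zip"] == 75034:
        rec["next_day_air"] = True
    return rec


records = [
    {"order_id": 1005, "zip": 75091},
    {"order_id": 1006, "zip": 75034},
]
# Copy each record so the input list is left unchanged
mapped = [next_day_air(dict(rec)) for rec in records]

for rec in mapped:
    # .get() returns None where the field was never added
    print(rec["order_id"], rec.get("next_day_air"))
```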

## Dataset 2
To ship essential orders to the appropriate addresses, we need customer data. We demonstrate this by generating a custom JSON dataset consisting of zip codes and customer addresses. In this use case, this data represents the customer data of the company that we want to join later on.

We generate JSON strings consisting of customer data and use the Spark json function to convert them to a JSON structure (enter each jsonStr variable one at a time in case the terminal errors out):

In [None]:
# Input 
jsonStr1 = u'{ "zip": 75091, "customers": [{ "id": 623, "address": "108 Park Street, TX"}, { "id": 231, "address": "763 Marsh Ln, TX" }]}'
jsonStr2 = u'{ "zip": 82091, "customers": [{ "id": 201, "address": "771 Peek Pkwy, GA" }]}'
jsonStr3 = u'{ "zip": 75023, "customers": [{ "id": 343, "address": "66 P Street, NY" }]}'
jsonStr4 = u'{ "zip": 90093, "customers": [{ "id": 932, "address": "708 Fed Ln, CA"}, { "id": 102, "address": "807 Deccan Dr, CA" }]}'
df_row = spark.createDataFrame([
  Row(json=jsonStr1),
  Row(json=jsonStr2),
  Row(json=jsonStr3),
  Row(json=jsonStr4)
])

df_json = spark.read.json(df_row.rdd.map(lambda r: r.json))
df_json.show()

In [None]:
# Input
df_json.printSchema()

To convert the DataFrame back to a DynamicFrame to continue with our operations, enter the following code:

In [None]:
# Input
dyf_json = DynamicFrame.fromDF(df_json, glueContext, "dyf_json")

## SelectFields
To join with the order list, we don’t need all the columns, so we use the SelectFields function to shortlist the columns we need. In our use case, we need only the zip code column, but we can select more columns because the paths argument accepts a list:

In [None]:
# Input
dyf_selectFields = SelectFields.apply(frame = dyf_filter, paths=['zip'])

dyf_selectFields.toDF().show()

## Join
The Join function is straightforward and manages duplicate columns. Both datasets had a column named zip, so AWS Glue added a period (.) to one of the duplicate column names to avoid errors:

In [None]:
# Input
dyf_join = Join.apply(dyf_json, dyf_selectFields, 'zip', 'zip')
dyf_join.toDF().show()
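
As a rough mental model, the join keeps only the pairs of records whose key values are equal. The sketch below is plain Python over dictionaries, not the AWS Glue API. The `join` helper is hypothetical, and which side receives the period prefix is an assumption made for illustration.

```python
# Plain-Python illustration only; this is not the AWS Glue API.
def join(frame1, frame2, key1, key2):
    """Equality join of two lists of dict records."""
    index = {}
    for rec in frame2:
        index.setdefault(rec[key2], []).append(rec)
    joined = []
    for rec1 in frame1:
        for rec2 in index.get(rec1[key1], []):
            row = dict(rec1)
            for name, value in rec2.items():
                # Assumption: a clashing name from frame2 gets a "." prefix
                row["." + name if name in row else name] = value
            joined.append(row)
    return joined


customers = [
    {"zip": 75091, "customers": [{"id": 623, "address": "108 Park Street, TX"}]},
    {"zip": 82091, "customers": [{"id": 201, "address": "771 Peek Pkwy, GA"}]},
]
essential_zips = [{"zip": 75091}, {"zip": 75023}]

joined = join(customers, essential_zips, "zip", "zip")
print(joined)
```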

## DropFields
Because we don’t need two columns with the same name, we can use DropFields to drop one or multiple columns all at once. The backticks (`) around .zip inside the function call are needed because the column name contains a period (.):

In [None]:
# Input
dyf_dropfields = DropFields.apply(
  frame = dyf_join,
  paths = "`.zip`"
)

dyf_dropfields.toDF().show()

## Relationalize
The Relationalize function can flatten nested structures and create multiple DynamicFrames. The customers column from the previous operation is a nested structure, and Relationalize can convert it into multiple flattened DynamicFrames:

In [None]:
# Input
dyf_relationize = dyf_dropfields.relationalize("root", "/home/glue/GlueLocalOutput")

To see the DynamicFrames, we can’t run a .show() yet because it’s a collection. We need to check what keys are present. See the following code:

In [None]:
# Input
dyf_relationize.keys()
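
The idea behind the resulting collection can be sketched in plain Python: the nested array is moved into a child table, and the root table keeps a key that points at the child rows. This is not the AWS Glue API. The `relationalize` helper below is hypothetical, the column names follow those used later in this walkthrough (`id`, `index`, `customers.val.*`), and the key numbering is an assumption.

```python
# Plain-Python illustration only; this is not the AWS Glue API.
def relationalize(records, root_name, array_field):
    """Split records with one nested array field into a root and a child table."""
    root, child = [], []
    for row_id, rec in enumerate(records, start=1):
        flat = {k: v for k, v in rec.items() if k != array_field}
        flat[array_field] = row_id  # the array is replaced by a join key
        root.append(flat)
        for index, item in enumerate(rec[array_field]):
            child_row = {"id": row_id, "index": index}
            for k, v in item.items():
                child_row[f"{array_field}.val.{k}"] = v
            child.append(child_row)
    return {root_name: root, f"{root_name}_{array_field}": child}


joined = [
    {"zip": 75091, "customers": [
        {"id": 623, "address": "108 Park Street, TX"},
        {"id": 231, "address": "763 Marsh Ln, TX"},
    ]},
    {"zip": 75023, "customers": [{"id": 343, "address": "66 P Street, NY"}]},
]
collection = relationalize(joined, "root", "customers")
print(sorted(collection.keys()))
print(collection["root_customers"][0])
```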

In the follow-up function in the next section, we show how to pick the DynamicFrame from a collection of multiple DynamicFrames.

## SelectFromCollection
The SelectFromCollection function allows us to retrieve the specific DynamicFrame from a collection of DynamicFrames. For this use case, we retrieve both DynamicFrames from the previous operation using this function.

To retrieve the first DynamicFrame, enter the following code:

In [None]:
# Input
dyf_selectFromCollection = SelectFromCollection.apply(dyf_relationize, 'root')

dyf_selectFromCollection.toDF().show()

To retrieve the second DynamicFrame, enter the following code:

In [None]:
# Input
dyf_selectFromCollection = SelectFromCollection.apply(dyf_relationize, 'root_customers')

dyf_selectFromCollection.toDF().show()

## RenameField
The second DynamicFrame we retrieved in the previous operation has long column names that contain periods (.). We can rename them using the RenameField function:

In [None]:
# Input
dyf_renameField_1 = RenameField.apply(dyf_selectFromCollection, "`customers.val.address`", "address")

dyf_renameField_2 = RenameField.apply(dyf_renameField_1, "`customers.val.id`", "cust_id")

dyf_dropfields_rf = DropFields.apply(
  frame = dyf_renameField_2,
  paths = ["index", "id"]
)

dyf_dropfields_rf.toDF().show()

## ResolveChoice
ResolveChoice can gracefully handle column type ambiguities. For more information about the full capabilities of ResolveChoice, see the GitHub repo.

In [None]:
# Input
dyf_resolveChoice = dyf_dropfields_rf.resolveChoice(specs = [('cust_id','cast:String')])

dyf_resolveChoice.printSchema()

## Dataset 3
We generate another dataset to demonstrate a few other functions. In this use case, the company’s warehouse inventory data is in a nested JSON structure, which is initially in a String format. See the following code:

In [None]:
# Input
warehouse_inventory_list = [
              ['TX_WAREHOUSE', '{\
                          "strawberry":"220",\
                          "pineapple":"560",\
                          "mango":"350",\
                          "pears":null}'
               ],\
              ['CA_WAREHOUSE', '{\
                         "strawberry":"34",\
                         "pineapple":"123",\
                         "mango":"42",\
                         "pears":null}\
              '],
              ['CO_WAREHOUSE', '{\
                         "strawberry":"340",\
                         "pineapple":"180",\
                         "mango":"2",\
                         "pears":null}'
              ]
            ]


warehouse_schema = StructType([StructField("warehouse_loc", StringType())\
                              ,StructField("data", StringType())])

df_warehouse = spark.createDataFrame(warehouse_inventory_list, schema = warehouse_schema)
dyf_warehouse = DynamicFrame.fromDF(df_warehouse, glueContext, "dyf_warehouse")

dyf_warehouse.printSchema()

## Unbox
We use Unbox to extract JSON from String format for the new data. Compare the preceding printSchema() output with the output of the following code:

In [None]:
# Input
dyf_unbox = Unbox.apply(frame = dyf_warehouse, path = "data", format="json")
dyf_unbox.printSchema()

# Input 
dyf_unbox.toDF().show()
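
Conceptually, unboxing parses the string field and replaces it with the structure it encodes. The following plain-Python sketch uses the standard `json` module on a dictionary record. It is not the AWS Glue API, and `unbox` is a hypothetical helper name.

```python
# Plain-Python illustration only; this is not the AWS Glue API.
import json


def unbox(records, path):
    """Replace the JSON string stored under `path` with its parsed value."""
    return [{**rec, path: json.loads(rec[path])} for rec in records]


warehouse = [
    {
        "warehouse_loc": "TX_WAREHOUSE",
        "data": '{"strawberry":"220","pineapple":"560","mango":"350","pears":null}',
    }
]
unboxed = unbox(warehouse, "data")
print(type(warehouse[0]["data"]).__name__, "->", type(unboxed[0]["data"]).__name__)
print(unboxed[0]["data"]["mango"])
```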

## Unnest
Unnest allows us to flatten a single DynamicFrame to a more relational table format. We apply Unnest to the nested structure from the previous operation and flatten it:

In [None]:
# Input
dyf_unnest = UnnestFrame.apply(frame = dyf_unbox)

dyf_unnest.printSchema()

dyf_unnest.toDF().show()
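
The flattening can be pictured as joining nested field names with periods, which is where column names such as `data.strawberry` come from. Here is a minimal plain-Python sketch of that idea. It is not the AWS Glue API, and `unnest` is a hypothetical helper.

```python
# Plain-Python illustration only; this is not the AWS Glue API.
def unnest(rec, prefix=""):
    """Flatten nested dicts into one level, joining names with periods."""
    flat = {}
    for key, value in rec.items():
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(unnest(value, name))
        else:
            flat[name] = value
    return flat


nested = {
    "warehouse_loc": "TX_WAREHOUSE",
    "data": {"strawberry": "220", "pineapple": "560", "mango": "350", "pears": None},
}
flat = unnest(nested)
print(sorted(flat.keys()))
```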

## DropNullFields
The DropNullFields function makes it easy to drop columns in which every value is null. Our warehouse data shows that every warehouse is out of pears, so that column can be dropped. We apply the DropNullFields function to the DynamicFrame, which automatically identifies the all-null columns and drops them:

In [None]:
# Input
dyf_dropNullfields = DropNullFields.apply(frame = dyf_unnest)

dyf_dropNullfields.toDF().show()
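
The rule "drop a column only if it is null in every record" can be sketched in plain Python as follows. This is an illustration over dictionaries, not the AWS Glue API, and `drop_null_fields` is a hypothetical helper.

```python
# Plain-Python illustration only; this is not the AWS Glue API.
def drop_null_fields(records):
    """Drop every field whose value is None in all records."""
    keep = {
        name
        for rec in records
        for name, value in rec.items()
        if value is not None
    }
    return [{k: v for k, v in rec.items() if k in keep} for rec in records]


inventory = [
    {"warehouse_loc": "TX_WAREHOUSE", "data.mango": "350", "data.pears": None},
    {"warehouse_loc": "CA_WAREHOUSE", "data.mango": "42", "data.pears": None},
]
cleaned = drop_null_fields(inventory)
print(cleaned[0])
```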

## SplitFields
SplitFields allows us to split a DynamicFrame into two. The function takes the field names that go into the first DynamicFrame, followed by the names of the two resulting DynamicFrames; the remaining fields go into the second:

In [None]:
# Input
dyf_splitFields = SplitFields.apply(frame = dyf_dropNullfields, paths = ["`data.strawberry`", "`data.pineapple`"], name1 = "a", name2 = "b")

For the first DynamicFrame, see the following code:

In [None]:
# Input
dyf_retrieve_a = SelectFromCollection.apply(dyf_splitFields, "a")
dyf_retrieve_a.toDF().show()

For the second DynamicFrame, see the following code:

In [None]:
# Input
dyf_retrieve_b = SelectFromCollection.apply(dyf_splitFields, "b")
dyf_retrieve_b.toDF().show()
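
The column split can be sketched in plain Python: the listed fields go to the first result, and every other field goes to the second. This is not the AWS Glue API, and `split_fields` is a hypothetical helper operating on dictionaries.

```python
# Plain-Python illustration only; this is not the AWS Glue API.
def split_fields(records, paths, name1, name2):
    """Split each record's fields into two groups, keyed by name1 and name2."""
    first = [{k: v for k, v in rec.items() if k in paths} for rec in records]
    second = [{k: v for k, v in rec.items() if k not in paths} for rec in records]
    return {name1: first, name2: second}


inventory = [
    {
        "warehouse_loc": "TX_WAREHOUSE",
        "data.strawberry": "220",
        "data.pineapple": "560",
        "data.mango": "350",
    }
]
parts = split_fields(inventory, ["data.strawberry", "data.pineapple"], "a", "b")
print(parts["a"][0])
print(parts["b"][0])
```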

## SplitRows
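SplitRows splits a dataset into two DynamicFrames: rows whose values satisfy the given comparisons go into the first, and the remaining rows go into the second. Here we select the rows with a pineapple count between 100 and 200: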

In [None]:
# Input
dyf_splitRows = SplitRows.apply(frame = dyf_dropNullfields, comparison_dict = {"`data.pineapple`": {">": "100", "<": "200"}}, name1 = 'pa_200_less', name2 = 'pa_200_more')

For the first DynamicFrame, see the following code:

In [None]:
# Input
dyf_pa_200_less = SelectFromCollection.apply(dyf_splitRows, 'pa_200_less')
dyf_pa_200_less.toDF().show()

For the second DynamicFrame, see the following code:

In [None]:
dyf_pa_200_more = SelectFromCollection.apply(dyf_splitRows, 'pa_200_more')
dyf_pa_200_more.toDF().show()
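
The comparison dictionary can be read as a set of predicates that must all hold for a row to land in the first result. The plain-Python sketch below illustrates that reading. It is not the AWS Glue API: `split_rows` and `OPS` are hypothetical names, and the counts are integers here for simplicity.

```python
# Plain-Python illustration only; this is not the AWS Glue API.
import operator

# Hypothetical operator table for this sketch
OPS = {
    ">": operator.gt,
    "<": operator.lt,
    ">=": operator.ge,
    "<=": operator.le,
    "=": operator.eq,
    "!=": operator.ne,
}


def split_rows(records, comparison_dict, name1, name2):
    """Rows satisfying every comparison go to name1; all others go to name2."""
    matched, rest = [], []
    for rec in records:
        ok = all(
            OPS[op](rec[field], bound)
            for field, checks in comparison_dict.items()
            for op, bound in checks.items()
        )
        (matched if ok else rest).append(rec)
    return {name1: matched, name2: rest}


inventory = [
    {"warehouse_loc": "TX_WAREHOUSE", "pineapple": 560},
    {"warehouse_loc": "CA_WAREHOUSE", "pineapple": 123},
    {"warehouse_loc": "CO_WAREHOUSE", "pineapple": 180},
]
split = split_rows(inventory, {"pineapple": {">": 100, "<": 200}}, "in_range", "rest")
print([rec["warehouse_loc"] for rec in split["in_range"]])
print([rec["warehouse_loc"] for rec in split["rest"]])
```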

## Spigot
Spigot allows you to write a sample dataset to a destination during transformation. For our use case, we write the top 10 records locally:

In [None]:
# Input
dyf_splitFields = Spigot.apply(dyf_pa_200_less, '/home/glue/GlueLocalOutput/Spigot/', 'top10')

Depending on your local environment configuration, Spigot may run into errors. Alternatively, you can use an AWS Glue endpoint or an AWS Glue ETL job to run this function.

## Write Dynamic Frame
The write_dynamic_frame function writes a DynamicFrame using the specified connection and format. For our use case, we write locally (we use a connection_type of S3 with a POSIX path argument in connection_options, which allows writing to local storage):

In [None]:
# Input
glueContext.write_dynamic_frame.from_options(\
frame = dyf_splitFields,\
connection_options = {'path': '/home/glue/GlueLocalOutput/'},\
connection_type = 's3',\
format = 'json')