# Serverless Data Lake Immersion
## Lab 2.3 - Advanced Data Preparation with Developer Endpoints and Notebook
`(Revision History:
PA5, 2019-10-19, @akirmak: updated Section 7.2 based on feedback from @rmichaud and @werberm and the solution proposed by @greenste. PA4 excluded.
PA4, 2019-10-19, @akirmak: Advanced Spark ETL logic added as bonus
PA3, 2019-05-09, @akirmak: updated based on feedback from @hohenber
PA2, 2018-12-13, @akirmak 
PA1, 2018-12-07`

This example shows how to filter, transform, and repartition data using AWS Glue DynamicFrames and Spark DataFrames.

For the purposes of our Immersion Day, we assume that you have completed the previous lab assignments (created a Firehose delivery stream, ingested simulated product catalogue data into S3, and crawled this data, storing the results in a database called `<your initials>-tame-bda-immersion-gdb` with a table called `raw` in your Data Catalog), as described in the lab guide.

### 2. Getting started

The DataFrame API supports elaborate methods for slicing and dicing data, including selecting rows, columns, and cells by name or by position, and filtering out rows. Real-world data is usually messy, with many missing or incorrect values and range violations, so explicit management of missing data is a critically important feature of DataFrames.

We will write a script that:

1. Queries data
2. Reformats data
3. Repartitions the data

Begin by running some boilerplate to import the AWS Glue libraries we'll need and set up a single `GlueContext`.
Then, start a Spark application and create a DynamicFrame from our data in S3, using the `raw` table that the crawler registered in the Data Catalog.

Some concepts:

- Spark provides a unified platform for writing big data applications, ranging from simple data loading and SQL queries to machine learning and streaming computation over the same engine and with a consistent set of APIs.
- Spark handles loading the data from Amazon S3; here, the Data Catalog table tells it where the data lives and what its schema is.
- You control your Spark application through its driver process; the SparkSession is the entry point to it (available as `spark` in this notebook).
- A Spark DataFrame is the most common Structured API and simply represents a table of data with rows and columns. Unlike R and Python (pandas) DataFrames, which with some exceptions exist on a single machine, a Spark DataFrame can span many machines.
- A schema is the list that defines the columns and the types within those columns.

**Important:** Before running the next step, set the `initials` variable to your initials (e.g. `fs` for Frank Sinatra, which gives the database name `fs-tame-bda-immersion-gdb`).

In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

initials = "ADD_YOUR_INITIALS_HERE" # <-- Add your initials here!

spark = glueContext.spark_session

datasource0 = glueContext.create_dynamic_frame.from_catalog(
                            database = initials + "-tame-bda-immersion-gdb", 
                            table_name = "raw", 
                            transformation_ctx = "datasource0")


### 3. Schema of the Dataset
Next, you can easily examine the schema that the crawler recorded in the Data Catalog. For example, to see the schema of the `raw` table, run the following code:

Note: To look at the schema, i.e. the structure of the DataFrame, we use the `printSchema` method. It lists the columns in our DataFrame along with each column's data type and whether it is nullable. The cell below also prints the record count and the first five rows.


In [3]:
print ("Count: ", datasource0.count())

df = datasource0.toDF()

df.printSchema()

df.show(5)



Count:  45000
root
 |-- productName: string (nullable = true)
 |-- product: string (nullable = true)
 |-- department: string (nullable = true)
 |-- color: string (nullable = true)
 |-- imageUrl: string (nullable = true)
 |-- dateSoldSince: string (nullable = true)
 |-- dateSoldUntil: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- campaign: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- hour: string (nullable = true)

+--------------------+--------+----------+------+--------------------+--------------------+--------------------+-----+-----------+----+-----+---+----+
|         productName| product|department| color|            imageUrl|       dateSoldSince|       dateSoldUntil|price|   campaign|year|month|day|hour|
+--------------------+--------+----------+------+--------------------+--------------------+--------------------+-----+-----------+----+-----+---+----+
|Refined Froze... (output truncated)

### 4. Selecting Multiple Columns & Filtering Data
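We can filter our data on one or more conditions and select only the columns we need. The example below keeps the `BlackFriday` campaign rows and shows the first ten: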

In [4]:
df.filter((df.campaign=='BlackFriday')).select('productName','product', 'department', 'price','campaign').limit(10).show()



+--------------------+--------+----------+-----+-----------+
|         productName| product|department|price|   campaign|
+--------------------+--------+----------+-----+-----------+
|Refined Frozen To...|    Tuna|     Music|  144|BlackFriday|
|Gorgeous Frozen Ball|  Towels|     Games|   33|BlackFriday|
|Unbranded Wooden ...|Sausages|     Books|  123|BlackFriday|
|Handmade Rubber S...|    Bike|     Books|  116|BlackFriday|
|Awesome Rubber Shoes|   Bacon|Automotive|  119|BlackFriday|
| Generic Cotton Bike|    Tuna|     Shoes|  109|BlackFriday|
| Refined Frozen Tuna|     Hat|    Sports|   37|BlackFriday|
|  Sleek Frozen Pants|Computer|     Music|   58|BlackFriday|
|Handcrafted Rubbe...|    Ball|  Outdoors|   45|BlackFriday|
|Practical Soft Co...|  Towels|    Health|   52|BlackFriday|
+--------------------+--------+----------+-----+-----------+

### 5. Perform transformations on data

AWS Glue DynamicFrames provide transforms for common reshaping tasks.

Let's keep only the fields that we want: drop `color` and `hour`, rename `imageUrl` to `thumbnailImageUrl`, and rename `campaign` to `campaignType`. The transforms are applied to the DynamicFrame; `toDF()` then converts the result to a Spark DataFrame so that we can inspect it with the DataFrame API (and query it with Spark SQL if needed).

In [5]:
dsTransformed = datasource0.drop_fields(['color','hour']).rename_field('imageUrl', 'thumbnailImageUrl').rename_field('campaign', 'campaignType')
dfTransformed = dsTransformed.toDF()

dfTransformed.printSchema()

dfTransformed.show(10)


root
 |-- productName: string (nullable = true)
 |-- product: string (nullable = true)
 |-- department: string (nullable = true)
 |-- dateSoldSince: string (nullable = true)
 |-- dateSoldUntil: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- thumbnailImageUrl: string (nullable = true)
 |-- campaignType: string (nullable = true)

+--------------------+--------+----------+--------------------+--------------------+-----+----+-----+---+--------------------+------------+
|         productName| product|department|       dateSoldSince|       dateSoldUntil|price|year|month|day|   thumbnailImageUrl|campaignType|
+--------------------+--------+----------+--------------------+--------------------+-----+----+-----+---+--------------------+------------+
|Refined Frozen To...|    Tuna|     Music|Tue Mar 13 2018 2...|Sat Feb 02 2019 1...|  144|2018|   11| 15|http://lorempixe... (output truncated)

### 6. Export transformed data to S3

Let's export the dataset transformed in the previous section to S3 in Parquet format. The following call writes the table across multiple files, which supports fast parallel reads when we analyze the data later:

In [6]:
glueContext.write_dynamic_frame.from_options(frame = dsTransformed,
              connection_type = "s3",
              connection_options = {"path": "s3://" + initials + "-tame-bda-immersion/output-etl-nb-jobs"},
              format = "parquet")


<awsglue.dynamicframe.DynamicFrame object at 0x7f4761bb2240>

When execution is finished, go to the S3 folder and verify that the files were written. The folder should look something like the following (this listing includes files from several runs):

`
2018-12-07 22:42:56      87705 part-00000-3944ffa1-8917-42f0-93f2-bef5b3c63cca-c000.snappy.parquet
2018-12-07 22:41:55      87572 part-00000-48a202cd-86eb-4109-b3e6-f7f2bef549ef-c000.snappy.parquet
2018-11-21 01:32:34      87572 part-00000-7f23bfb7-7a9f-4eee-bd00-4cf7ab085f57-c000.snappy.parquet
2018-12-07 22:42:56      88180 part-00001-3944ffa1-8917-42f0-93f2-bef5b3c63cca-c000.snappy.parquet
2018-12-07 22:41:55      88180 part-00001-48a202cd-86eb-4109-b3e6-f7f2bef549ef-c000.snappy.parquet
2018-11-21 01:32:34      88180 part-00001-7f23bfb7-7a9f-4eee-bd00-4cf7ab085f57-c000.snappy.parquet
2018-12-07 22:42:56      87545 part-00002-3944ffa1-8917-42f0-93f2-bef5b3c63cca-c000.snappy.parquet
2018-12-07 22:41:55      87851 part-00002-48a202cd-86eb-4109-b3e6-f7f2bef549ef-c000.snappy.parquet
2018-11-21 01:32:34      87545 part-00002-7f23bfb7-7a9f-4eee-bd00-4cf7ab085f57-c000.snappy.parquet`

### 7. Repartition Data
**Important:** Before running the cell below, make sure you are using the correct S3 path.


In the previous example, the data was exported to multiple S3 objects in Parquet format. Since the data is small, let's combine it into a single partition.

#### 7.1 Combine into a Single Partition
To put all the transformed data into a single file, we take the DataFrame created in section 5 (`dfTransformed`), repartition it into one partition, and write it out.

In [9]:
dfSinglePartition = dfTransformed.repartition(1)
dfSinglePartition.write.parquet('s3://' + initials + '-tame-bda-immersion/output-etl-nb-jobs/singlePartition')


When execution is finished, go to the S3 folder and verify that the file was written. The folder should look something like:

`2018-12-07 22:55:13    1435146 part-00000-95ad4fb6-d178-47ad-8072-d60d8d8e71fd-c000.snappy.parquet`

#### 7.2 Repartition Based on a Field

Alternatively, to partition the output by `department`, writing one S3 prefix per department value:

**Update 2-Dec-2019:** If you get an error that the Spark job was aborted, try the command with `s3a://` instead of `s3://`. More details are here: https://issues.apache.org/jira/browse/HADOOP-10400

In [10]:
dfTransformed.write.parquet(
        's3://' + initials + '-tame-bda-immersion/output-etl-nb-jobs/byDepartment', 
        partitionBy=['department'])


**Note:**
Many other types of transformations could be done, such as joining tables. AWS Glue also makes it easy to write data, even semi-structured data, to relational databases like Redshift: its `relationalize()` transform flattens DynamicFrames no matter how complex the objects in the frame may be.

### 8. Putting it together
Great! We now have the final table that we'd like to use for analysis in S3, the storage layer of our Data Lake. It is stored in a compact, efficient format for analytics, and we can run SQL over it with AWS Glue, Athena, or Redshift Spectrum.

Note that many other types of transformations could be done (e.g. JOIN operations). We leave it to your imagination :)


### 9. Congratulations! 
You've finished this lab.

**Very Important:** SageMaker notebook instances run on EC2 and are billed by the second while they are running, so save your work (by downloading it to your local computer) and then terminate the SageMaker notebook instance.

### 10. Cleaning up resources 

Please:
 1. Download this notebook to your computer by selecting `File -> Download as -> Notebook (.ipynb)`.
 1. Terminate this instance. Remember that you can always recreate it from the `AWS Glue Console` by selecting the terminated instance and `Cloning` its configuration.

Thank you.
