# Serverless Data Lake Immersion
## Lab 2.3 - Advanced Data Preparation with AWS Glue Interactive Sessions
This example shows how to filter, transform, and repartition data using AWS Glue DynamicFrames and Spark DataFrames.
For this notebook, you need to have completed the previous labs as described in the lab guide (you should have a database called `sdl-demo-data` in your AWS Glue Data Catalog).
### Getting started
**Important:** Before running the next step, update the *account_number* variable with your 12-digit AWS account number (e.g. `012345678912` for the Amazon S3 bucket called `sdl-immersion-day-012345678912`).
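
AWS account IDs have 12 digits, and the notebook builds the bucket name by concatenating `sdl-immersion-day-` with `account_number`. As a hedged sketch, a small helper like the hypothetical `output_path` below (it is not part of the lab or of AWS Glue) can catch a forgotten placeholder before any write runs. It only checks the format of the ID, not that the bucket exists.

```python
import re

# Hypothetical helper, not part of the lab or the AWS Glue API.
BUCKET_PREFIX = "sdl-immersion-day-"


def output_path(account_number: str, suffix: str = "output-etl-nb-jobs") -> str:
    """Build s3://sdl-immersion-day-<account>/<suffix> after a format check."""
    account_number = account_number.strip()
    # AWS account IDs are exactly 12 digits (leading zeros allowed).
    if not re.fullmatch(r"\d{12}", account_number):
        raise ValueError(f"expected a 12-digit AWS account ID, got {account_number!r}")
    return f"s3://{BUCKET_PREFIX}{account_number}/{suffix}"


print(output_path("012345678912"))
```

Calling it with the untouched placeholder `"ADD_YOUR_AWS_ACCOUNT_NUMBER_HERE"` raises a `ValueError` instead of producing a bucket name that does not exist.
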

DataFrame APIs support elaborate methods for slicing and dicing data, such as selecting rows, columns, and cells by name or by position, and filtering out rows. Real-world data is usually messy and contains missing values, incorrect values, and range violations, so explicit management of missing data is a critically important feature of DataFrames.
We will write a script that:
1. Queries data
2. Reformats data
3. Repartitions the data

Begin by running some boilerplate to import the AWS Glue libraries we'll need and set up a single `GlueContext`.
Then, get the Spark session and create a DynamicFrame from the `raw` table in the Data Catalog, which points to our data in Amazon S3.
Some concepts:
- Spark provides a unified platform for writing big data applications, ranging from simple data loading and SQL queries to machine learning and streaming computation over the same engine and with a consistent set of APIs.
- Spark handles loading data from Amazon S3.
- You control your Spark application through the SparkSession, the entry point that runs in the driver process.
- A Spark DataFrame is the most common Structured API and represents a table of data with rows and columns. (Not to be confused with R and Python (pandas) DataFrames, which, with some exceptions, live on a single machine rather than being distributed across many machines.)
- A schema defines the columns and data types within those columns.


In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Wrap the notebook's SparkContext in a GlueContext
glueContext = GlueContext(SparkContext.getOrCreate())

account_number = "ADD_YOUR_AWS_ACCOUNT_NUMBER_HERE" # <-- Add your AWS account number here!

# SparkSession behind the GlueContext, used for DataFrame operations
spark = glueContext.spark_session

# Read the `raw` table that the crawler registered in the Data Catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(
                            database="sdl-demo-data",
                            table_name="raw",
                            transformation_ctx="datasource0")

### Schema of the Dataset
Next, examine the schema that the crawler recorded in the Data Catalog. The cell below counts the records in the `raw` table, converts the DynamicFrame to a Spark DataFrame with `toDF()`, prints its schema, and shows the first five rows.

**Note:** To look at the schema, i.e. the structure of the DataFrame, we use the *printSchema* method. It lists the columns in the DataFrame along with each column's data type and whether it is nullable.


In [2]:
print ("Count: ", datasource0.count())

df = datasource0.toDF()

df.printSchema()

df.show(5)



Count:  10100
root
 |-- productName: string (nullable = true)
 |-- color: string (nullable = true)
 |-- department: string (nullable = true)
 |-- product: string (nullable = true)
 |-- imageUrl: string (nullable = true)
 |-- dateSoldSince: string (nullable = true)
 |-- dateSoldUntil: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- campaign: string (nullable = true)
 |-- partition_0: string (nullable = true)
 |-- partition_1: string (nullable = true)
 |-- partition_2: string (nullable = true)
 |-- partition_3: string (nullable = true)

+--------------------+-------+----------+--------+--------------------+--------------------+--------------------+-----+-----------+-----------+-----------+-----------+-----------+
|         productName|  color|department| product|            imageUrl|       dateSoldSince|       dateSoldUntil|price|   campaign|partition_0|partition_1|partition_2|partition_3|
+--------------------+-------+----------+--------+--------------------+--------

### Selecting Multiple Columns & Filtering Data
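We can filter rows on a condition and select a subset of columns. The cell below keeps only rows from the `BlackFriday` campaign, selects five columns, and shows the first 10 results.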

In [3]:
df.filter((df.campaign=='BlackFriday')).select('productName','product', 'department', 'price','campaign').limit(10).show()



+--------------------+--------+-----------+-----+-----------+
|         productName| product| department|price|   campaign|
+--------------------+--------+-----------+-----+-----------+
|Unbranded Wooden ...|    Fish|  Computers|   55|BlackFriday|
|Unbranded Frozen Hat|    Tuna| Automotive|   85|BlackFriday|
|Sleek Metal Sausages|    Ball|   Jewelery|   57|BlackFriday|
|  Tasty Plastic Soap|   Chips|Electronics|   19|BlackFriday|
|Awesome Cotton Ch...|     Hat|      Tools|  138|BlackFriday|
|Handcrafted Fresh...|   Chair|     Sports|   51|BlackFriday|
|Gorgeous Plastic ...|   Chips|       Toys|   26|BlackFriday|
|Practical Plastic...|Keyboard|   Outdoors|   28|BlackFriday|
|    Tasty Soft Chips|    Fish| Industrial|   33|BlackFriday|
|Awesome Concrete ...|Computer|       Toys|   35|BlackFriday|
+--------------------+--------+-----------+-----+-----------+

### Perform transformations on data

You can easily transform data.

Let's keep only the fields we want: drop `color`, rename `imageUrl` to `thumbnailImageUrl`, and rename `campaign` to `campaignType`. These transforms are applied directly to the DynamicFrame; `toDF()` then converts the result to a Spark DataFrame so we can print its schema and show the first 10 rows.

In [4]:
# Drop the unused color field and rename two fields on the DynamicFrame
dsTransformed = datasource0.drop_fields(['color']).rename_field('imageUrl', 'thumbnailImageUrl').rename_field('campaign', 'campaignType')
dfTransformed = dsTransformed.toDF()

dfTransformed.printSchema()

dfTransformed.show(10)


root
 |-- productName: string (nullable = true)
 |-- department: string (nullable = true)
 |-- product: string (nullable = true)
 |-- dateSoldSince: string (nullable = true)
 |-- dateSoldUntil: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- partition_0: string (nullable = true)
 |-- partition_1: string (nullable = true)
 |-- partition_2: string (nullable = true)
 |-- partition_3: string (nullable = true)
 |-- thumbnailImageUrl: string (nullable = true)
 |-- campaignType: string (nullable = true)

+--------------------+----------+--------+--------------------+--------------------+-----+-----------+-----------+-----------+-----------+--------------------+------------+
|         productName|department| product|       dateSoldSince|       dateSoldUntil|price|partition_0|partition_1|partition_2|partition_3|   thumbnailImageUrl|campaignType|
+--------------------+----------+--------+--------------------+--------------------+-----+-----------+-----------+-----------+-----

### Export the transformed data to Amazon S3
Let's export the dataset transformed in the previous section to Amazon S3 in Parquet format. The following call writes the table across multiple files, typically one per Spark partition, to support fast parallel reads when doing analysis later.

In [5]:
# Write the transformed DynamicFrame to Amazon S3 as Parquet
glueContext.write_dynamic_frame.from_options(frame = dsTransformed,
              connection_type = "s3",
              connection_options = {"path": "s3://sdl-immersion-day-" + account_number + "/output-etl-nb-jobs"},
              format = "parquet")


<awsglue.dynamicframe.DynamicFrame object at 0x7ff3dc08b828>

When execution is finished, go to the Amazon S3 folder and verify that the files were written. The folder should look something like this:

```
2021-06-15 14:30:01      87705 part-00000-3944ffa1-8917-42f0-93f2-bef5b3c63cca-c000.snappy.parquet
2021-06-15 14:30:01      88180 part-00001-3944ffa1-8917-42f0-93f2-bef5b3c63cca-c000.snappy.parquet
2021-06-15 14:30:01      87545 part-00002-3944ffa1-8917-42f0-93f2-bef5b3c63cca-c000.snappy.parquet
2021-06-15 14:30:01      87705 part-00003-3944ffa1-8917-42f0-93f2-bef5b3c63cca-c000.snappy.parquet
2021-06-15 14:30:01      88180 part-00004-3944ffa1-8917-42f0-93f2-bef5b3c63cca-c000.snappy.parquet
2021-06-15 14:30:02      87545 part-00005-3944ffa1-8917-42f0-93f2-bef5b3c63cca-c000.snappy.parquet
```

### Repartition Data
**Important:** Before running the cell below, make sure you are using the correct Amazon S3 path.

In the previous step, the data was exported to multiple Amazon S3 objects in Parquet format. Since the data is small, let's combine it into a single partition.
#### Combine into a Single Partition
To put all the transformed data into a single file, we repartition the Spark DataFrame from the previous step (`dfTransformed`) into one partition and write it out.

In [6]:
# Shuffle all rows into one partition so Spark writes a single Parquet file
dfSinglePartition = dfTransformed.repartition(1)
dfSinglePartition.write.parquet('s3://sdl-immersion-day-' + account_number + '/output-etl-nb-jobs/singlePartition')


When execution is finished, go to the Amazon S3 folder and verify that the file was written. The folder should look something like this:

`2021-06-15 14:30:05    1435146 part-00000-95ad4fb6-d178-47ad-8072-d60d8d8e71fd-c000.snappy.parquet`

#### Repartition Based on a Field
Alternatively, you can partition the output by the `department` column:


In [7]:
# Write one subfolder per department value
dfTransformed.write.parquet(
        's3://sdl-immersion-day-' + account_number + '/output-etl-nb-jobs/byDepartment',
        partitionBy=['department'])


**Note:**
Many other types of transformations are possible, such as joining tables. AWS Glue also makes it easy to write semi-structured data to relational databases such as Amazon Redshift: its `relationalize()` transform flattens DynamicFrames no matter how complex the objects in the frame are.

### Putting it together
Great! We now have the final table that we'd like to use for analysis in Amazon S3, the storage layer of our data lake. It is stored in a compact, efficient format for analytics that we can query with SQL in AWS Glue, Amazon Athena, or Amazon Redshift Spectrum.
 
Many other transformations are possible (e.g. JOIN operations); we leave them to your imagination :)


### Congratulations!
You've finished this lab.

If you want, you can click the **Save** button at the top of this notebook and save it as an AWS Glue job.