# Execute Incremental Processing Job with AWS Glue <a name="top"></a>

## Table of Contents:

1. [Overview](#Overview)
2. [Execute the Full Load Pipeline](#Execute-the-Full-Load-Pipeline)
3. [Execute the Incremental Load Pipeline](#Execute-the-Incremental-Load-Pipeline)
4. [Wrap-up](#Wrap-up)

## Overview
[(Back to the top)](#top)

In this notebook, we continue this module by implementing the data pipeline shown below.

<img src="../resources/module2_architecture_diagram.png" alt="Module2 Architecture Diagram" style="width: 1000px;"/>

We will run the following steps:

* Define an AWS Glue Database Connection for the Amazon Redshift database.
* Crawl the Amazon Redshift database to load its tables into the AWS Glue Data Catalog.
* Execute the full load job.
* Execute some INSERT statements.
* Crawl the incremental data tables.
* Deploy and execute the incremental job with AWS Glue Bookmarking enabled, and validate the results.
* Repeat the INSERT and incremental job steps to demonstrate the AWS Glue Bookmarking feature.

## Execute the Full Load Pipeline
[(Back to the top)](#top)


#### Define the AWS Glue Database Connection

**Step 1**: Create an AWS Glue Database connection to the Amazon Redshift database:

- Navigate to the AWS Glue console at Services -> AWS Glue
- From the left-hand panel menu, navigate to Data Catalog -> Databases -> Connections.
- Click on the button ‘Add Connection’ to create a new AWS Glue Database Connection.
- Fields to fill in:
   - Page:  Set up your connection’s properties. 
      - Connection name: **redshiftdb**
      - Connection type: **Amazon Redshift**
   - Page: Set up access to your data store.
      - Cluster: Select the Redshift Cluster
      - Database name: **sales_analytics_dw**
      - Username: **awsuser**
      - Password: **S3cretPwd99**
- Click on the button ‘Finish’ to create the AWS Glue Database Connection.
- Select the new AWS Glue Database Connection and click on 'Test Connection' to test connectivity.
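
If you prefer to script this step, the console form above roughly corresponds to a Boto3 `create_connection` call. The sketch below only builds the `ConnectionInput` payload so it can be inspected offline. We assume the Redshift connection is stored as a `JDBC` connection with a `jdbc:redshift://` URL on port 5439; the cluster endpoint, subnet ID, security group ID, and Availability Zone are hypothetical placeholders you would replace with your own values.

```python
def build_redshift_connection_input(name, endpoint, database, username, password,
                                    subnet_id, security_group_ids, availability_zone,
                                    port=5439):
    """Build a ConnectionInput payload for glue.create_connection (sketch)."""
    return {
        "Name": name,
        # Assumption: the Redshift connection is stored as a generic JDBC connection.
        "ConnectionType": "JDBC",
        "ConnectionProperties": {
            "JDBC_CONNECTION_URL": f"jdbc:redshift://{endpoint}:{port}/{database}",
            "USERNAME": username,
            "PASSWORD": password,
        },
        # Subnet and security group that let AWS Glue reach the cluster's VPC.
        "PhysicalConnectionRequirements": {
            "SubnetId": subnet_id,
            "SecurityGroupIdList": list(security_group_ids),
            "AvailabilityZone": availability_zone,
        },
    }


def create_glue_connection(glue_client, connection_input):
    """Create the connection with a Boto3 Glue client, e.g. boto3.client("glue")."""
    glue_client.create_connection(ConnectionInput=connection_input)


payload = build_redshift_connection_input(
    name="redshiftdb",
    endpoint="my-cluster.abc123.us-east-1.redshift.amazonaws.com",  # hypothetical
    database="sales_analytics_dw",
    username="awsuser",
    password="S3cretPwd99",
    subnet_id="subnet-0123456789abcdef0",          # hypothetical
    security_group_ids=["sg-0123456789abcdef0"],   # hypothetical
    availability_zone="us-east-1a",                # hypothetical
)
print(payload["ConnectionProperties"]["JDBC_CONNECTION_URL"])
```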

#### Crawl the Redshift Schema


**Step 2**: Let's run an AWS Glue Crawler on the `public` schema of the Amazon Redshift database:

- Navigate to the AWS Glue console at Services -> AWS Glue
- From the left-hand panel menu, navigate to Data Catalog -> Crawlers.
- Click on the button ‘Add Crawler’ to create a new AWS Glue Crawler.
- Fields to fill in:
  - Page: Add information about your crawler
     - Crawler name: **redshift_sales_analytics_crawler**
  - Page: Add a data store
     - Choose a data store: **JDBC**
     - Connection: **redshiftdb**
     - Include path: **sales_analytics_dw/public/%**
  - Page: Choose an IAM role
     - IAM Role: Choose the IAM Role **###iam_role###**
  - Page: Configure the crawler's output
     - Database: Click on ‘Add database’ and enter database name as **redshift_sales_analytics**.
    
- Click on the button ‘Finish’ to create the crawler.
- Select the new Crawler and click on 'Run crawler' to run the Crawler.
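
The same crawler can be sketched with Boto3's `create_crawler` and `start_crawler`. The function below only assembles the keyword arguments for a single JDBC target, mirroring the console fields above. We assume the `redshift_sales_analytics` database already exists in the Data Catalog (the console's 'Add database' button creates it; with the API you would create it first), and `###iam_role###` is the lab's placeholder.

```python
def build_jdbc_crawler_request(name, role, connection_name, include_path, database_name):
    """Keyword arguments for glue.create_crawler with one JDBC target (sketch)."""
    return {
        "Name": name,
        "Role": role,
        "DatabaseName": database_name,  # Data Catalog database that receives the tables
        "Targets": {
            "JdbcTargets": [
                # Path uses database/schema/table; % matches every table in the schema.
                {"ConnectionName": connection_name, "Path": include_path}
            ]
        },
    }


def create_and_start_crawler(glue_client, request):
    """Create the crawler, then start it, using a Boto3 Glue client."""
    glue_client.create_crawler(**request)
    glue_client.start_crawler(Name=request["Name"])


crawler_request = build_jdbc_crawler_request(
    name="redshift_sales_analytics_crawler",
    role="###iam_role###",  # lab placeholder
    connection_name="redshiftdb",
    include_path="sales_analytics_dw/public/%",
    database_name="redshift_sales_analytics",
)
print(crawler_request["Targets"])
```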


In [None]:
## We will simulate the AWS Glue job arguments 
import sys
sys.argv = ["load_SALES_ORDER_FACT.py","--JOB_NAME", "load_SALES_ORDER_FACT","--TempDir","s3://###s3_bucket###/data/temp/"]

In [None]:
## Glue boilerplate code

import sys
import boto3, json
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.context import SparkContext

# Resolve the (simulated) job arguments
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
print(args['JOB_NAME'] + " START...")

# Reuse the notebook's SparkContext if one already exists
if 'sc' not in vars(): sc = SparkContext()
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


Let's read the first table - SALES_ORDER:

In [None]:
datasource0 = glueContext.create_dynamic_frame_from_options("s3", {'paths': ["s3://###s3_bucket###/dms-full-load-path/salesdb/SALES_ORDER/"]}, format="parquet")
print("Rows read from table: SALES_ORDER : " + str(datasource0.count()))
datasource0.printSchema()

Let's read the second table - SALES_ORDER_DETAIL:

In [None]:
datasource1 = glueContext.create_dynamic_frame_from_options("s3", {'paths': ["s3://###s3_bucket###/dms-full-load-path/salesdb/SALES_ORDER_DETAIL/"]}, format="parquet")
print("Rows read from table: SALES_ORDER_DETAIL : " + str(datasource1.count()))
datasource1.printSchema()

We will use AWS Glue's Join syntax to join the tables:

In [None]:
datasource2 = datasource0.join(["ORDER_ID"], ["ORDER_ID"], datasource1, transformation_ctx="join")
print("Rows after Join transform: " + str(datasource2.count()))
datasource2.printSchema()
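
To make the joined row count easier to interpret, here is a plain-Python model of an equality join on `ORDER_ID`. We assume inner-join semantics, so the result has one row per detail line whose order header exists; the sample rows are made up for illustration, and the rule for clashing column names is specific to this sketch.

```python
from collections import defaultdict


def equi_join(left, right, key):
    """Inner equality join of two lists of dicts on a shared key column."""
    index = defaultdict(list)
    for row in right:
        index[row[key]].append(row)
    joined = []
    for l_row in left:
        for r_row in index.get(l_row[key], []):
            # In this sketch, right-hand values win when column names clash.
            joined.append({**l_row, **r_row})
    return joined


orders = [
    {"ORDER_ID": 1, "ORDER_DATE": "2020-01-05"},
    {"ORDER_ID": 2, "ORDER_DATE": "2020-01-06"},
]
details = [
    {"ORDER_ID": 1, "LINE_NUMBER": 1, "QUANTITY": 3},
    {"ORDER_ID": 1, "LINE_NUMBER": 2, "QUANTITY": 1},
    {"ORDER_ID": 2, "LINE_NUMBER": 1, "QUANTITY": 5},
    {"ORDER_ID": 9, "LINE_NUMBER": 1, "QUANTITY": 2},  # no matching order header
]
rows = equi_join(orders, details, "ORDER_ID")
print(len(rows))
```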

Let's use Spark SQL to add some computed columns to the data (EXTENDED_PRICE and PROFIT are rounded to two decimal places with `bround`):

- EXTENDED_PRICE = QUANTITY * UNIT_PRICE
- PROFIT = QUANTITY * ( UNIT_PRICE - SUPPLY_COST )
- DATE_KEY = ORDER_DATE formatted as `yyyyMMdd`
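
The formulas above, together with the `DATE_KEY` column that the query also derives, can be checked by hand. The sketch below recomputes them for one made-up line item with Python's `decimal` module; `ROUND_HALF_EVEN` mirrors Spark's `bround`, which rounds half to even (Spark's `round` rounds half up). The helper names are hypothetical.

```python
from datetime import date
from decimal import Decimal, ROUND_HALF_EVEN

CENT = Decimal("0.01")


def bround2(value):
    """Round to 2 decimal places, half to even (like Spark's bround(x, 2))."""
    return value.quantize(CENT, rounding=ROUND_HALF_EVEN)


def computed_columns(quantity, unit_price, supply_cost, order_date):
    """Recompute EXTENDED_PRICE, PROFIT and DATE_KEY for one line item."""
    return {
        "EXTENDED_PRICE": bround2(quantity * unit_price),
        "PROFIT": bround2(quantity * (unit_price - supply_cost)),
        "DATE_KEY": order_date.strftime("%Y%m%d"),  # same as 'yyyyMMdd' in Spark
    }


row = computed_columns(3, Decimal("19.99"), Decimal("12.50"), date(2020, 1, 5))
print(row)
```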

In [None]:
datasource2.toDF().createOrReplaceTempView("tbl0")
df1 = spark.sql("""
    SELECT a.*,
           bround(a.QUANTITY * a.UNIT_PRICE, 2) AS EXTENDED_PRICE,
           bround(a.QUANTITY * (a.UNIT_PRICE - a.SUPPLY_COST), 2) AS PROFIT,
           DATE_FORMAT(a.ORDER_DATE, 'yyyyMMdd') AS DATE_KEY
    FROM tbl0 a
""")
df1.show(5)
datasource4 = DynamicFrame.fromDF(df1, glueContext, 'datasource4')

Let's map the columns to the target table schema:

In [None]:
applymapping_dynf = ApplyMapping.apply(
    frame=datasource4,
    mappings=[
        # (source column, source type, target column, target type)
        ("DISCOUNT", "decimal(10,2)", "discount", "decimal(10,2)"),
        ("UNIT_PRICE", "decimal(10,2)", "unit_price", "decimal(10,2)"),
        ("TAX", "decimal(10,2)", "tax", "decimal(10,2)"),
        ("SUPPLY_COST", "decimal(10,2)", "supply_cost", "decimal(10,2)"),
        ("PRODUCT_ID", "int", "product_id", "int"),
        ("QUANTITY", "int", "quantity", "int"),
        ("LINE_ID", "int", "line_id", "int"),
        ("LINE_NUMBER", "int", "line_number", "int"),
        ("ORDER_DATE", "date", "order_date", "date"),
        ("SHIP_MODE", "string", "ship_mode", "string"),
        ("SITE_ID", "double", "site_id", "int"),
        ("PROFIT", "decimal(10,2)", "profit", "decimal(10,2)"),
        ("EXTENDED_PRICE", "decimal(10,2)", "extended_price", "decimal(10,2)"),
        ("DATE_KEY", "string", "date_key", "string"),
        ("ORDER_ID", "int", "order_id", "int"),
    ],
    transformation_ctx="applymapping1",
)
applymapping_dynf.toDF().show(5)
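
Each mapping tuple reads as `(source column, source type, target column, target type)`. The plain-Python sketch below models that contract for a single record: it renames each listed column, casts it to the target type, and drops any column that is not listed. The cast table covers only the types in this small example and is an assumption for illustration, not AWS Glue's own conversion logic (for instance, it does not enforce decimal precision).

```python
from decimal import Decimal

# Hypothetical cast table for the target types used in this example only.
CASTS = {
    "int": int,
    "string": str,
    "decimal(10,2)": lambda v: Decimal(str(v)).quantize(Decimal("0.01")),
}


def apply_mapping(record, mappings):
    """Rename and cast the mapped columns of one record; drop everything else."""
    out = {}
    for src_name, _src_type, dst_name, dst_type in mappings:
        if src_name in record:
            value = record[src_name]
            out[dst_name] = None if value is None else CASTS[dst_type](value)
    return out


mappings = [
    ("SITE_ID", "double", "site_id", "int"),
    ("UNIT_PRICE", "decimal(10,2)", "unit_price", "decimal(10,2)"),
    ("DATE_KEY", "string", "date_key", "string"),
]
source = {"SITE_ID": 3.0, "UNIT_PRICE": Decimal("19.99"), "DATE_KEY": "20200105", "EXTRA": "x"}
mapped = apply_mapping(source, mappings)
print(mapped)
```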

Finally, let's insert the records into the Amazon Redshift target table - sales_order_fact:

In [None]:
redshift_database_name = 'redshift_sales_analytics'
redshift_table_name = 'sales_analytics_dw_public_sales_order_fact'

# Write through the Data Catalog table created by the Redshift crawler;
# AWS Glue stages the data in TempDir before loading it into Amazon Redshift.
datasink3 = glueContext.write_dynamic_frame.from_catalog(frame=applymapping_dynf, database=redshift_database_name, table_name=redshift_table_name, redshift_tmp_dir=args["TempDir"], transformation_ctx="datasink3")
print("Rows written to Amazon Redshift table : %s : %s " % (redshift_table_name, str(applymapping_dynf.count())))

job.commit()

Let's verify the data inserted into the Amazon Redshift table:

In [None]:
%local 
import redshift_utils

redshift_utils.execute_redshift_query("Select count(distinct (order_id)) from sales_order_fact")

## Execute the Incremental Load Pipeline
[(Back to the top)](#top)

### Push Incremental data

<div class="alert alert-block alert-info"><b>Note:</b> Let's run the "generate_orders(100)" cell from the 1st notebook in this module to push some Inserts through.</div>

**Step 3**: Run the "generate_orders(100)" cell in Notebook 1.

AWS DMS should replicate the new inserts to our Amazon S3 bucket within a minute or so.

We can run an AWS CLI command to verify that AWS DMS has written the incremental files to our Amazon S3 bucket:


In [None]:
%%sh
aws s3 ls s3://###s3_bucket###/dms-full-load-path/salesdb/SALES_ORDER/

### Crawl the DMS Incremental Output Data

Let's define and run crawlers that create AWS Glue tables over the incremental data. Note that:

- We set an Exclude Pattern (LOAD*) to exclude the AWS DMS full-load files.
- We add a prefix (INCR_) to the incremental table names so that we can clearly identify them.

<div class="alert alert-block alert-warning"><b>Note:</b> Make sure you do not miss the Exclude Pattern (LOAD*) and the Prefix (INCR_) added to tables in the Crawler definition.</div>
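
The Exclude Pattern is a glob, so you can sanity-check it locally against a listing like the one above before running the crawler. This sketch uses Python's `fnmatch.fnmatchcase` as an approximation of the crawler's glob matching (an assumption: the crawler's matcher has its own rules for `**` and path separators). The file names are hypothetical; only the `LOAD` prefix of the AWS DMS full-load files comes from this lab.

```python
from fnmatch import fnmatchcase


def crawlable_files(keys, exclude_patterns):
    """Return the object names that match none of the exclude patterns."""
    return [k for k in keys if not any(fnmatchcase(k, p) for p in exclude_patterns)]


listing = [
    "LOAD00000001.parquet",        # full-load output (hypothetical name)
    "20200105-120000000.parquet",  # incremental output (hypothetical name)
    "20200105-121500000.parquet",  # incremental output (hypothetical name)
]
print(crawlable_files(listing, ["LOAD*"]))
```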

**Step 4**: The first crawler should create a table for the changes to the SALES_ORDER table:

- Navigate to the AWS Glue console at Services -> AWS Glue
- From the left-hand panel menu, navigate to Data Catalog -> Crawlers.
- Click on the button ‘Add Crawler’ to create a new AWS Glue Crawler.
- Fields to fill in:
    - Page: Add information about your crawler
        - Crawler name: **incr_sales_order_crawler**
    - Page: Add a data store
        - Choose a data store: **S3**
        - Include path: **s3://###s3_bucket###/dms-full-load-path/salesdb/SALES_ORDER/**
        - Exclude Pattern: **LOAD\***
    - Page: Choose an IAM role
        - IAM Role: Choose the **###iam_role###**
    - Page: Configure the crawler's output
        - Database: Click on ‘Add database’ and enter database name as **mysql_dms_salesdb**
        - Prefix added to tables (optional): **INCR_**
- Click on the button ‘Finish’ to create the crawler.
- Select the new Crawler and click on 'Run crawler' to run the Crawler.

**Step 5**: The second crawler should create a table for the changes to the SALES_ORDER_DETAIL table:

- Navigate to the AWS Glue console at Services -> AWS Glue
- From the left-hand panel menu, navigate to Data Catalog -> Crawlers.
- Click on the button ‘Add Crawler’ to create a new AWS Glue Crawler.
- Fields to fill in:
    - Page: Add information about your crawler
        - Crawler name: **incr_sales_order_detail_crawler**
    - Page: Add a data store
        - Choose a data store: **S3**
        - Include path: **s3://###s3_bucket###/dms-full-load-path/salesdb/SALES_ORDER_DETAIL/**
        - Exclude Pattern: **LOAD\***
    - Page: Choose an IAM role
        - IAM Role: Choose the **###iam_role###**
    - Page: Configure the crawler's output
        - Database: Select the database **mysql_dms_salesdb**
        - Prefix added to tables (optional): **INCR_**
- Click on the button ‘Finish’ to create the crawler.
- Select the new Crawler and click on 'Run crawler' to run the Crawler.
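
Steps 4 and 5 differ only in the crawler name and the S3 folder, so they can also be scripted in a loop. This sketch builds the Boto3 `create_crawler` keyword arguments with the same Exclude Pattern and `INCR_` table prefix. `###s3_bucket###` and `###iam_role###` are the lab's placeholders, and we assume the `mysql_dms_salesdb` database already exists in the Data Catalog (the console's 'Add database' button creates it in Step 4).

```python
def build_incr_crawler_requests(bucket, role, database, tables):
    """create_crawler keyword arguments for one S3 crawler per DMS table folder."""
    crawler_requests = []
    for table in tables:
        crawler_requests.append({
            "Name": f"incr_{table.lower()}_crawler",
            "Role": role,
            "DatabaseName": database,
            "TablePrefix": "INCR_",  # marks the incremental tables
            "Targets": {
                "S3Targets": [{
                    "Path": f"s3://{bucket}/dms-full-load-path/salesdb/{table}/",
                    "Exclusions": ["LOAD*"],  # skip the AWS DMS full-load files
                }]
            },
        })
    return crawler_requests


crawler_requests = build_incr_crawler_requests(
    bucket="###s3_bucket###",
    role="###iam_role###",
    database="mysql_dms_salesdb",
    tables=["SALES_ORDER", "SALES_ORDER_DETAIL"],
)
for r in crawler_requests:
    print(r["Name"], r["Targets"]["S3Targets"][0]["Path"])
    # With a real client: glue.create_crawler(**r); glue.start_crawler(Name=r["Name"])
```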

### Deploy and execute the Incremental Job with AWS Glue Bookmarking enabled

Let's now deploy the AWS Glue job for the incremental load using the AWS SDK for Python (Boto3) in the cell below. The following settings in our AWS Glue job definition are noteworthy:

- The following line enables the AWS Glue Bookmarking feature for the AWS Glue job:

```
'--job-bookmark-option': 'job-bookmark-enable'
```

- The following line ensures that AWS Glue creates private ENIs (Elastic Network Interfaces) within the VPC to connect to our Amazon Redshift cluster:

```
Connections={'Connections': [redshift_database_connection]},
```

- The following line sets the capacity allocated to this AWS Glue job. We have allocated 3 DPUs (1 DPU = 4 vCPUs and 16 GB of memory, with a 50 GB disk and 2 executors):

```
MaxCapacity=3.0
```


You can read more about AWS Glue job properties and capacity options here: https://docs.aws.amazon.com/en_us/glue/latest/dg/add-job.html
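
As a quick check on these settings, the arithmetic below multiplies out the per-DPU figures quoted above for `MaxCapacity=3.0` and converts the job's `Timeout=2880` into hours (the Glue `create_job` API expresses the timeout in minutes). The per-DPU constants simply restate the figures above; the helper names are hypothetical.

```python
# Per-DPU resources as quoted above.
VCPUS_PER_DPU = 4
MEMORY_GB_PER_DPU = 16
DISK_GB_PER_DPU = 50


def capacity(max_capacity):
    """Total resources implied by a MaxCapacity setting (in DPUs)."""
    return {
        "vcpus": max_capacity * VCPUS_PER_DPU,
        "memory_gb": max_capacity * MEMORY_GB_PER_DPU,
        "disk_gb": max_capacity * DISK_GB_PER_DPU,
    }


def timeout_hours(timeout_minutes):
    """Convert a job Timeout (minutes) into hours."""
    return timeout_minutes / 60


print(capacity(3.0))        # resources for MaxCapacity=3.0
print(timeout_hours(2880))  # Timeout=2880 minutes
```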

In [None]:
%local

import boto3

acct_number=boto3.client('sts').get_caller_identity().get('Account')
bucket='###s3_bucket###'
redshift_database_connection='redshiftdb'

# Create the AWS Glue Spark Jobs
glue = boto3.client("glue")

for job_name in ['incr_load_SALES_ORDER_FACT']:
    response=glue.create_job(Name=job_name,
                         Role=f"arn:aws:iam::{acct_number}:role/###iam_role###",
                         ExecutionProperty={'MaxConcurrentRuns': 1},
                         Command={'Name': 'glueetl',
                                  'ScriptLocation': f's3://{bucket}/scripts/{job_name}.py',
                                  'PythonVersion': '3'},
                         DefaultArguments={'--TempDir': f's3://{bucket}/temp',
                                           '--enable-continuous-cloudwatch-log': 'true',
                                           '--enable-glue-datacatalog': '',
                                           '--enable-metrics': '',
                                           '--enable-spark-ui': 'true',
                                           '--spark-event-logs-path': f's3://{bucket}/spark_glue_etl_logs/{job_name}',
                                           '--job-bookmark-option': 'job-bookmark-enable',
                                           '--job-language': 'python',
                                           '--S3_BUCKET': bucket },
                         Connections={'Connections': [redshift_database_connection]},
                         MaxRetries=0,
                         Timeout=2880,
                         MaxCapacity=3.0,
                         GlueVersion='1.0',
                         Tags={'Owner': 'Glue_Labs'}
                        )
    print (response)

**Step 6**: To run the AWS Glue Job:

- Navigate to the AWS Glue console at Services -> AWS Glue
- From the left-hand panel menu, navigate to ETL -> Jobs
- Select the job 'incr_load_SALES_ORDER_FACT'
- Click on 'Action -> Run job'.
- Accept all Default arguments and click on the 'Run job' button.
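
The console steps above can also be scripted. A minimal sketch, assuming a Boto3 Glue client (`boto3.client("glue")`) in a `%local` cell: `start_job_run` starts the job and `get_job_run` is polled until the run reaches a terminal state. The polling interval and the set of terminal states listed here are our own choices; check the `JobRunState` values in the AWS Glue API reference for your SDK version.

```python
import time

# Run states treated as final by this sketch.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR", "EXPIRED"}


def run_job_and_wait(glue, job_name, poll_seconds=30.0, sleep=time.sleep):
    """Start an AWS Glue job run and block until it finishes; return the final state."""
    run_id = glue.start_job_run(JobName=job_name)["JobRunId"]
    while True:
        run = glue.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
        if run["JobRunState"] in TERMINAL_STATES:
            return run["JobRunState"]
        sleep(poll_seconds)


# Usage (inside a %local cell):
# import boto3
# print(run_job_and_wait(boto3.client("glue"), "incr_load_SALES_ORDER_FACT"))
```

Passing `sleep` as a parameter keeps the function easy to exercise offline with a stub client.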

As the job is running, we can inspect the logs and monitor the run from the AWS Glue console.

<img src="../resources/glue_logs.png" alt="AWS_Glue_Logs" style="width: 400px;"/>

In [None]:
%local 
redshift_utils.execute_redshift_query("Select count(distinct order_id) from sales_order_fact")

**Step 7**: Finally, let's repeat the steps with more incremental data:

- Insert 100 more orders

<div class="alert alert-block alert-info"><b>Note:</b> Let's run the "generate_orders(100)" cell from the 1st notebook in this module to push some Inserts through.</div>

In [None]:
%%sh
aws s3 ls s3://###s3_bucket###/dms-full-load-path/salesdb/SALES_ORDER/

- Rerun the AWS Glue Job - incr_load_SALES_ORDER_FACT
- Validate the record count in the Amazon Redshift table in the cell below.


In [None]:
%local 
redshift_utils.execute_redshift_query("Select count(distinct order_id) from sales_order_fact")


We can see that each run correctly identifies the incremental data and pushes it to the Amazon Redshift table. AWS Glue Bookmarking keeps track of which files each run has processed and ensures that subsequent runs pick up only newer files.

You can read more on how AWS Glue Bookmarking works here: https://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html
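
To build intuition for this behavior, here is a toy model of a bookmark over S3 files. It is a simplification, not how AWS Glue actually stores bookmark state: it only remembers which object keys were processed, and it advances the bookmark only when a run is committed, the role `job.commit()` plays in the Glue job code above. The class and file names are hypothetical.

```python
class FileBookmark:
    """Toy bookmark: remembers processed keys; state advances only on commit()."""

    def __init__(self):
        self._committed = set()
        self._pending = set()

    def new_files(self, listing):
        """Return keys not processed by any committed run, in listing order."""
        fresh = [key for key in listing if key not in self._committed]
        self._pending = set(fresh)
        return fresh

    def commit(self):
        """Mark the current run's files as processed."""
        self._committed |= self._pending
        self._pending = set()


bm = FileBookmark()
run1 = bm.new_files(["cdc-1.parquet", "cdc-2.parquet"])
bm.commit()
run2 = bm.new_files(["cdc-1.parquet", "cdc-2.parquet", "cdc-3.parquet"])
bm.commit()
run3 = bm.new_files(["cdc-1.parquet", "cdc-2.parquet", "cdc-3.parquet"])
print(run1, run2, run3)
```

If a run never commits, the next run sees the same files again, which is why the job script calls `job.commit()` at the end.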

## Wrap-up
[(Back to the top)](#top)

In this notebook, we ran exercises to: 

1. Load a table into an Amazon Redshift data warehouse.
2. Add computed columns to a DataFrame using Spark SQL.
3. Read incremental data from Amazon S3 as AWS Glue tables using AWS Glue Crawlers and exclude patterns.
4. Finally, push the incremental data to an Amazon Redshift table using the AWS Glue Bookmarking feature.