# Working with AWS DMS, Athena and Apache Hudi DeltaStreamer

In this notebook, you will learn to use the DeltaStreamer utility to bulk insert data into Hudi datasets stored as Copy on Write (COW) and Merge on Read (MOR) tables, using the output of a DMS full load and CDC task as the source.

We will then query the tables in Athena and list their commits in hudi-cli to verify that the initial load and the subsequent updates are incorporated into our data lake on Amazon S3.




## Prerequisites

#### Complete the setup of the DMS environment (first notebook)

<img src="https://i.ibb.co/7nPtdfG/hudi-s1.png" alt="hudi-s1" border="0">

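### Install Python AWS Wrangler

The code cells use the AWS Data Wrangler library (`awswrangler`) to submit and monitor EMR steps. If it is not installed in the notebook kernel, install it with `pip install awswrangler`.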

## Run DeltaStreamer to write a Copy on Write (COW) table

We will now run the DeltaStreamer utility as an EMR step to write the Parquet files that DMS delivered to S3 into a Hudi dataset. To do that, we will need the following:

* Hudi configuration for the Hudi client, schema provider, key generator and data source. DeltaStreamer can read it from a properties file on the local FS or DFS; the cells below pass the same settings inline with `--hoodie-conf` instead.
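DeltaStreamer reads such a properties file through its `--props` option. The cells in this notebook pass the settings inline with repeated `--hoodie-conf` flags, but if you prefer a file, a minimal sketch like the one below renders them as `key=value` lines. The helper names `render_properties` and `write_properties` and the file name `dfs-source.properties` are hypothetical; the keys and values are copied from the COW cells, including the notebook's bucket placeholder.

```python
from typing import Dict


def render_properties(conf: Dict[str, str]) -> str:
    """Render Hudi/DeltaStreamer settings as Java-style key=value lines."""
    # Sorted for stable output. No escaping is done, which is fine for these
    # values (no backslashes, and ':' or '=' inside a value are taken literally).
    return "".join(f"{key}={value}\n" for key, value in sorted(conf.items()))


def write_properties(path: str, conf: Dict[str, str]) -> None:
    """Write the rendered settings to a local properties file."""
    with open(path, "w", encoding="utf-8") as fh:
        fh.write(render_properties(conf))


# Same settings as the --hoodie-conf flags of the COW cells (bucket is a placeholder).
hudi_conf = {
    "hoodie.deltastreamer.source.dfs.root": "s3://hudi-dms-repl-XXXXXX/cdc/dms_sample/sporting_event_ticket/",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.partitionpath.field": "seat_row",
    "hoodie.datasource.hive_sync.partition_fields": "seat_row",
    "hoodie.datasource.hive_sync.database": "default",
    "hoodie.datasource.hive_sync.table": "hudi_sporting_event_ticket",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
}
```

For example, `write_properties("dfs-source.properties", hudi_conf)` produces the file; after uploading it to a location of your choice on S3, its path could be passed with `--props` instead of listing each setting with `--hoodie-conf`.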



In [3]:
import awswrangler as wr
import boto3

boto3.setup_default_session(region_name="us-west-2")


## Submit the bulk insert job to create the Copy on Write (COW) table

### Bulk insert from the Parquet source table

In [4]:
#COW
# SOURCE TABLE sporting_event_ticket

cluster_id = "j-XXXXX"

source_bucket = "hudi-dms-repl-XXXXXX"
source_key = "cdc/dms_sample/sporting_event_ticket/"
target_key = "cdc/dms_sample/hudi_sporting_event_ticket/"
target_table = "hudi_sporting_event_ticket"
ordering_field = "op_cdc_timestamp"
id_field = "id"
partition_field = "seat_row"

# Build the spark-submit arguments as a list and join them with single spaces.
# BULK_INSERT performs the initial load; the later cells use UPSERT for the CDC changes.
command = " ".join([
    "spark-submit",
    "--class", "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer",
    "--jars", "/usr/lib/spark/external/lib/spark-avro_2.11-2.4.5-amzn-0.jar",
    "--master", "yarn",
    "file:///usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar",
    "--table-type", "COPY_ON_WRITE",
    "--op", "BULK_INSERT",
    "--source-class", "org.apache.hudi.utilities.sources.ParquetDFSSource",
    "--source-ordering-field", ordering_field,
    "--target-base-path", f"s3://{source_bucket}/{target_key}",
    "--target-table", target_table,
    "--transformer-class", "org.apache.hudi.utilities.transform.AWSDmsTransformer",
    "--payload-class", "org.apache.hudi.payload.AWSDmsAvroPayload",
    "--enable-hive-sync",
    "--hoodie-conf", f"hoodie.deltastreamer.source.dfs.root=s3://{source_bucket}/{source_key}",
    "--hoodie-conf", f"hoodie.datasource.write.recordkey.field={id_field}",
    "--hoodie-conf", f"hoodie.datasource.write.partitionpath.field={partition_field}",
    "--hoodie-conf", f"hoodie.datasource.hive_sync.partition_fields={partition_field}",
    "--hoodie-conf", "hoodie.datasource.hive_sync.database=default",
    "--hoodie-conf", f"hoodie.datasource.hive_sync.table={target_table}",
    "--hoodie-conf", "hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor",
])
print(command)
print("-------------")

#Submit the job to the specified cluster (step)
step_id = wr.emr.submit_step(cluster_id, command)
print(f"STEP {step_id} initiated on Cluster {cluster_id}")


spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  --jars /usr/lib/spark/external/lib/spark-avro_2.11-2.4.5-amzn-0.jar  --master yarn  file:///usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar  --table-type COPY_ON_WRITE  --op BULK_INSERT --source-class org.apache.hudi.utilities.sources.ParquetDFSSource  --source-ordering-field op_cdc_timestamp  --target-base-path s3://hudi-dms-repl-dmslabs3bucket-1q0p4wdv4oi0j/dms_sample/hudi_sporting_event_ticket/  --target-table hudi_sporting_event_ticket  --transformer-class org.apache.hudi.utilities.transform.AWSDmsTransformer  --payload-class org.apache.hudi.payload.AWSDmsAvroPayload  --enable-hive-sync  --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://hudi-dms-repl-dmslabs3bucket-1q0p4wdv4oi0j/dms_sample/sporting_event_ticket/   --hoodie-conf hoodie.datasource.write.recordkey.field=id  --hoodie-conf hoodie.datasource.write.partitionpath.field=seat_row --hoodie-conf hoodie.datasource.hive_sync.part

In [5]:
#Check the status of the step

print(wr.emr.get_step_state(cluster_id, step_id))


PENDING
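`get_step_state` returns the state at the moment it is called, so right after submission it usually reports `PENDING`. A small polling loop, sketched below, waits until the step reaches one of the EMR terminal states (`COMPLETED`, `CANCELLED`, `FAILED`, `INTERRUPTED`). The helper `wait_for_step` and its poll interval and timeout defaults are hypothetical; the state function is passed in (for example `wr.emr.get_step_state`), so the loop itself has no AWS dependency.

```python
import time
from typing import Callable

# EMR step states after which the step no longer changes.
TERMINAL_STATES = {"COMPLETED", "CANCELLED", "FAILED", "INTERRUPTED"}


def wait_for_step(
    cluster_id: str,
    step_id: str,
    get_state: Callable[[str, str], str],
    poll_seconds: float = 30.0,
    timeout_seconds: float = 3600.0,
) -> str:
    """Poll an EMR step until it reaches a terminal state and return that state."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        state = get_state(cluster_id, step_id)
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Step {step_id} still {state} after {timeout_seconds} s")
        time.sleep(poll_seconds)
```

For example, `wait_for_step(cluster_id, step_id, wr.emr.get_step_state)` blocks until the bulk insert step finishes; the same call with `step_id3` or `step_id5` works for the MOR steps.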

### Let's verify the records on Athena

1. Once the step reports COMPLETED, go to <strong>Athena</strong> and, in the tables list (left panel), select the database <strong>"default"</strong>. The table "hudi_sporting_event_ticket" is now listed there as part of the Glue Data Catalog.

2. Execute the following query to filter some example records and inspect the output.

<ul>
<p style="color:BLUE">
SELECT id,ticket_price FROM "default"."hudi_sporting_event_ticket" where id > 1 and id < 51;
</p>
    
<a href="https://ibb.co/yRsSJ95"><img src="https://i.ibb.co/g7gVC2w/athena-screenshot-2.png" alt="athena-screenshot-2" border="0"></a>
</ul>
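The same check can be run from the notebook instead of the Athena console. The sketch below assumes the awswrangler function `wr.athena.read_sql_query`, and that the notebook's credentials can run Athena queries and write results to awswrangler's default query-result location (pass `s3_output` to use another one). `ticket_price_query` and `run_athena_query` are hypothetical helper names.

```python
def ticket_price_query(table: str, low: int, high: int, database: str = "default") -> str:
    """Build the verification query for ids strictly between low and high."""
    return (
        f'SELECT id, ticket_price FROM "{database}"."{table}" '
        f"WHERE id > {int(low)} AND id < {int(high)}"
    )


def run_athena_query(sql: str, database: str = "default"):
    """Run a query through Athena and return a pandas DataFrame."""
    # Imported here so ticket_price_query can be used without awswrangler installed.
    import awswrangler as wr

    # ctas_approach=False runs a plain SELECT instead of wrapping it in a temporary CTAS table.
    return wr.athena.read_sql_query(sql, database=database, ctas_approach=False)
```

For example, `run_athena_query(ticket_price_query("hudi_sporting_event_ticket", 1, 51)).head()` shows the first rows of the result.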

### Let's run some updates on the source Postgres database

    BEGIN;
    UPDATE dms_sample.sporting_event_ticket SET ticket_price = '111.99' WHERE id = '1';
    UPDATE dms_sample.sporting_event_ticket SET ticket_price = '111.99' WHERE id = '11';
    UPDATE dms_sample.sporting_event_ticket SET ticket_price = '111.99' WHERE id = '21';
    UPDATE dms_sample.sporting_event_ticket SET ticket_price = '111.99' WHERE id = '31';
    COMMIT;
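If you would rather run these updates from Python than from a SQL client, the sketch below applies them through any DB-API 2.0 connection in a single transaction, mirroring the `BEGIN ... COMMIT` block, and uses bound parameters instead of quoted literals. The function name `apply_price_updates` and the use of psycopg2 (whose placeholder is `%s`) are assumptions; the connection details depend on the DMS source you set up in the first notebook.

```python
from typing import Iterable

UPDATE_SQL = (
    "UPDATE dms_sample.sporting_event_ticket "
    "SET ticket_price = {ph} WHERE id = {ph}"
)


def apply_price_updates(conn, ids: Iterable[int], price: str, placeholder: str = "%s") -> None:
    """Set ticket_price for the given ids in one transaction on a DB-API connection."""
    sql = UPDATE_SQL.format(ph=placeholder)
    cur = conn.cursor()
    try:
        cur.executemany(sql, [(price, ticket_id) for ticket_id in ids])
        conn.commit()  # a single commit, like the BEGIN ... COMMIT block
    except Exception:
        conn.rollback()  # leave the table untouched if any update fails
        raise
    finally:
        cur.close()
```

With psycopg2 this could look like `conn = psycopg2.connect(host="<endpoint>", dbname="<database>", user="<user>", password="<password>")` followed by `apply_price_updates(conn, [1, 11, 21, 31], "111.99")`. Each committed update becomes a CDC record that DMS replicates to S3.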

In [146]:
#UPSERT COW
# Reuses the variables defined in the bulk insert cell above.

command = " ".join([
    "spark-submit",
    "--class", "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer",
    "--jars", "/usr/lib/spark/external/lib/spark-avro_2.11-2.4.5-amzn-0.jar",
    "--master", "yarn",
    "file:///usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar",
    "--table-type", "COPY_ON_WRITE",
    "--op", "UPSERT",
    "--source-class", "org.apache.hudi.utilities.sources.ParquetDFSSource",
    "--source-ordering-field", ordering_field,
    "--target-base-path", f"s3://{source_bucket}/{target_key}",
    "--target-table", target_table,
    "--transformer-class", "org.apache.hudi.utilities.transform.AWSDmsTransformer",
    "--payload-class", "org.apache.hudi.payload.AWSDmsAvroPayload",
    "--enable-hive-sync",
    "--hoodie-conf", f"hoodie.deltastreamer.source.dfs.root=s3://{source_bucket}/{source_key}",
    "--hoodie-conf", f"hoodie.datasource.write.recordkey.field={id_field}",
    "--hoodie-conf", f"hoodie.datasource.write.partitionpath.field={partition_field}",
    "--hoodie-conf", f"hoodie.datasource.hive_sync.partition_fields={partition_field}",
    "--hoodie-conf", "hoodie.datasource.hive_sync.database=default",
    "--hoodie-conf", f"hoodie.datasource.hive_sync.table={target_table}",
    "--hoodie-conf", "hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor",
])
print(command)
print("-------------")

#Submit the job to the specified cluster (step)
step_id7 = wr.emr.submit_step(cluster_id, command)
print(f"STEP {step_id7} initiated on Cluster {cluster_id}")


spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  --jars /usr/lib/spark/external/lib/spark-avro_2.11-2.4.5-amzn-0.jar  --master yarn  file:///usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar  --table-type COPY_ON_WRITE  --op UPSERT --source-class org.apache.hudi.utilities.sources.ParquetDFSSource  --source-ordering-field op_cdc_timestamp  --target-base-path s3://hudi-dms-repl-dmslabs3bucket-1q0p4wdv4oi0j/dms_sample/hudi_sporting_event_ticket/  --target-table hudi_sporting_event_ticket  --transformer-class org.apache.hudi.utilities.transform.AWSDmsTransformer  --payload-class org.apache.hudi.payload.AWSDmsAvroPayload  --enable-hive-sync  --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://hudi-dms-repl-dmslabs3bucket-1q0p4wdv4oi0j/dms_sample/sporting_event_ticket/   --hoodie-conf hoodie.datasource.write.recordkey.field=id  --hoodie-conf hoodie.datasource.write.partitionpath.field=seat_row --hoodie-conf hoodie.datasource.hive_sync.partition

In [187]:
# Check the status of the UPSERT step submitted above
print(wr.emr.get_step_state(cluster_id, step_id7))


COMPLETED

## Submit the job to create the Merge on Read (MOR) table

In [124]:
#MOR
# SOURCE TABLE sporting_event_ticket

source_key = "cdc/dms_sample/sporting_event_ticket/"
target_key = "cdc/dms_sample/hudi_sporting_event_ticket_mor/"
target_table = "hudi_sporting_event_ticket_mor"
ordering_field = "op_cdc_timestamp"
id_field = "id"
partition_field = "seat_row"

command = " ".join([
    "spark-submit",
    "--class", "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer",
    "--jars", "/usr/lib/spark/external/lib/spark-avro.jar",
    "--master", "yarn",
    "file:///usr/lib/hudi/hudi-utilities-bundle.jar",
    "--table-type", "MERGE_ON_READ",
    "--op", "INSERT",
    "--source-class", "org.apache.hudi.utilities.sources.ParquetDFSSource",
    "--source-ordering-field", ordering_field,
    "--target-base-path", f"s3://{source_bucket}/{target_key}",
    "--target-table", target_table,
    "--transformer-class", "org.apache.hudi.utilities.transform.AWSDmsTransformer",
    "--payload-class", "org.apache.hudi.payload.AWSDmsAvroPayload",
    "--enable-hive-sync",
    "--hoodie-conf", f"hoodie.deltastreamer.source.dfs.root=s3://{source_bucket}/{source_key}",
    "--hoodie-conf", f"hoodie.datasource.write.recordkey.field={id_field}",
    "--hoodie-conf", f"hoodie.datasource.write.partitionpath.field={partition_field}",
    "--hoodie-conf", f"hoodie.datasource.hive_sync.partition_fields={partition_field}",
    "--hoodie-conf", "hoodie.datasource.hive_sync.database=default",
    "--hoodie-conf", f"hoodie.datasource.hive_sync.table={target_table}",
    "--hoodie-conf", "hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor",
])
#print(command)

#Submit the job to the specified cluster (step)
step_id3 = wr.emr.submit_step(cluster_id, command)
print(f"STEP {step_id3} initiated on Cluster {cluster_id}")


STEP s-J0DVYZOCOO6S initiated on Cluster j-3PNYPQ5J3JROS

In [125]:
#Check the status of the step

print(wr.emr.get_step_state(cluster_id, step_id3))


COMPLETED

# Wait for the step to finish, then query the generated table on Athena

## Run some CDC updates on the source Postgres database

Let's generate some CDC data by running updates on the source sporting_event_ticket table.

    BEGIN;
    UPDATE dms_sample.sporting_event_ticket SET ticket_price = '111.99' WHERE id = '1';
    UPDATE dms_sample.sporting_event_ticket SET ticket_price = '111.99' WHERE id = '11';
    UPDATE dms_sample.sporting_event_ticket SET ticket_price = '111.99' WHERE id = '21';
    UPDATE dms_sample.sporting_event_ticket SET ticket_price = '111.99' WHERE id = '31';
    COMMIT;
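When the upsert job reads these changes, the DMS-specific classes in the command decide the outcome: `AWSDmsTransformer` adds an `Op` column with value `I` when a DMS file has none (as in full-load files), `AWSDmsAvroPayload` deletes a record whose latest change has `Op = 'D'`, and `--source-ordering-field op_cdc_timestamp` picks the latest change per record key within a batch. The pure-Python sketch below is a simplified model of that outcome for reasoning about query results, not Hudi's implementation; `simulate_dms_upsert` is a hypothetical name, and it ignores partitions, file layout and the COW/MOR difference.

```python
from typing import Any, Dict, Iterable


def simulate_dms_upsert(
    table: Dict[Any, dict],
    changes: Iterable[dict],
    key_field: str = "id",
    ordering_field: str = "op_cdc_timestamp",
) -> Dict[Any, dict]:
    """Simplified model of upserting DMS change records into a keyed table.

    Within the batch, the change with the largest ordering value wins for each
    key (ties keep the earlier record); a winning 'D' operation removes the row,
    any other operation replaces it.
    """
    # 1. Keep only the latest change per record key in the incoming batch.
    latest: Dict[Any, dict] = {}
    for change in changes:
        key = change[key_field]
        if key not in latest or change[ordering_field] > latest[key][ordering_field]:
            latest[key] = change
    # 2. Merge the surviving changes into a copy of the existing table.
    result = dict(table)
    for key, change in latest.items():
        if change.get("Op", "I") == "D":  # rows without Op are treated like inserts
            result.pop(key, None)
        else:
            result[key] = change
    return result
```

In this model, the four UPDATE statements above each produce one `U` change, so after the upsert the rows with ids 1, 11, 21 and 31 carry the new `ticket_price` and every other row is unchanged.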

## Start a DeltaStreamer upsert job on the MOR table to update the records

In [126]:
# SOURCE TABLE sporting_event_ticket
# source_key, id_field and partition_field are reused from the MOR insert cell.

target_key = "cdc/dms_sample/hudi_sporting_event_ticket_mor/"
target_table = "hudi_sporting_event_ticket_mor"  # a Hive table name, not an S3 prefix
ordering_field = "op_cdc_timestamp"

command = " ".join([
    "spark-submit",
    "--class", "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer",
    "--jars", "/usr/lib/spark/external/lib/spark-avro.jar",
    "--master", "yarn",
    "file:///usr/lib/hudi/hudi-utilities-bundle.jar",
    "--table-type", "MERGE_ON_READ",
    "--op", "UPSERT",
    "--source-class", "org.apache.hudi.utilities.sources.ParquetDFSSource",
    "--source-ordering-field", ordering_field,
    "--target-base-path", f"s3://{source_bucket}/{target_key}",
    "--target-table", target_table,
    "--transformer-class", "org.apache.hudi.utilities.transform.AWSDmsTransformer",
    "--payload-class", "org.apache.hudi.payload.AWSDmsAvroPayload",
    "--enable-hive-sync",
    "--hoodie-conf", f"hoodie.deltastreamer.source.dfs.root=s3://{source_bucket}/{source_key}",
    "--hoodie-conf", f"hoodie.datasource.write.recordkey.field={id_field}",
    "--hoodie-conf", f"hoodie.datasource.write.partitionpath.field={partition_field}",
    "--hoodie-conf", f"hoodie.datasource.hive_sync.partition_fields={partition_field}",
    "--hoodie-conf", "hoodie.datasource.hive_sync.database=default",
    "--hoodie-conf", f"hoodie.datasource.hive_sync.table={target_table}",
    "--hoodie-conf", "hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor",
])
#print(command)

#Submit the job to the specified cluster (step)
step_id5 = wr.emr.submit_step(cluster_id, command)
print(f"STEP {step_id5} initiated on Cluster {cluster_id}")



STEP s-3UZ8XM2QJD92M initiated on Cluster j-3PNYPQ5J3JROS

## Run the SQL statement on Athena again to verify the updated records

Execute a query similar to the one above to filter some example records and verify the UPDATEs.

<ul>
<p style="color:BLUE">
SELECT * FROM "default"."hudi_sporting_event_ticket" where id > 1 and id < 51;
</p>

<a href="https://ibb.co/yRsSJ95"><img src="https://i.ibb.co/g7gVC2w/athena-screenshot-2.png" alt="athena-screenshot-2" border="0"></a>
</ul>

### Finally, check the Hudi CLI to list the different commits

In a terminal session on the EMR cluster, run <strong>hudi-cli</strong> to open the Hudi CLI, and execute the following commands:

<strong>hudi:</strong> connect --path s3://XXXXXXX/landing-hudi/co/tickets/dms_sample/hudi_sporting_event_ticket/

<strong>hudi: hudi_sporting_event_ticket -> </strong> commits show

The output lists the table's commits; you can roll back to a specific one if needed.

```
20/05/06 06:47:14 INFO timeline.HoodieActiveTimeline: Loadedinstants java.util.stream.ReferencePipeline$Head@d689f6
20/05/06 06:47:15 INFO s3n.S3NativeFileSystem: Opening 's3://XXXXXXX/landing-hudi/co/tickets/dms_sample/hudi_sporting_event_ticket/.hoodie/20200506063222.commit' for reading
20/05/06 06:47:15 INFO s3n.S3NativeFileSystem: Opening 's3://XXXXX/landing-hudi/co/tickets/dms_sample/hudi_sporting_event_ticket/.hoodie/20200506052917.commit' for reading
╔════════════════╤═════════════════════╤═══════════════════╤═════════════════════╤══════════════════════════╤═══════════════════════╤══════════════════════════════╤══════════════╗
║ CommitTime     │ Total Bytes Written │ Total Files Added │ Total Files Updated │ Total Partitions Written │ Total Records Written │ Total Update Records Written │ Total Errors ║
╠════════════════╪═════════════════════╪═══════════════════╪═════════════════════╪══════════════════════════╪═══════════════════════╪══════════════════════════════╪══════════════╣
║ 20200506063222 │ 21.6 MB             │ 0                 │  4                  │ 1                        │ 150000                │ 140000                       │ 0            ║
╟────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20200506052917 │ 21.6 MB             │ 10                │ 0                   │ 1                        │ 150000                │ 0                            │ 0            ║
╚════════════════╧═════════════════════╧═══════════════════╧═════════════════════╧══════════════════════════╧═══════════════════════╧══════════════════════════════╧══════════════╝

```
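As the log lines above show, `commits show` reads the `.commit` files under the table's `.hoodie/` folder. For a quick check without hudi-cli, the sketch below parses the instant times out of such object paths. It assumes the 14-digit `yyyyMMddHHmmss` instant format seen in this log (newer Hudi releases may use a longer format) and the `.commit` (COW) and `.deltacommit` (MOR) suffixes; `completed_instants` is a hypothetical helper, and it does not read commit metadata such as bytes or records written.

```python
import re
from datetime import datetime
from typing import Iterable, List, Tuple

# Completed instants look like .hoodie/20200506063222.commit; the "$" anchor
# skips in-flight or requested files such as <instant>.commit.requested.
INSTANT_RE = re.compile(r"/\.hoodie/(\d{14})\.(commit|deltacommit)$")


def completed_instants(paths: Iterable[str]) -> List[Tuple[datetime, str]]:
    """Return (instant time, action) for completed commits, oldest first."""
    found = []
    for path in paths:
        match = INSTANT_RE.search(path)
        if match:
            instant = datetime.strptime(match.group(1), "%Y%m%d%H%M%S")
            found.append((instant, match.group(2)))
    return sorted(found)
```

For example, `completed_instants(wr.s3.list_objects("s3://<bucket>/<table-prefix>/.hoodie/"))` would list the two commits shown in the table above, oldest first.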