# Lab: Batch Updates to an S3 Data Lake Using Apache Hudi

# Copy on Write 

## Table of Contents:

- 1. [Overview](#Overview)
- 2. [Copy On Write](#Copy-On-Write)
   - 2.1 [Bulk Insert the Initial Dataset](#Bulk-Insert-the-Initial-Dataset)
   - 2.2 [Batch Upsert some records](#Batch-Upsert-some-records)
   - 2.3 [Incremental View](#Incremental-View)
   - 2.4 [Deleting Records](#Deleting-Records.)
- 3. [Rollback](#Rollback)
- 4. [Advanced - Understanding Hudi Commits](#Advanced:Understanding-Hudi-Commits)


## Overview

This notebook demonstrates using PySpark with [Apache Hudi](https://aws.amazon.com/emr/features/hudi/) on Amazon EMR to insert, upsert, and delete records in an S3 data lake.

Here is a good reference link to read later:

* [How Hudi Works](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi-how-it-works.html)

This notebook covers the following concepts when writing Copy On Write tables to an S3 Datalake:

- Write Hudi Spark jobs in PySpark.
- Bulk Insert the Initial Dataset.
- Write a Non-Partitioned table.
- Sync the Hudi tables to the Hive/Glue Catalog.
- Upsert some records to a Hudi table.
- Delete records from a Hudi table.
- Rollback.
- Understanding Hudi commits.





#### Pre-requisites

This demo is based on Hudi version 0.8.0 and runs on Jupyter notebooks connected to a single-node (r5.4xlarge) EMR cluster with the configuration listed below.

 - EMR version 6.5.0
 
 - Software configuration

       - Hadoop 3.2.1
       - Hive 3.1.2
       - Livy 0.7.1
       - JupyterHub 1.4.1
       - Spark 3.1.2
       
       
 - AWS Glue Data Catalog settings - Select the below listed check boxes
       - Use for Hive table metadata  
       - Use for Spark table metadata



### Connect to the master node of the EMR cluster using SSH:
    - ssh -i ~/xxxx.pem hadoop@<ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com>

    - Ensure the files listed below are copied into HDFS:

    - hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar hdfs:///user/hadoop/

    - hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar hdfs:///user/hadoop/
    
    - hdfs dfs -copyFromLocal /usr/share/aws/aws-java-sdk/aws-java-sdk-bundle-1.12.31.jar hdfs:///user/hadoop/

- https://github.com/apache/hudi/issues/5053

Let's start by initializing the Spark Session to connect this notebook to our Spark EMR cluster:

In [8]:
%%configure -f
{
    "conf":  { 
             "spark.jars":"hdfs:///user/hadoop/aws-java-sdk-bundle-1.12.31.jar,hdfs:///user/hadoop/hudi-spark-bundle.jar,hdfs:///user/hadoop/spark-avro.jar",
             "spark.sql.hive.convertMetastoreParquet":"false",     
             "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
             "spark.dynamicAllocation.executorIdleTimeout": 3600,
             "spark.executor.memory": "5G",
             "spark.executor.cores": 3,
             "spark.dynamicAllocation.initialExecutors":5
           } 
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
6,application_1648194189527_0007,pyspark,idle,Link,Link,,✔



SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
4,application_1648194189527_0005,pyspark,idle,Link,Link,,
6,application_1648194189527_0007,pyspark,idle,Link,Link,,✔


Make sure you update the bucket name to your unique bucket before running this cell.

In [9]:
## CHANGE ME ##
config = {
    "table_name": "hudi_trips_table",
    "target": "s3://<Your S3 Bucket Here>/hudi/hudi_trips_table",
    "primary_key": "trip_id",
    "sort_key": "tstamp",
    "commits_to_retain": "4"
}


Define the Hudi option keys and values as Python constants:

In [10]:
# General Constants
HUDI_FORMAT = "org.apache.hudi"
TABLE_NAME = "hoodie.table.name"
RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
UPSERT_OPERATION_OPT_VAL = "upsert"
DELETE_OPERATION_OPT_VAL = "delete"
BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
S3_CONSISTENCY_CHECK = "hoodie.consistency.check.enabled"
HUDI_CLEANER_POLICY = "hoodie.cleaner.policy"
KEEP_LATEST_COMMITS = "KEEP_LATEST_COMMITS"
KEEP_LATEST_FILE_VERSIONS = "KEEP_LATEST_FILE_VERSIONS"
HUDI_COMMITS_RETAINED = "hoodie.cleaner.commits.retained"
HUDI_FILES_RETAINED = "hoodie.cleaner.fileversions.retained"
PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class"
EMPTY_PAYLOAD_CLASS_OPT_VAL = "org.apache.hudi.common.model.EmptyHoodieRecordPayload"

# Hive Constants
HIVE_SYNC_ENABLED_OPT_KEY="hoodie.datasource.hive_sync.enable"
HIVE_PARTITION_FIELDS_OPT_KEY="hoodie.datasource.hive_sync.partition_fields"
HIVE_ASSUME_DATE_PARTITION_OPT_KEY="hoodie.datasource.hive_sync.assume_date_partitioning"
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY="hoodie.datasource.hive_sync.partition_extractor_class"
HIVE_TABLE_OPT_KEY="hoodie.datasource.hive_sync.table"

# Partition Constants
NONPARTITION_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.NonPartitionedExtractor"
MULTIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.MultiPartKeysValueExtractor"
KEYGENERATOR_CLASS_OPT_KEY="hoodie.datasource.write.keygenerator.class"
NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.keygen.NonpartitionedKeyGenerator"
COMPLEX_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.keygen.ComplexKeyGenerator"
PARTITIONPATH_FIELD_OPT_KEY="hoodie.datasource.write.partitionpath.field"

#Incremental Constants
VIEW_TYPE_OPT_KEY="hoodie.datasource.query.type"
BEGIN_INSTANTTIME_OPT_KEY="hoodie.datasource.read.begin.instanttime"
VIEW_TYPE_INCREMENTAL_OPT_VAL="incremental"
END_INSTANTTIME_OPT_KEY="hoodie.datasource.read.end.instanttime"


Functions to create JSON data and a Spark DataFrame from this data:

In [11]:
## Generates Data

from datetime import datetime

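def get_json_data(start, count, dest):
    # Build `count` trip records with consecutive trip_ids starting at `start`.
    # All records share one timestamp; route_id and destination repeat every 10 trips.
    time_stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    data = [{"trip_id": i, "tstamp": time_stamp, "route_id": chr(65 + (i % 10)), "destination": dest[i%10]} for i in range(start, start + count)]
    return data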

# Creates the Dataframe
def create_json_df(spark, data):
    sc = spark.sparkContext
    return spark.read.json(sc.parallelize(data))



## Bulk Insert the Initial Dataset

Let's generate 2M records to load into our Data Lake:

In [12]:
dest = ["Seattle", "New York", "New Jersey", "Los Angeles", "Las Vegas", "Tucson","Washington DC","Philadelphia","Miami","San Francisco"]
df1 = create_json_df(spark, get_json_data(0, 2000000, dest))
print(df1.count())
df1.show()


2000000
+-------------+--------+-------+-------------------+
|  destination|route_id|trip_id|             tstamp|
+-------------+--------+-------+-------------------+
|      Seattle|       A|      0|2022-03-25 09:40:07|
|     New York|       B|      1|2022-03-25 09:40:07|
|   New Jersey|       C|      2|2022-03-25 09:40:07|
|  Los Angeles|       D|      3|2022-03-25 09:40:07|
|    Las Vegas|       E|      4|2022-03-25 09:40:07|
|       Tucson|       F|      5|2022-03-25 09:40:07|
|Washington DC|       G|      6|2022-03-25 09:40:07|
| Philadelphia|       H|      7|2022-03-25 09:40:07|
|        Miami|       I|      8|2022-03-25 09:40:07|
|San Francisco|       J|      9|2022-03-25 09:40:07|
|      Seattle|       A|     10|2022-03-25 09:40:07|
|     New York|       B|     11|2022-03-25 09:40:07|
|   New Jersey|       C|     12|2022-03-25 09:40:07|
|  Los Angeles|       D|     13|2022-03-25 09:40:07|
|    Las Vegas|       E|     14|2022-03-25 09:40:07|
|       Tucson|       F|     15|2022-0

## Copy On Write

The default table type is Copy-On-Write, which is best suited for read-heavy workloads with modest write volumes. On each write, Copy-On-Write rewrites the affected data files, producing a new file version that contains the original data plus the new changes. This increases write latency, but it keeps reads fast because queries only have to scan the latest version of each file.


We will use the Copy-On-Write storage option (STORAGE_TYPE_OPT_KEY, "COPY_ON_WRITE"). Because it is the default, it does not need to be set explicitly.
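
To make the copy-on-write idea concrete, here is a minimal pure-Python sketch. It is a toy model and not Hudi's implementation: the class name `CowFileGroup` and the way versions are stored are assumptions made only for illustration. It shows that an update never edits a file in place; the whole file is rewritten as a new version, and the old version stays untouched.

```python
# Toy model of copy-on-write (illustrative only, not Hudi code).
# CowFileGroup is a hypothetical name; a "file" is modelled as a dict.

class CowFileGroup:
    def __init__(self, records, commit_time):
        # Each version is a (commit_time, {record_key: value}) pair.
        self.versions = [(commit_time, dict(records))]

    def latest(self):
        # Readers only look at the newest version.
        return self.versions[-1][1]

    def upsert(self, changes, commit_time):
        # Copy every record of the latest version, then apply the changes.
        rewritten = dict(self.latest())
        rewritten.update(changes)
        self.versions.append((commit_time, rewritten))


group = CowFileGroup({1: "Seattle", 2: "New York", 3: "Miami"}, "001")
group.upsert({2: "Boston"}, "002")

print(len(group.versions))      # number of versions of this file group
print(group.versions[0][1][2])  # value of key 2 in the old version
print(group.latest()[2])        # value of key 2 in the new version
print(len(group.latest()))      # unchanged records were copied as well
```

Changing one record out of three still copies all three into the new version, which is why writes are more expensive than reads for this table type.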

Write the data to S3: 

In [13]:
(df1.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
      .option(BULK_INSERT_PARALLELISM, 3)
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
      .option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)
      .mode("Overwrite")
      .save(config['target']))


Let's observe the number of files in S3. We expect 3 data files because BULK_INSERT_PARALLELISM is set to 3.
```

$ aws s3 ls s3://<Your S3 Bucket Here>/hudi/hudi_trips_table/ --summarize --human-readable
                           PRE .hoodie/
2021-11-15 19:47:08          0 .hoodie_$folder$
2021-11-15 19:47:19         93 .hoodie_partition_metadata
2021-11-15 19:47:29    4874333 2ac15cd9-60ce-4fbd-8ae7-91abae3a3a12-0_2-15-105_20211115194706.parquet
2021-11-15 19:47:29    4936875 3de41050-e3d7-41d2-8ce5-2892ac24e2e1-0_1-15-104_20211115194706.parquet
2021-11-15 19:47:26    4672448 bfa09b26-b3af-4f1a-90d1-c6f71bf70a07-0_0-15-103_20211115194706.parquet

Total Objects: 5
   Total Size: 13.8 MiB

```


Let's inspect the table created and query the data:

In [14]:
spark.sql("describe formatted "+config['table_name']).show(20,False)


+----------------------------+-----------------------------------------------------------+-------+
|col_name                    |data_type                                                  |comment|
+----------------------------+-----------------------------------------------------------+-------+
|_hoodie_commit_time         |string                                                     |null   |
|_hoodie_commit_seqno        |string                                                     |null   |
|_hoodie_record_key          |string                                                     |null   |
|_hoodie_partition_path      |string                                                     |null   |
|_hoodie_file_name           |string                                                     |null   |
|destination                 |string                                                     |null   |
|route_id                    |string                                                     |null   |
|trip_id  

In [15]:
df2=spark.read.format(HUDI_FORMAT).load(config["target"]+"/*")
df2.count()
df2.show(20,False)


+-------------------+-----------------------+------------------+----------------------+--------------------------------------------------------------------+-------------+--------+-------+-------------------+
|_hoodie_commit_time|_hoodie_commit_seqno   |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                   |destination  |route_id|trip_id|tstamp             |
+-------------------+-----------------------+------------------+----------------------+--------------------------------------------------------------------+-------------+--------+-------+-------------------+
|20220325094025     |20220325094025_2_655243|407615            |                      |008d7dbd-649d-4355-956e-9b6b3f8b48fa-0_2-9-64_20220325094025.parquet|Tucson       |F       |407615 |2022-03-25 09:40:07|
|20220325094025     |20220325094025_2_655244|407616            |                      |008d7dbd-649d-4355-956e-9b6b3f8b48fa-0_2-9-64_20220325094025.parquet|Washington D

We can query the Hive table as well:

In [16]:
spark.sql("select count(*) from "+config['table_name']).show(20,False)


+--------+
|count(1)|
+--------+
|2000000 |
+--------+


## Batch Upsert some records

Let's modify a few records. First, look at their current values:

In [17]:
spark.sql("select trip_id, route_id, destination, tstamp from "+config['table_name'] +" where trip_id between 1000000 and 1000009").show(20,False)


+-------+--------+-------------+-------------------+
|trip_id|route_id|destination  |tstamp             |
+-------+--------+-------------+-------------------+
|1000000|A       |Seattle      |2022-03-25 09:40:07|
|1000001|B       |New York     |2022-03-25 09:40:07|
|1000002|C       |New Jersey   |2022-03-25 09:40:07|
|1000003|D       |Los Angeles  |2022-03-25 09:40:07|
|1000004|E       |Las Vegas    |2022-03-25 09:40:07|
|1000005|F       |Tucson       |2022-03-25 09:40:07|
|1000006|G       |Washington DC|2022-03-25 09:40:07|
|1000007|H       |Philadelphia |2022-03-25 09:40:07|
|1000008|I       |Miami        |2022-03-25 09:40:07|
|1000009|J       |San Francisco|2022-03-25 09:40:07|
+-------+--------+-------------+-------------------+

In [18]:
upsert_dest = ["Boston", "Boston", "Boston", "Boston", "Boston","Boston","Boston","Boston","Boston","Boston"]
df3 = create_json_df(spark, get_json_data(1000000, 10, upsert_dest))
df3.count()


10

In [19]:
df3.select("trip_id","route_id","destination","tstamp").show()


+-------+--------+-----------+-------------------+
|trip_id|route_id|destination|             tstamp|
+-------+--------+-----------+-------------------+
|1000000|       A|     Boston|2022-03-25 09:44:40|
|1000001|       B|     Boston|2022-03-25 09:44:40|
|1000002|       C|     Boston|2022-03-25 09:44:40|
|1000003|       D|     Boston|2022-03-25 09:44:40|
|1000004|       E|     Boston|2022-03-25 09:44:40|
|1000005|       F|     Boston|2022-03-25 09:44:40|
|1000006|       G|     Boston|2022-03-25 09:44:40|
|1000007|       H|     Boston|2022-03-25 09:44:40|
|1000008|       I|     Boston|2022-03-25 09:44:40|
|1000009|       J|     Boston|2022-03-25 09:44:40|
+-------+--------+-----------+-------------------+

We have changed the destination and timestamp for trip IDs 1000000 to 1000009. Now, let's upsert the changes to S3. Note that the operation is now "upsert", as opposed to "bulk_insert" for the initial load:

```
      .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)

```

In [20]:
(df3.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
      .option(UPSERT_PARALLELISM, 20)
      .option(HUDI_CLEANER_POLICY, KEEP_LATEST_COMMITS)
      .option(HUDI_COMMITS_RETAINED,config["commits_to_retain"])
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
      .option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)  
      .mode("Append")
      .save(config['target']))
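
The write cells in this notebook repeat almost the same chain of `.option(...)` calls. As an optional refactoring, the shared options could be collected in a dictionary and passed with `.options(**...)`. The sketch below is one possible way to do that; `build_write_options` and `example_config` are hypothetical names that are not part of the original notebook, and the option strings are the same ones defined in the constants cell.

```python
# Hypothetical helper (not part of the original notebook): build the
# shared Hudi write options once instead of repeating .option(...) calls.

def build_write_options(config, operation, parallelism=20):
    options = {
        "hoodie.table.name": config["table_name"],
        "hoodie.datasource.write.recordkey.field": config["primary_key"],
        "hoodie.datasource.write.precombine.field": config["sort_key"],
        "hoodie.datasource.write.operation": operation,
        "hoodie.datasource.write.keygenerator.class":
            "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.table": config["table_name"],
        "hoodie.datasource.hive_sync.partition_extractor_class":
            "org.apache.hudi.hive.NonPartitionedExtractor",
    }
    if operation == "bulk_insert":
        options["hoodie.bulkinsert.shuffle.parallelism"] = str(parallelism)
    else:
        # Upserts and deletes in this notebook also set the cleaner policy.
        options["hoodie.upsert.shuffle.parallelism"] = str(parallelism)
        options["hoodie.cleaner.policy"] = "KEEP_LATEST_COMMITS"
        options["hoodie.cleaner.commits.retained"] = config["commits_to_retain"]
    return options


# Same shape as the notebook's config dictionary; the bucket is a placeholder.
example_config = {
    "table_name": "hudi_trips_table",
    "target": "s3://<Your S3 Bucket Here>/hudi/hudi_trips_table",
    "primary_key": "trip_id",
    "sort_key": "tstamp",
    "commits_to_retain": "4",
}

upsert_options = build_write_options(example_config, "upsert")
for key in sorted(upsert_options):
    print(key, "=", upsert_options[key])

# In the notebook, the dictionary could then be used like this:
# (df3.write.format("org.apache.hudi")
#     .options(**upsert_options)
#     .mode("Append")
#     .save(example_config["target"]))
```

Only the operation name and the parallelism differ between the bulk insert, upsert, and delete cells, so a helper like this keeps those differences visible.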


In [21]:
spark.sql("select trip_id, route_id, tstamp, destination from "+config['table_name'] +" where trip_id between 999996 and 1000013").show(50,False)




+-------+--------+-------------------+-------------+
|trip_id|route_id|tstamp             |destination  |
+-------+--------+-------------------+-------------+
|1000000|A       |2022-03-25 09:44:40|Boston       |
|1000001|B       |2022-03-25 09:44:40|Boston       |
|1000002|C       |2022-03-25 09:44:40|Boston       |
|1000003|D       |2022-03-25 09:44:40|Boston       |
|1000004|E       |2022-03-25 09:44:40|Boston       |
|1000005|F       |2022-03-25 09:44:40|Boston       |
|1000006|G       |2022-03-25 09:44:40|Boston       |
|1000007|H       |2022-03-25 09:44:40|Boston       |
|1000008|I       |2022-03-25 09:44:40|Boston       |
|1000009|J       |2022-03-25 09:44:40|Boston       |
|1000010|A       |2022-03-25 09:40:07|Seattle      |
|1000011|B       |2022-03-25 09:40:07|New York     |
|1000012|C       |2022-03-25 09:40:07|New Jersey   |
|1000013|D       |2022-03-25 09:40:07|Los Angeles  |
|999996 |G       |2022-03-25 09:40:07|Washington DC|
|999997 |H       |2022-03-25 09:40:07|Philadel

Now let's check the files written by this commit. You will see the 3 base files from the bulk insert plus 1 new version of one of the base files, written by the upsert. In the example below, there are now 2 versions of bfa09b26-b3af-4f1a-90d1-c6f71bf70a07:

```
$ aws s3 ls s3://<Your S3 Bucket Here>/hudi/hudi_trips_table/ --summarize --human-readable
                           PRE .hoodie/
2021-11-15 19:47:08          0 .hoodie_$folder$
2021-11-15 19:47:19         93 .hoodie_partition_metadata
2021-11-15 19:47:29    4874333 2ac15cd9-60ce-4fbd-8ae7-91abae3a3a12-0_2-15-105_20211115194706.parquet
2021-11-15 19:47:29    4936875 3de41050-e3d7-41d2-8ce5-2892ac24e2e1-0_1-15-104_20211115194706.parquet
2021-11-15 19:47:26    4672448 bfa09b26-b3af-4f1a-90d1-c6f71bf70a07-0_0-15-103_20211115194706.parquet
2021-11-15 19:50:29    4672282 bfa09b26-b3af-4f1a-90d1-c6f71bf70a07-0_0-59-416_20211115195018.parquet

Total Objects: 6
   Total Size: 18.6 MiB

```
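
The file names in the listing can be read programmatically. The sketch below assumes that base files are named `<fileId>_<writeToken>_<instantTime>.parquet`, which matches the names shown above; the helper names `parse_base_file` and `versions_by_file_id` are made up for this example. It groups the files by file ID so that the number of versions per file group becomes visible.

```python
from collections import defaultdict

# Assumed naming layout: <fileId>_<writeToken>_<instantTime>.parquet
# Both helper names are hypothetical and used only for illustration.

def parse_base_file(name):
    stem = name[: -len(".parquet")]
    file_id, write_token, instant_time = stem.split("_")
    return file_id, write_token, instant_time


def versions_by_file_id(names):
    grouped = defaultdict(list)
    for name in names:
        file_id, _, instant_time = parse_base_file(name)
        grouped[file_id].append(instant_time)
    # Sort the commit times so the oldest version comes first.
    return {file_id: sorted(times) for file_id, times in grouped.items()}


# File names copied from the S3 listing above.
listing = [
    "2ac15cd9-60ce-4fbd-8ae7-91abae3a3a12-0_2-15-105_20211115194706.parquet",
    "3de41050-e3d7-41d2-8ce5-2892ac24e2e1-0_1-15-104_20211115194706.parquet",
    "bfa09b26-b3af-4f1a-90d1-c6f71bf70a07-0_0-15-103_20211115194706.parquet",
    "bfa09b26-b3af-4f1a-90d1-c6f71bf70a07-0_0-59-416_20211115195018.parquet",
]

for file_id, instants in versions_by_file_id(listing).items():
    print(file_id, len(instants), instants)
```

The last part of each name is the commit time that wrote that version, which is how the upsert's new file can be told apart from the bulk insert's original.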

Now that we have upserted some records, let us insert 10 new records into the table. We will use the same upsert operation. Because the primary keys 2000000 to 2000009 do not exist in the table, the records will be inserted.

In [22]:
insert_dest = ["Syracuse", "Syracuse", "Syracuse", "Syracuse", "Syracuse", "Syracuse", "Syracuse", "Syracuse", "Syracuse", "Syracuse"]
df5 = create_json_df(spark, get_json_data(2000000, 10, insert_dest))
df5.count()
df5.show()


+-----------+--------+-------+-------------------+
|destination|route_id|trip_id|             tstamp|
+-----------+--------+-------+-------------------+
|   Syracuse|       A|2000000|2022-03-25 09:45:31|
|   Syracuse|       B|2000001|2022-03-25 09:45:31|
|   Syracuse|       C|2000002|2022-03-25 09:45:31|
|   Syracuse|       D|2000003|2022-03-25 09:45:31|
|   Syracuse|       E|2000004|2022-03-25 09:45:31|
|   Syracuse|       F|2000005|2022-03-25 09:45:31|
|   Syracuse|       G|2000006|2022-03-25 09:45:31|
|   Syracuse|       H|2000007|2022-03-25 09:45:31|
|   Syracuse|       I|2000008|2022-03-25 09:45:31|
|   Syracuse|       J|2000009|2022-03-25 09:45:31|
+-----------+--------+-------+-------------------+

In [23]:
(df5.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
      .option(UPSERT_PARALLELISM, 20)
      .option(HUDI_CLEANER_POLICY, KEEP_LATEST_COMMITS)
      .option(HUDI_COMMITS_RETAINED,config["commits_to_retain"])
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
      .option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)  
      .mode("Append")
      .save(config['target']))


In [24]:
df6=spark.read.format(HUDI_FORMAT).load(config["target"]+"/*")
df6.count()


2000010

In [25]:
spark.sql("select trip_id, route_id, tstamp, destination from "+config['table_name'] +" where trip_id > 1999996").show(50,False)


+-------+--------+-------------------+-------------+
|trip_id|route_id|tstamp             |destination  |
+-------+--------+-------------------+-------------+
|1999997|H       |2022-03-25 09:40:07|Philadelphia |
|1999998|I       |2022-03-25 09:40:07|Miami        |
|1999999|J       |2022-03-25 09:40:07|San Francisco|
|2000009|J       |2022-03-25 09:45:31|Syracuse     |
|2000008|I       |2022-03-25 09:45:31|Syracuse     |
|2000007|H       |2022-03-25 09:45:31|Syracuse     |
|2000006|G       |2022-03-25 09:45:31|Syracuse     |
|2000001|B       |2022-03-25 09:45:31|Syracuse     |
|2000000|A       |2022-03-25 09:45:31|Syracuse     |
|2000005|F       |2022-03-25 09:45:31|Syracuse     |
|2000004|E       |2022-03-25 09:45:31|Syracuse     |
|2000003|D       |2022-03-25 09:45:31|Syracuse     |
|2000002|C       |2022-03-25 09:45:31|Syracuse     |
+-------+--------+-------------------+-------------+
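
The upsert behaviour seen in this section can be modelled in a few lines of plain Python. This is a simplified sketch, not Hudi's code: it assumes that duplicates inside one incoming batch are resolved by keeping the record with the largest precombine value (`tstamp` here), and that the surviving incoming record then replaces any stored record with the same key. The function names and the sample rows, including the "Albany" duplicate, are invented for illustration.

```python
# Simplified model of an upsert keyed on trip_id (illustrative only).

def dedupe_batch(batch, key_field, precombine_field):
    # Within one batch, keep the record with the largest precombine value.
    latest = {}
    for record in batch:
        key = record[key_field]
        if key not in latest or record[precombine_field] >= latest[key][precombine_field]:
            latest[key] = record
    return latest


def upsert(table, batch, key_field="trip_id", precombine_field="tstamp"):
    # Existing keys are overwritten, unknown keys are inserted.
    merged = dict(table)
    merged.update(dedupe_batch(batch, key_field, precombine_field))
    return merged


table = {
    1000000: {"trip_id": 1000000, "tstamp": "2022-03-25 09:40:07", "destination": "Seattle"},
    1000001: {"trip_id": 1000001, "tstamp": "2022-03-25 09:40:07", "destination": "New York"},
}
batch = [
    {"trip_id": 1000000, "tstamp": "2022-03-25 09:44:40", "destination": "Boston"},
    {"trip_id": 2000000, "tstamp": "2022-03-25 09:45:31", "destination": "Syracuse"},
    # Older duplicate of key 2000000 in the same batch; it should lose.
    {"trip_id": 2000000, "tstamp": "2022-03-25 09:45:00", "destination": "Albany"},
]

result = upsert(table, batch)
for key in sorted(result):
    print(key, result[key]["destination"], result[key]["tstamp"])
```

Key 1000000 is updated, key 1000001 is left alone, and key 2000000 is inserted once, with the newer of its two incoming versions.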

## Incremental View
Hudi provides an **Incremental** **View**, which extracts only the records that changed after a given commit, so you can pull just the changes between any two commits.

In [26]:
commits = list(map(lambda row: row[0], spark.sql("select distinct _hoodie_commit_time as commit_time from "+config['table_name'] +" order by commit_time").limit(50).collect()))
beginTime = commits[len(commits) - 2] # commit time we are interested in
print(beginTime)


20220325094500

In [27]:
# We take the second-to-last commit_time as our begin time to find the records that changed between the last two commits

incremental_read_options = {
  VIEW_TYPE_OPT_KEY: VIEW_TYPE_INCREMENTAL_OPT_VAL,
  BEGIN_INSTANTTIME_OPT_KEY: beginTime,
}

incQueryDF=spark.read.format(HUDI_FORMAT).options(**incremental_read_options).load(config["target"]+"/*")
incQueryDF.show()


+-------------------+--------------------+------------------+----------------------+--------------------+-----------+--------+-------+-------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|destination|route_id|trip_id|             tstamp|
+-------------------+--------------------+------------------+----------------------+--------------------+-----------+--------+-------+-------------------+
|     20220325094536| 20220325094536_0_11|           2000009|                      |319c4469-92d7-433...|   Syracuse|       J|2000009|2022-03-25 09:45:31|
|     20220325094536| 20220325094536_0_12|           2000008|                      |319c4469-92d7-433...|   Syracuse|       I|2000008|2022-03-25 09:45:31|
|     20220325094536| 20220325094536_0_13|           2000007|                      |319c4469-92d7-433...|   Syracuse|       H|2000007|2022-03-25 09:45:31|
|     20220325094536| 20220325094536_0_14|           2000006|         

We can observe that only newly added records from trip_id 2000000 to 2000009 are shown.
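
The constants cell also defines END_INSTANTTIME_OPT_KEY, which is not used above. The sketch below shows how an incremental query could be bounded on both sides. It assumes that the begin time is exclusive and the end time is inclusive, which is consistent with the result above (the begin commit 20220325094500 itself is not returned). The helper names `incremental_options` and `commits_in_window` are hypothetical.

```python
# Hypothetical helpers for bounded incremental reads (illustrative only).

def incremental_options(begin_time, end_time=None):
    # Same option keys as the "Incremental Constants" in this notebook.
    options = {
        "hoodie.datasource.query.type": "incremental",
        "hoodie.datasource.read.begin.instanttime": begin_time,
    }
    if end_time is not None:
        options["hoodie.datasource.read.end.instanttime"] = end_time
    return options


def commits_in_window(commits, begin_time, end_time=None):
    # Assumed window semantics: begin is exclusive, end is inclusive.
    return [
        c for c in commits
        if c > begin_time and (end_time is None or c <= end_time)
    ]


# Commit times that appear in the outputs of this notebook.
commits = ["20220325094025", "20220325094500", "20220325094536"]

print(commits_in_window(commits, "20220325094500"))
print(commits_in_window(commits, "20220325094025", "20220325094500"))
print(incremental_options("20220325094025", "20220325094500"))

# In the notebook, the options could be passed to the reader like this:
# spark.read.format("org.apache.hudi") \
#     .options(**incremental_options("20220325094025", "20220325094500")) \
#     .load(config["target"] + "/*")
```

With a begin time of the bulk insert commit and an end time of the first upsert commit, only the 10 "Boston" updates would be expected in the result.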

## Deleting Records.

Apache Hudi supports two types of deletes on data stored in Hudi datasets; the user chooses between them by specifying a different record payload implementation.

* **Soft Deletes**: A soft delete retains the record key but nulls out the values of all other fields. To achieve this, make sure the relevant fields are nullable in the dataset schema, set them to null, and upsert the records.
    
* **Hard Deletes** : A stronger form of delete is to physically remove any trace of the record from the dataset. 
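
Soft deletes are not run in this notebook. As a rough sketch of the idea, the function below nulls out every field of a record except the ones that must survive. It assumes that the record key (`trip_id`) and the precombine field (`tstamp`) are kept; `soft_delete` is a hypothetical helper, and the resulting records would still have to be upserted with the same options as a normal upsert.

```python
# Hypothetical helper: prepare a record for a soft delete by nulling
# every field except the key and the precombine field (assumption).

def soft_delete(record, keep_fields=("trip_id", "tstamp")):
    return {
        name: (value if name in keep_fields else None)
        for name, value in record.items()
    }


record = {
    "trip_id": 2000000,
    "tstamp": "2022-03-25 09:45:31",
    "route_id": "A",
    "destination": "Syracuse",
}

print(soft_delete(record))
```

After upserting such a record, the key would still be present in the table, but its other columns would read as null.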

Let's now execute some hard delete operations on our dataset which will remove the records from our dataset.

Let's delete the 10 records with the "Syracuse" destination that we added to the table. To hard delete data, set OPERATION_OPT_KEY to DELETE_OPERATION_OPT_VAL; Hudi then removes from the table every record contained in the dataset you submit:
```
.option(OPERATION_OPT_KEY,DELETE_OPERATION_OPT_VAL)

```

In [28]:
df5.show()


+-----------+--------+-------+-------------------+
|destination|route_id|trip_id|             tstamp|
+-----------+--------+-------+-------------------+
|   Syracuse|       A|2000000|2022-03-25 09:45:31|
|   Syracuse|       B|2000001|2022-03-25 09:45:31|
|   Syracuse|       C|2000002|2022-03-25 09:45:31|
|   Syracuse|       D|2000003|2022-03-25 09:45:31|
|   Syracuse|       E|2000004|2022-03-25 09:45:31|
|   Syracuse|       F|2000005|2022-03-25 09:45:31|
|   Syracuse|       G|2000006|2022-03-25 09:45:31|
|   Syracuse|       H|2000007|2022-03-25 09:45:31|
|   Syracuse|       I|2000008|2022-03-25 09:45:31|
|   Syracuse|       J|2000009|2022-03-25 09:45:31|
+-----------+--------+-------+-------------------+

In [29]:
(df5.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, DELETE_OPERATION_OPT_VAL)
      .option(UPSERT_PARALLELISM, 20)
      .option(HUDI_CLEANER_POLICY, KEEP_LATEST_COMMITS)
      .option(HUDI_COMMITS_RETAINED,config["commits_to_retain"])
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
      .option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)
      .mode("Append")
      .save(config['target']))


In [30]:
spark.sql("select trip_id, route_id, tstamp, destination from "+config['table_name'] +" where trip_id > 1999996").show(20,False)


+-------+--------+-------------------+-------------+
|trip_id|route_id|tstamp             |destination  |
+-------+--------+-------------------+-------------+
|1999997|H       |2022-03-25 09:40:07|Philadelphia |
|1999998|I       |2022-03-25 09:40:07|Miami        |
|1999999|J       |2022-03-25 09:40:07|San Francisco|
+-------+--------+-------------------+-------------+

We can observe that the records with trip_id 2000000 to 2000009 no longer exist in our table.

Let's observe the number of files in S3. Expected: 6 data files (3 initial files + 1 from the upsert + 1 from the insert + 1 from the delete).

```

$ aws s3 ls s3://<Your S3 Bucket Here>/hudi/hudi_trips_table/ --summarize --human-readable
                           PRE .hoodie/
2021-11-15 19:47:08          0 .hoodie_$folder$
2021-11-15 19:47:19         93 .hoodie_partition_metadata
2021-11-15 19:47:29    4874333 2ac15cd9-60ce-4fbd-8ae7-91abae3a3a12-0_2-15-105_20211115194706.parquet
2021-11-15 19:57:26    4936875 3de41050-e3d7-41d2-8ce5-2892ac24e2e1-0_0-164-14890_20211115195703.parquet
2021-11-15 19:53:14    4936622 3de41050-e3d7-41d2-8ce5-2892ac24e2e1-0_0-99-692_20211115195304.parquet
2021-11-15 19:47:29    4936875 3de41050-e3d7-41d2-8ce5-2892ac24e2e1-0_1-15-104_20211115194706.parquet
2021-11-15 19:47:26    4672448 bfa09b26-b3af-4f1a-90d1-c6f71bf70a07-0_0-15-103_20211115194706.parquet
2021-11-15 19:50:29    4672282 bfa09b26-b3af-4f1a-90d1-c6f71bf70a07-0_0-59-416_20211115195018.parquet

Total Objects: 8
   Total Size: 27.5 MiB

```

In our example, we set the number of commits to retain to 4, so at most 4 newer versions of a single file are kept on top of our bulk insert files. This setting also determines how far back you can incrementally pull data from this table; the default is 10. When writing, the Hudi cleaner deletes older file versions according to this retention policy.

## Rollback

Let's say we want to roll back the last delete we made. 

Start the Hudi CLI using the command ->
    /usr/lib/hudi/cli/bin/hudi-cli.sh

Connect to hudi/hudi_trips_table using the command ->
    connect --path s3://<Your S3 Bucket Here>/hudi/hudi_trips_table/

List the commits using the command ->
    commits show

hudi:hudi_trips_table->commits show
20/04/28 23:46:09 INFO s3n.S3NativeFileSystem: Opening 's3://<Your S3 Bucket Here>/tmp/hudi/hudi_trips_table/.hoodie/20200428231705.commit' for reading
20/04/28 23:46:09 INFO s3n.S3NativeFileSystem: Opening 's3://<Your S3 Bucket Here>/tmp/hudi/hudi_trips_table/.hoodie/20200428231528.commit' for reading
20/04/28 23:46:09 INFO s3n.S3NativeFileSystem: Opening 's3://<Your S3 Bucket Here>/tmp/hudi/hudi_trips_table/.hoodie/20200428231141.commit' for reading
╔════════════════╤═════════════════════╤═══════════════════╤═════════════════════╤══════════════════════════╤═══════════════════════╤══════════════════════════════╤══════════════╗
║ CommitTime     │ Total Bytes Written │ Total Files Added │ Total Files Updated │ Total Partitions Written │ Total Records Written │ Total Update Records Written │ Total Errors ║
╠════════════════╪═════════════════════╪═══════════════════╪═════════════════════╪══════════════════════════╪═══════════════════════╪══════════════════════════════╪══════════════╣
║ 20200428231814 │ 4.4 MB              │ 0                 │ 1                   │ 1                        │ 642132                │ 0                            │ 0            ║
╟────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20200428231528 │ 4.8 MB              │ 0                 │ 1                   │ 1                        │ 697329                │ 10                           │ 0            ║
╟────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20200428231528 │ 4.8 MB              │ 0                 │ 1                   │ 1                        │ 697329                │ 10                           │ 0            ║
╟────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20200428231141 │ 13.8 MB             │ 3                 │ 0                   │ 1                        │ 2000000               │ 0                            │ 0            ║
╚════════════════╧═════════════════════╧═══════════════════╧═════════════════════╧══════════════════════════╧═══════════════════════╧══════════════════════════════╧══════════════╝

Roll back the delete, which is the most recent commit, using the command ->
    commit rollback --commit 20200428231814
    
If you see the message "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources", make sure the cluster has enough free resources to run the rollback job.

Now let us check what happened to the records we deleted earlier:

In [31]:
spark.sql("select trip_id, route_id, tstamp, destination from "+config['table_name'] +" where trip_id > 1999996").show(20,False)


+-------+--------+-------------------+-------------+
|trip_id|route_id|tstamp             |destination  |
+-------+--------+-------------------+-------------+
|1999997|H       |2022-03-25 09:40:07|Philadelphia |
|1999998|I       |2022-03-25 09:40:07|Miami        |
|1999999|J       |2022-03-25 09:40:07|San Francisco|
|2000009|J       |2022-03-25 09:45:31|Syracuse     |
|2000008|I       |2022-03-25 09:45:31|Syracuse     |
|2000007|H       |2022-03-25 09:45:31|Syracuse     |
|2000006|G       |2022-03-25 09:45:31|Syracuse     |
|2000001|B       |2022-03-25 09:45:31|Syracuse     |
|2000000|A       |2022-03-25 09:45:31|Syracuse     |
|2000005|F       |2022-03-25 09:45:31|Syracuse     |
|2000004|E       |2022-03-25 09:45:31|Syracuse     |
|2000003|D       |2022-03-25 09:45:31|Syracuse     |
|2000002|C       |2022-03-25 09:45:31|Syracuse     |
+-------+--------+-------------------+-------------+

## Advanced:Understanding-Hudi-Commits

After the rollback you should have 5 files in your S3 bucket. Let's look at how Hudi cleans up older files. Two cleaning policies determine the behavior of the Hudi cleaner.

**KEEP_LATEST_COMMITS:** This is the default policy. It is a temporal cleaning policy that lets you look back at all the changes made in the last X commits. Suppose a writer ingests data into a Hudi dataset every 30 minutes and the longest-running query can take 5 hours to finish; the user should then retain at least the last 10 commits (5 hours / 30 minutes = 10). With such a configuration, the oldest version of a file is kept on disk for at least 5 hours, which prevents the longest-running query from failing at any point in time. Incremental cleaning is also possible with this policy.

**KEEP_LATEST_FILE_VERSIONS:** This policy keeps N versions of each file irrespective of time. It is useful when you know the maximum number of versions of a file you want to keep at any given time. To get the same protection for long-running queries as before, you need to work out N from your data patterns. The policy is also useful if you just want to keep only the latest version of each file.
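The sizing rule in the KEEP_LATEST_COMMITS example above (a commit every 30 minutes, a longest query of 5 hours) is plain arithmetic. The sketch below only reproduces that calculation; `min_commits_to_retain` is a hypothetical helper name used for illustration and is not part of Hudi.

```python
import math


def min_commits_to_retain(ingest_interval_minutes, longest_query_minutes):
    """Smallest number of commits whose combined time span covers the longest query.

    Hypothetical helper: it only encodes the rule of thumb from the text,
    it does not read or change any Hudi setting.
    """
    if ingest_interval_minutes <= 0:
        raise ValueError("ingest_interval_minutes must be positive")
    # Round up so that a partially covered interval still counts as a full commit.
    return max(1, math.ceil(longest_query_minutes / ingest_interval_minutes))


# The example from the text: ingest every 30 minutes, longest query runs 5 hours.
print(min_commits_to_retain(30, 5 * 60))  # 10
```

The result is the value you would consider passing as the retained-commits setting; treat it as a lower bound and add headroom if query durations vary.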


Let us run the upsert operation with each of these cleaner configurations and observe how the files created on S3 change.

In [32]:
## KEEP_LATEST_COMMITS
## Let's run the upsert statement again to create a new version of the file. Keep the output of aws s3 ls from before the run handy so you can compare it afterwards.

(df3.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
      .option(UPSERT_PARALLELISM, 20)
      .option(HUDI_CLEANER_POLICY, KEEP_LATEST_COMMITS)
      .option(HUDI_COMMITS_RETAINED,2)
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
      .option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)  
      .mode("Append")
      .save(config['target']))


Now compare the file names before and after the upsert statement. Because of .option(HUDI_COMMITS_RETAINED,2), you will not have more than 2 versions of any single file. Older versions are cleaned up as new versions are written.
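To make the before/after comparison less error-prone, you can group the listed file names by file ID and count the versions of each. The sketch below assumes the base file naming pattern `<fileId>_<writeToken>_<commitTime>.parquet` that appears in the S3 listings in this lab. The helper names and the sample file names are hypothetical; paste the names from your own `aws s3 ls` output instead.

```python
from collections import defaultdict


def parse_base_file_name(name):
    """Split '<fileId>_<writeToken>_<commitTime>.parquet' into its three parts."""
    stem = name.rsplit(".", 1)[0]  # drop the .parquet extension
    file_id, write_token, commit_time = stem.rsplit("_", 2)
    return file_id, write_token, commit_time


def versions_by_file_id(names):
    """Map each file ID to the sorted commit times of its versions."""
    versions = defaultdict(list)
    for name in names:
        if not name.endswith(".parquet"):
            continue  # skip metadata entries such as .hoodie_partition_metadata
        file_id, _, commit_time = parse_base_file_name(name)
        versions[file_id].append(commit_time)
    return {file_id: sorted(times) for file_id, times in versions.items()}


# Hypothetical listings taken before and after an upsert.
before = [".hoodie_partition_metadata", "f1-0_0-10-100_20220325094007.parquet"]
after = before + ["f1-0_0-20-200_20220325094531.parquet"]

for label, listing in (("before", before), ("after", after)):
    for file_id, times in versions_by_file_id(listing).items():
        print(label, file_id, len(times), "version(s):", times)
```

A file ID that shows more versions than your retention setting allows is a hint that the cleaner has not run yet for that file group.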

In [33]:
## KEEP_LATEST_FILE_VERSIONS

(df3.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
      .option(UPSERT_PARALLELISM, 20)
      .option(HUDI_CLEANER_POLICY, KEEP_LATEST_FILE_VERSIONS) 
      .option(HUDI_FILES_RETAINED,1)
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
      .option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)  
      .mode("Append")
      .save(config['target']))


Compare the files on S3 again. Because of .option(HUDI_FILES_RETAINED,1), you will have only 3 files (3 base files * 1 version of each). Notice that the file ID at the start of each name stays the same, although the commit timestamp at the end of the name may differ.

```

$ aws s3 ls s3://<Your S3 Bucket Here>/tmp/hudi/hudi_trips_table/ --summarize --human-readable
                           PRE .hoodie/
2021-11-15 19:47:08          0 .hoodie_$folder$
2021-11-15 19:47:19         93 .hoodie_partition_metadata
2021-11-15 19:47:29    4874333 2ac15cd9-60ce-4fbd-8ae7-91abae3a3a12-0_2-15-105_20211115194706.parquet
2021-11-15 19:53:14    4936622 3de41050-e3d7-41d2-8ce5-2892ac24e2e1-0_0-99-692_20211115195304.parquet
2021-11-15 20:49:04    4672242 bfa09b26-b3af-4f1a-90d1-c6f71bf70a07-0_0-65-482_20211115204852.parquet

```
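The two upsert cells above differ only in their cleaner settings, so those settings can be built by one small helper. This is a minimal sketch, assuming the notebook constants `HUDI_CLEANER_POLICY`, `HUDI_COMMITS_RETAINED`, and `HUDI_FILES_RETAINED` hold the Hudi configuration keys `hoodie.cleaner.policy`, `hoodie.cleaner.commits.retained`, and `hoodie.cleaner.fileversions.retained`; check the cell where those constants are defined before relying on it. `cleaner_options` is a hypothetical helper name.

```python
def cleaner_options(policy, retained):
    """Return the Hudi cleaner options for one policy as a plain dict.

    Assumption: the keys below are the ones behind the notebook constants
    HUDI_CLEANER_POLICY, HUDI_COMMITS_RETAINED and HUDI_FILES_RETAINED.
    """
    retained_key = {
        "KEEP_LATEST_COMMITS": "hoodie.cleaner.commits.retained",
        "KEEP_LATEST_FILE_VERSIONS": "hoodie.cleaner.fileversions.retained",
    }
    if policy not in retained_key:
        raise ValueError("unknown cleaner policy: " + policy)
    if retained < 1:
        raise ValueError("retained must be at least 1")
    return {
        "hoodie.cleaner.policy": policy,
        retained_key[policy]: str(retained),
    }


print(cleaner_options("KEEP_LATEST_COMMITS", 2))
print(cleaner_options("KEEP_LATEST_FILE_VERSIONS", 1))
```

With PySpark, the resulting dict could be passed to the writer with `.options(**cleaner_options("KEEP_LATEST_FILE_VERSIONS", 1))` in place of the two separate `.option(...)` calls for the cleaner.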