## Configuration

Before running the code in the cell(s) below SSH into your EMR cluster and run the following 

```hdfs dfs -mkdir -p /apps/hudi/lib```

```hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar```

```hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar```

This will copy the Hudi jar files from the local file system to HDFS on the master node of the notebook cluster

In [1]:
%%configure
{
    "conf": {
            "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar",
            "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
            "spark.sql.hive.convertMetastoreParquet":"false"
    }
}

In [2]:
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.spark.sql.types
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig._

import org.apache.hudi.hive.MultiPartKeysValueExtractor

import java.sql.Timestamp

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
17,application_1638389236079_0045,spark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.spark.sql.types
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import java.sql.Timestamp


## Write to S3 via. Hudi

In [3]:
val inputDF = Seq(
    ("1", "Chris", "2020-01-01", Timestamp.valueOf("2020-01-01 00:00:00")),
    ("2", "Will", "2020-01-01", Timestamp.valueOf("2020-01-01 00:00:00")),
    ("3", "Emma", "2020-01-01", Timestamp.valueOf("2020-01-01 00:00:00")),
    ("4", "John", "2020-01-01", Timestamp.valueOf("2020-01-01 00:00:00")),
    ("5", "Eric", "2020-01-01", Timestamp.valueOf("2020-01-01 00:00:00")),
    ("6", "Adam", "2020-01-01", Timestamp.valueOf("2020-01-01 00:00:00"))
).toDF(
    "id",
    "name",
    "create_date",
    "last_update_time"
)

val hudiOptions = Map[String,String](
  HoodieWriteConfig.TABLE_NAME -> "merge_on_read_scala",
  DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "MERGE_ON_READ", 
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time",
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "false",
  DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "merge_on_read_scala",
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date",
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
)

inputDF.
    write.
    format("org.apache.hudi").
    option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL).
    options(hudiOptions).
    mode(SaveMode.Overwrite).
    save("s3://hudi-sharkech/merge_on_read_scala/")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

inputDF: org.apache.spark.sql.DataFrame = [id: string, name: string ... 2 more fields]
hudiOptions: scala.collection.immutable.Map[String,String] = Map(hoodie.datasource.write.precombine.field -> last_update_time, hoodie.datasource.hive_sync.partition_fields -> creation_date, hoodie.datasource.hive_sync.partition_extractor_class -> org.apache.hudi.hive.MultiPartKeysValueExtractor, hoodie.datasource.hive_sync.table -> merge_on_read_scala, hoodie.datasource.hive_sync.enable -> true, hoodie.datasource.write.recordkey.field -> id, hoodie.table.name -> merge_on_read_scala, hoodie.datasource.write.table.type -> MERGE_ON_READ, hoodie.datasource.write.partitionpath.field -> creation_date)


## Upsert data

Lets do an upsert ... this will be *upsert #1* 

In *upsert 1* we change **Chris** to **Chris Sharkey**

also note that for this write we set [inline compaction][0] to false ```option("hoodie.compact.inline", "false")``` . This keeps Hudi from compacting our changes during the write operation.

[0]:https://hudi.apache.org/docs/0.7.0/configurations#withinlinecompactioninlinecompaction--false

In [4]:
val updateDF = Seq(
    ("1", "Chris Sharkey", "2020-01-01", Timestamp.valueOf("2020-01-02 00:00:00"))
).toDF(
    "id",
    "name",
    "create_date",
    "last_update_time"
)

updateDF.
    write.
    format("org.apache.hudi").
    option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
    option("hoodie.compact.inline", "false").
    options(hudiOptions).
    mode(SaveMode.Append).
    save("s3://hudi-sharkech/merge_on_read_scala/")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

updateDF: org.apache.spark.sql.DataFrame = [id: string, name: string ... 2 more fields]


## Read the Hudi Table

Hudi provides 3 query types
1. Snapshot Query
2. Read Optimized Query 
3. Incremental Query

We will cover Snapshot queries and Read Optimized queries below. Incremetnal queries are covered in the [copy_on_write][1] notebooks.

Query Type|Description
:---|:---|
Snapshot Queries|Queries that see the latest snapshot of the table as of a given commit or compaction action. For MoR tables, snapshot queries expose the most recent state of the table by merging the base and delta files of the latest file slice at the time of the query. 
Incremental Queries|Queries only see new data written to the table, since a given commit/compaction. This effectively provides change streams to enable incremental data pipelines.
Read Optimized Queries|For MoR tables, queries see the latest data compacted. For CoW tables, queries see the latest data committed.

[1]:https://github.com/ev2900/Hudi_Elastic_Map_Reduce/tree/main/copy_on_write

### Snapshot Query

We expect a snapshot query to return the most up to date version of a Hudi table. 

The snap shotquery should include *upsert 1* that changed Chris to **Chris Sharkey**

In [5]:
val snapshotQueryDF = spark.read.format("org.apache.hudi").load("s3://hudi-sharkech/merge_on_read_scala" + "/*/*")

// snapshotQueryDF.orderBy("id").show()
//snapshotQueryDF.select("id", "_hoodie_record_key", "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_partition_path", "_hoodie_file_name").orderBy("id").show()
snapshotQueryDF.select("id", "name", "create_date", "last_update_time").orderBy("id").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

snapshotQueryDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 7 more fields]
+---+-------------+-----------+-------------------+
| id|         name|create_date|   last_update_time|
+---+-------------+-----------+-------------------+
|  1|Chris Sharkey| 2020-01-01|2020-01-02 00:00:00|
|  2|         Will| 2020-01-01|2020-01-01 00:00:00|
|  3|         Emma| 2020-01-01|2020-01-01 00:00:00|
|  4|         John| 2020-01-01|2020-01-01 00:00:00|
|  5|         Eric| 2020-01-01|2020-01-01 00:00:00|
|  6|         Adam| 2020-01-01|2020-01-01 00:00:00|
+---+-------------+-----------+-------------------+



### Read Optimized Queries

A read optimized query to return the latest data compacted. 

*upsert 1* that changed **Chris** to **Chris Sharkey** has not been compacted to the base parquet files yet becuase we set ```option("hoodie.compact.inline", "false")``` during the upsert in the prior step.

We expect the read optimized query to **not** reflect the changes made in *upsert 1*

In [6]:
val read_optimized_options = Map[String,String](
    DataSourceReadOptions.QUERY_TYPE_OPT_KEY -> QUERY_TYPE_READ_OPTIMIZED_OPT_VAL
)

val readOptimizedDF = spark.
    read.format("hudi").
    options(read_optimized_options).
    load("s3://hudi-sharkech/merge_on_read_scala" + "/*/*")

readOptimizedDF.select("id", "name", "create_date", "last_update_time").orderBy("id").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

read_optimized_options: scala.collection.immutable.Map[String,String] = Map(hoodie.datasource.query.type -> read_optimized)
readOptimizedDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 7 more fields]
+---+-----+-----------+-------------------+
| id| name|create_date|   last_update_time|
+---+-----+-----------+-------------------+
|  1|Chris| 2020-01-01|2020-01-01 00:00:00|
|  2| Will| 2020-01-01|2020-01-01 00:00:00|
|  3| Emma| 2020-01-01|2020-01-01 00:00:00|
|  4| John| 2020-01-01|2020-01-01 00:00:00|
|  5| Eric| 2020-01-01|2020-01-01 00:00:00|
|  6| Adam| 2020-01-01|2020-01-01 00:00:00|
+---+-----+-----------+-------------------+



Now that we are getting the hang of it ... lets do another upsert this will be *upsert #2*

*upsert 2* will change **Chris Sharkey** to **Chris M Sharkey**

In [7]:
val updateDF = Seq(
    ("1", "Chris M Sharkey", "2020-01-01", Timestamp.valueOf("2020-01-02 00:00:00"))
).toDF(
    "id",
    "name",
    "create_date",
    "last_update_time"
)

updateDF.
    write.
    format("org.apache.hudi").
    option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
    option("hoodie.compact.inline", "false").
    options(hudiOptions).
    mode(SaveMode.Append).
    save("s3://hudi-sharkech/merge_on_read_scala/")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

updateDF: org.apache.spark.sql.DataFrame = [id: string, name: string ... 2 more fields]


Snapshot query ... the query results should include the changes we just made in *upsert 2*

In [8]:
val snapshotQueryDF = spark.read.format("org.apache.hudi").load("s3://hudi-sharkech/merge_on_read_scala" + "/*/*")

// snapshotQueryDF.orderBy("id").show()
//snapshotQueryDF.select("id", "_hoodie_record_key", "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_partition_path", "_hoodie_file_name").orderBy("id").show()
snapshotQueryDF.select("id", "name", "create_date", "last_update_time").orderBy("id").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

snapshotQueryDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 7 more fields]
+---+---------------+-----------+-------------------+
| id|           name|create_date|   last_update_time|
+---+---------------+-----------+-------------------+
|  1|Chris M Sharkey| 2020-01-01|2020-01-02 00:00:00|
|  2|           Will| 2020-01-01|2020-01-01 00:00:00|
|  3|           Emma| 2020-01-01|2020-01-01 00:00:00|
|  4|           John| 2020-01-01|2020-01-01 00:00:00|
|  5|           Eric| 2020-01-01|2020-01-01 00:00:00|
|  6|           Adam| 2020-01-01|2020-01-01 00:00:00|
+---+---------------+-----------+-------------------+



Read optimized query .. neither *upsert 1* or *upsert 2* have been compacted yet. 

The read optimized query should **not** include the changes made by either *upsert 1* or *upsert 2*

In [9]:
val read_optimized_options = Map[String,String](
    DataSourceReadOptions.QUERY_TYPE_OPT_KEY -> QUERY_TYPE_READ_OPTIMIZED_OPT_VAL
)

val readOptimizedDF = spark.
    read.format("hudi").
    options(read_optimized_options).
    load("s3://hudi-sharkech/merge_on_read_scala" + "/*/*")

readOptimizedDF.select("id", "name", "create_date", "last_update_time").orderBy("id").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

read_optimized_options: scala.collection.immutable.Map[String,String] = Map(hoodie.datasource.query.type -> read_optimized)
readOptimizedDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 7 more fields]
+---+-----+-----------+-------------------+
| id| name|create_date|   last_update_time|
+---+-----+-----------+-------------------+
|  1|Chris| 2020-01-01|2020-01-01 00:00:00|
|  2| Will| 2020-01-01|2020-01-01 00:00:00|
|  3| Emma| 2020-01-01|2020-01-01 00:00:00|
|  4| John| 2020-01-01|2020-01-01 00:00:00|
|  5| Eric| 2020-01-01|2020-01-01 00:00:00|
|  6| Adam| 2020-01-01|2020-01-01 00:00:00|
+---+-----+-----------+-------------------+



### Compaction

Running a compaction will merge the changes we made in *upsert 1* and *upsert 2* with the base parquet files. After a compaction the snapshot query and read optimized query will return the same results. 

An easy way to trigger a compaction is to do another write operation and set ```option("hoodie.compact.inline", "true")``` and set ```option("hoodie.compact.inline.max.delta.commits", "1")```

We could also run a compaction from the [Hudi CLI][1] or via. other methods 

[1]:https://hudi.apache.org/docs/0.7.0/deployment#compactions

In [10]:
val updateDF = Seq(
    ("1", "Christopher M Sharkey", "2020-01-01", Timestamp.valueOf("2020-01-02 00:00:00"))
).toDF(
    "id",
    "name",
    "create_date",
    "last_update_time"
)

updateDF.
    write.
    format("org.apache.hudi").
    option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
    option("hoodie.compact.inline", "true").
    option("hoodie.compact.inline.max.delta.commits", "1").
    options(hudiOptions).
    mode(SaveMode.Append).
    save("s3://hudi-sharkech/merge_on_read_scala/")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

updateDF: org.apache.spark.sql.DataFrame = [id: string, name: string ... 2 more fields]


The snapshot query and read optimized query will should now return the same results

In [11]:
// Snapshot query
val snapshotQueryDF = spark.read.format("org.apache.hudi").load("s3://hudi-sharkech/merge_on_read_scala" + "/*/*")

snapshotQueryDF.select("id", "name", "create_date", "last_update_time").orderBy("id").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

snapshotQueryDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 7 more fields]
+---+--------------------+-----------+-------------------+
| id|                name|create_date|   last_update_time|
+---+--------------------+-----------+-------------------+
|  1|Christopher M Sha...| 2020-01-01|2020-01-02 00:00:00|
|  2|                Will| 2020-01-01|2020-01-01 00:00:00|
|  3|                Emma| 2020-01-01|2020-01-01 00:00:00|
|  4|                John| 2020-01-01|2020-01-01 00:00:00|
|  5|                Eric| 2020-01-01|2020-01-01 00:00:00|
|  6|                Adam| 2020-01-01|2020-01-01 00:00:00|
+---+--------------------+-----------+-------------------+



In [12]:
// Read optimized query
val read_optimized_options = Map[String,String](
    DataSourceReadOptions.QUERY_TYPE_OPT_KEY -> QUERY_TYPE_READ_OPTIMIZED_OPT_VAL
)

val readOptimizedDF = spark.
    read.format("hudi").
    options(read_optimized_options).
    load("s3://hudi-sharkech/merge_on_read_scala" + "/*/*")

readOptimizedDF.select("id", "name", "create_date", "last_update_time").orderBy("id").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

read_optimized_options: scala.collection.immutable.Map[String,String] = Map(hoodie.datasource.query.type -> read_optimized)
readOptimizedDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 7 more fields]
+---+--------------------+-----------+-------------------+
| id|                name|create_date|   last_update_time|
+---+--------------------+-----------+-------------------+
|  1|Christopher M Sha...| 2020-01-01|2020-01-02 00:00:00|
|  2|                Will| 2020-01-01|2020-01-01 00:00:00|
|  3|                Emma| 2020-01-01|2020-01-01 00:00:00|
|  4|                John| 2020-01-01|2020-01-01 00:00:00|
|  5|                Eric| 2020-01-01|2020-01-01 00:00:00|
|  6|                Adam| 2020-01-01|2020-01-01 00:00:00|
+---+--------------------+-----------+-------------------+

