In [1]:
%%configure
{ "conf": {
            "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar",
            "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
            "spark.sql.hive.convertMetastoreParquet":"false"
          }}

In [None]:
%%html 
<img src="/ProductReviewsProcessingRepo/images/hudi_demo_diagram.png">

In [9]:
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor

//Table-level constants shared by the cells below (source read, Hudi write options, table reads).
val hudiTableName = "amazon_product_reviews"
//Column used as the Hudi record key.
val hudiTableRecordKey = "review_id"
//Base location of the Hudi table; the table name is the last path segment.
val hudiTablePath = "s3://hocanint-reinvent-2019-demo-outputs/createdatasets/" + hudiTableName
//Column used as the Hudi partition path.
val hudiTablePartitionColumn = "review_date"
//Column used as the Hudi precombine key. The source-read cell references this name,
//so it has to be defined here together with the other constants.
val hudiTablePrecombineKey = "timestamp"


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
hudiTableName: String = amazon_product_reviews
hudiTableRecordKey: String = review_id
hudiTablePath: String = s3://hocanint-reinvent-2019-demo-outputs/createdatasets/amazon_product_reviews
hudiTablePartitionColumn: String = review_date
hudiTablePrecombineKey: String = timestamp


In [12]:
/****************************
Read the product reviews source data (Parquet) and add the columns the Hudi table needs.
The result is cached because later cells both preview it and write it out.
*****************************/
val sourceData = (spark.read
  //Parquet is self-describing, so no CSV-style "sep"/"header" reader options are needed here.
  .parquet("s3://hocanint-reinvent-2019-datasets-us-east-1/parquet/product_category=Home_Improvement/*")
  //Stamp every row with the load time; this column is used as the Hudi precombine key.
  .withColumn(hudiTablePrecombineKey, current_timestamp())
  //Replace "-" with "/" in the partition column so its value reads like a nested path (e.g. 2015/01/01).
  .withColumn(hudiTablePartitionColumn, regexp_replace(col(hudiTablePartitionColumn), "-", "/"))
  //The first four characters of the rewritten partition column are kept as a separate "year" column.
  .withColumn("year", col(hudiTablePartitionColumn).substr(1, 4))
  .cache())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

sourceData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [marketplace: string, customer_id: string ... 14 more fields]


In [7]:
//Preview ten rows of the source data, restricted to the columns used in this walkthrough.
(sourceData
  .select("marketplace", "review_id", "customer_id", "product_title", "star_rating", "review_date")
  .show(10))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+--------------+-----------+--------------------+-----------+-----------+
|marketplace|     review_id|customer_id|       product_title|star_rating|review_date|
+-----------+--------------+-----------+--------------------+-----------+-----------+
|         US|R2XKAHVTHMN0YZ|   10127812|RoomMates RMK2361...|          5| 2015/01/01|
|         US|R23U5RQ70LNFES|   18462986|Progress Lighting...|          5| 2015/01/01|
|         US| R3ZRQAED0WWBF|    9973935|Elmer's E7010 Car...|          5| 2015/01/01|
|         US|R36EL6ASTQXWCJ|   15871971|Honeywell RCWL300...|          5| 2015/01/01|
|         US|R33S6FX3917UAT|   53047377|Master Lock Pytho...|          5| 2015/01/01|
|         US| RE1ZOXQ9PAHUN|   11082374|Step 2 541200 Mai...|          5| 2015/01/01|
|         US|R1SLAY20OP3Z26|    1386285|Science Purchase ...|          5| 2015/01/01|
|         US| R85LC7REDGM52|   10675345|Axis 45504 3 Outl...|          5| 2015/01/01|
|         US|R1RQ9YTHRX3NZI|   15700318|General Electr

In [13]:
/****************************
Hudi write options for the product reviews dataset.
Every write in this notebook (bulk insert, upsert, delete) starts from this map and only
adds the operation-specific settings on top.
*****************************/
val hudiOptions = Map[String,String](
  HoodieWriteConfig.TABLE_NAME -> hudiTableName,

  //For this data set, we will configure it to use the COPY_ON_WRITE storage strategy. You can also choose MERGE_ON_READ
  DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "COPY_ON_WRITE",

  //These three options configure what Hudi should use as its record key, precombine key, and partition column.
  //They reuse the shared constants so the read and write cells cannot drift apart.
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> hudiTableRecordKey,
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> hudiTablePrecombineKey,
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> hudiTablePartitionColumn,

  //For this data set, we specify that we want to sync metadata with Hive.
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
  DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName,
  DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false",
  //The partition path value looks like yyyy/MM/dd, which is exposed to Hive as three partition fields.
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "year,month,day",
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
)

/** ********************************
Initial load: write the whole input dataset to the Hudi table location.
************************************/
(sourceData.write
  .format("org.apache.hudi")

  //The operation key tells Hudi whether this is an Insert, Upsert, or Bulk Insert operation.
  //Here it is layered on top of the shared options as a single merged map.
  .options(hudiOptions + (DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL))

  //Overwrite mode: this first write replaces whatever is at the table path.
  .mode(SaveMode.Overwrite)
  .save(hudiTablePath))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

hudiOptions: scala.collection.immutable.Map[String,String] = Map(hoodie.datasource.write.precombine.field -> timestamp, hoodie.datasource.hive_sync.partition_fields -> year,month,day, hoodie.datasource.hive_sync.partition_extractor_class -> org.apache.hudi.hive.MultiPartKeysValueExtractor, hoodie.datasource.hive_sync.table -> amazon_product_reviews, hoodie.datasource.hive_sync.enable -> true, hoodie.datasource.write.recordkey.field -> review_id, hoodie.table.name -> amazon_product_reviews, hoodie.datasource.write.storage.type -> COPY_ON_WRITE, hoodie.datasource.hive_sync.assume_date_partitioning -> false, hoodie.datasource.write.partitionpath.field -> review_date)


In [14]:
/** **********************************
Load the Hudi table back as a DataFrame so we can inspect what was written.
**************************************/
//The glob has four wildcard levels below the table path; presumably three partition
//directory levels (the partition value looks like yyyy/MM/dd) plus the data files.
val readOptimizedHudiViewDF = (spark.read
       .format("org.apache.hudi")
       .load(s"$hudiTablePath/*/*/*/*"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

readOptimizedHudiViewDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 19 more fields]


In [15]:
/** ***********************************
Lets take a look at our data. Lets say someone says there is something fishy going on with star ratings.
Counting rows per star_rating makes any out-of-range rating show up as its own group.
**************************************/
//createOrReplaceTempView is the non-deprecated replacement for registerTempTable (deprecated since Spark 2.0).
readOptimizedHudiViewDF.createOrReplaceTempView("amazon_product_reviews_raw_ro_table")
spark.sql("select star_rating, count(*) from amazon_product_reviews_raw_ro_table group by star_rating order by star_rating ASC").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+--------+
|star_rating|count(1)|
+-----------+--------+
|          1|   77100|
|          2|   38288|
|          3|   58836|
|          4|  130377|
|          5|  599625|
|        100|       5|
+-----------+--------+



In [17]:
/** *********************************
Pick the rows that need correcting and build their corrected version:
every row rated 100 gets its star_rating replaced with a typed (integer) null.
************************************/
val upsertdf = (readOptimizedHudiViewDF
  .where(col("star_rating") === 100)
  .withColumn("star_rating", lit(null).cast(IntegerType)))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

upsertdf: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 19 more fields]


In [18]:
/** ********************************
Without Hudi, updating data in S3 meant reading the old data, merging it with the new data, and then
overwriting the old data yourself. With Hudi, the corrected rows are written as an upsert and Hudi
takes care of merging them into the table.
************************************/
(upsertdf.write
  .format("org.apache.hudi")
  //Shared options plus the operation type, merged into a single map.
  .options(hudiOptions + (DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL))
  //Append mode: add this change to the existing table instead of replacing it.
  .mode(SaveMode.Append)
  .save(hudiTablePath))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
//Reload the table after the upsert and re-run the star_rating histogram to confirm the fix.
val readOptimizedHudiViewDF = spark.read.format("org.apache.hudi").load(hudiTablePath + "/*/*/*/*").cache()
//createOrReplaceTempView is the non-deprecated replacement for registerTempTable (deprecated since Spark 2.0).
readOptimizedHudiViewDF.createOrReplaceTempView("amazon_product_reviews_ro_table")
spark.sql("select star_rating, count(*) from amazon_product_reviews_ro_table group by star_rating order by star_rating ASC").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

readOptimizedHudiViewDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 19 more fields]
+-----------+--------+
|star_rating|count(1)|
+-----------+--------+
|       null|       5|
|          1|   77100|
|          2|   38288|
|          3|   58836|
|          4|  130377|
|          5|  599625|
+-----------+--------+



In [20]:
/** *******************************
Now suppose a customer asks for their information to be deleted (for example under GDPR).
First select every row that belongs to that customer.
***********************************/
val deleteRowsDf = readOptimizedHudiViewDF.where(col("customer_id") === 32068341)

//Deletion: write the selected rows back as an upsert whose payload class is the empty record payload.
(deleteRowsDf.write
  .format("org.apache.hudi")
  .options(hudiOptions ++ Map(
    //The operation is still UPSERT...
    DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
    //...but the payload class is set to the empty record payload.
    DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY -> "org.apache.hudi.EmptyHoodieRecordPayload"
  ))
  .mode(SaveMode.Append)
  .save(hudiTablePath))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

deleteRowsDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 19 more fields]


In [23]:
/**************************************
Reload the table after the delete and confirm that no rows remain for the customer.
The refreshed view registered here is also what the commit listing in the next cell queries.
***************************************/
val readOptimizedHudiViewDF = spark.read.format("org.apache.hudi").load(hudiTablePath + "/*/*/*/*").cache()
//createOrReplaceTempView is the non-deprecated replacement for registerTempTable (deprecated since Spark 2.0).
readOptimizedHudiViewDF.createOrReplaceTempView("amazon_product_reviews_ro_table")

spark.sql("select count(*) from amazon_product_reviews_ro_table where customer_id = 32068341").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

readOptimizedHudiViewDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 19 more fields]
+--------+
|count(1)|
+--------+
|       0|
+--------+



In [None]:
//List every commit time present in the table, oldest first.
//The rows are collected to the driver so that `commits` is an Array[String] and can be indexed
//(e.g. commits(0)); a Dataset cannot be indexed by position.
val commits = (spark.sql("select distinct(_hoodie_commit_time) as commitTime from amazon_product_reviews_ro_table order by commitTime")
                .collect()
                .map(row => row.getString(0)))

In [None]:
/** ********************************
Suppose we want to know what a review looked like at a certain point in time. Hudi allows that:
we specify a begin and an end instant and it reads the data committed within that window.
************************************/
val beginTime = "0"
//The commit list is ordered by commit time, so index 0 is the earliest commit.
val endTime = commits(0)

val amazon_product_reviews_table = (spark.read
     .format("org.apache.hudi")
     .options(Map(
       //Mark that we want to do an incremental query
       DataSourceReadOptions.VIEW_TYPE_OPT_KEY -> DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL,
       //Window of commit instants we want to query.
       DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY -> beginTime,
       DataSourceReadOptions.END_INSTANTTIME_OPT_KEY -> endTime
     ))
     //Applied after the read options, as before.
     .options(hudiOptions)
     .load(hudiTablePath)).cache()

//Within that window, show the reviews whose star_rating is 100.
(amazon_product_reviews_table
  .select("review_id", "product_id", "product_title", "star_rating")
  .where(col("star_rating") === 100)
  .show())

In [None]:
/***********************************
At this point, I am going to switch to SQLDeveloper and run Hive queries to show that the data has been changed
************************************/