In [None]:
%%configure
{
  "conf": {
    "spark.jars": "hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.hive.convertMetastoreParquet": "false"
  }
}

In [None]:
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor

// Table-level settings reused by the cells below: the Hudi table name, its record key,
// the S3 location it is written to, the partition column and the precombine column.
val hudiTableName = "amazon_product_reviews"
val hudiTableRecordKey = "review_id"
val hudiTablePath = s"s3://hocanint-reinvent-2019-demo-outputs/createdatasets/$hudiTableName"
val hudiTablePartitionColumn = "review_date"
val hudiTablePrecombineKey = "timestamp"

In [None]:
/****************************
Read the Home Improvement product reviews and add the columns the Hudi table needs.
*****************************/
val sourceData = (spark.read.parquet("s3://hocanint-reinvent-2019-datasets-us-east-1/parquet/product_category=Home_Improvement/*")
                            // Precombine column: the load time cast to a long, the same value for every row of this load.
                            .withColumn(hudiTablePrecombineKey, current_timestamp().cast("long"))
                            // Replace '-' with '/' in the partition value so each date becomes a nested partition path
                            // (assumes review_date renders as yyyy-MM-dd — confirm against the source schema).
                            .withColumn(hudiTablePartitionColumn, regexp_replace(col(hudiTablePartitionColumn), "-", "/"))
                            // First four characters of the rewritten partition value; derived from the same
                            // parameterized column rather than a hard-coded name so the two cannot drift apart.
                            .withColumn("year", col(hudiTablePartitionColumn).substr(1, 4))
                            .cache())

In [None]:
// Preview a few identifying columns from the first 10 source rows.
sourceData.select(Seq("marketplace", "review_id", "customer_id", "product_title", "star_rating", "review_date").map(col): _*).show(10)

In [None]:
/****************************
Our Hudi options for the product reviews dataset. Each Hudi write in this notebook passes these
settings and adds its own operation-specific options on top.
*****************************/
val hudiOptions = Map[String, String](
  HoodieWriteConfig.TABLE_NAME -> hudiTableName,

  // For this dataset we use the COPY_ON_WRITE storage strategy; MERGE_ON_READ is the alternative.
  DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "COPY_ON_WRITE",

  // Record key, precombine field and partition path are taken from the shared table settings,
  // so a column rename only has to be made in one place.
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> hudiTableRecordKey,
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> hudiTablePrecombineKey,
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> hudiTablePartitionColumn,

  // Sync the table's metadata to the Hive metastore.
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
  DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName,
  DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false",
  // The partition value is the slash-separated review date, so MultiPartKeysValueExtractor splits
  // the partition path on '/' into one value per Hive partition column listed here.
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "year,month,day",
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
)

/** ********************************
Let's write our input dataset to Hudi.
************************************/
// The operation key tells Hudi whether this is an insert, upsert, or bulk insert; the initial load
// is a bulk insert, and SaveMode.Overwrite replaces whatever already exists at hudiTablePath.
(sourceData.write
  .format("org.apache.hudi")
  .options(hudiOptions + (DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL))
  .mode(SaveMode.Overwrite)
  .save(hudiTablePath))

In [None]:
/** **********************************
Load the Hudi table back so we can inspect the data we just wrote.
**************************************/
// Three wildcard levels match the slash-separated partition directories; the fourth matches the data files.
val readOptimizedHudiViewDF = spark.read.format("org.apache.hudi").load(s"$hudiTablePath/*/*/*/*")

In [None]:
/** ***********************************
Let's take a look at our data. Suppose someone reports that something fishy is going on with star ratings.
**************************************/
// createOrReplaceTempView is the supported replacement for registerTempTable (deprecated since Spark 2.0).
readOptimizedHudiViewDF.createOrReplaceTempView("amazon_product_reviews_raw_ro_table")
// Row count per star_rating value, ordered by rating.
spark.sql("select star_rating, count(*) from amazon_product_reviews_raw_ro_table group by star_rating order by star_rating ASC").show()

In [None]:
/** *********************************
Select the rows we want to update and make the update.
************************************/
// Rows whose star_rating is 100 get that column replaced by a null of integer type.
val upsertdf = (readOptimizedHudiViewDF
  .where(col("star_rating") === 100)
  .withColumn("star_rating", lit(null).cast(IntegerType)))

In [None]:
/** ********************************
Previously, updating data in S3 meant reading the old data, merging in the new data, and then
overwriting the old data. With Hudi, the changed rows can be upserted directly into the table.
************************************/
(upsertdf.write
  .format("org.apache.hudi")
  .options(hudiOptions + (DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL))
  .mode(SaveMode.Append)
  .save(hudiTablePath))

In [None]:
// Re-read the table so the histogram reflects the upsert; cached because the next cell builds on it.
val readOptimizedHudiViewDF = spark.read.format("org.apache.hudi").load(hudiTablePath + "/*/*/*/*").cache()
// createOrReplaceTempView is the supported replacement for registerTempTable (deprecated since Spark 2.0).
readOptimizedHudiViewDF.createOrReplaceTempView("amazon_product_reviews_ro_table")
spark.sql("select star_rating, count(*) from amazon_product_reviews_ro_table group by star_rating order by star_rating ASC").show()

In [None]:
/** *******************************
Now, suppose a customer asks us to delete their information under GDPR.
***********************************/
// Every row belonging to the requesting customer.
val deleteRowsDf = readOptimizedHudiViewDF.where(col("customer_id") === 32068341)

// Deletion: upsert these rows with EmptyHoodieRecordPayload as the payload class.
(deleteRowsDf.write
  .format("org.apache.hudi")
  .options(hudiOptions ++ Map(
    DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
    DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY -> "org.apache.hudi.EmptyHoodieRecordPayload"))
  .mode(SaveMode.Append)
  .save(hudiTablePath))

In [None]:
/**************************************
We can also do point-in-time queries. First, re-read the table and check that no rows remain for the
deleted customer; then we'll look at all the commits.
***************************************/
val readOptimizedHudiViewDF = spark.read.format("org.apache.hudi").load(hudiTablePath + "/*/*/*/*").cache()
// createOrReplaceTempView is the supported replacement for registerTempTable (deprecated since Spark 2.0).
readOptimizedHudiViewDF.createOrReplaceTempView("amazon_product_reviews_ro_table")

spark.sql("select count(*) from amazon_product_reviews_ro_table where customer_id = 32068341").show()

In [None]:
// Distinct commit times present in the current table snapshot, sorted ascending (at most 50).
val commits = (spark.sql("select distinct(_hoodie_commit_time) as commitTime from amazon_product_reviews_ro_table order by commitTime")
                .map(k => k.getString(0)).take(50))
// Array#toString only prints the JVM type and hash (e.g. "[Ljava.lang.String;@1b2c3d");
// mkString renders the actual commit times.
commits.mkString(", ")

In [None]:
/** ********************************
Suppose we want to know what a review looked like at a certain point in time. Hudi allows this with an
incremental query bounded by a begin instant and an end instant.
************************************/
val beginTime = "0"
// Commit time we are interested in: the first entry of `commits`. Fail with a clear message rather than
// an ArrayIndexOutOfBoundsException when the commit listing came back empty.
val endTime = commits.headOption.getOrElse(
  sys.error("No Hudi commits found in amazon_product_reviews_ro_table; run the previous cells first"))

val amazon_product_reviews_table = (spark.read
     .format("org.apache.hudi")
     // Mark that we want to do an incremental query.
     .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL)

     // Set the window of commits to query: after beginTime, up to and including endTime.
     .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginTime)
     .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, endTime)

     // NOTE(review): hudiOptions holds write-side settings; kept as in the original, but the read
     // options above appear to be all an incremental query needs — confirm before removing.
     .options(hudiOptions)
     .load(hudiTablePath)).cache()

amazon_product_reviews_table.select("review_id", "product_id", "product_title", "star_rating").filter($"star_rating" === 100).show()

In [None]:
/***********************************
Hive and Presto can query the data too!
************************************/