# HudiOnHops - PySpark

This notebook is a continuation of [HudiOnHops - Scala](https://github.com/logicalclocks/hops-examples/blob/master/notebooks/featurestore/hudi/HudiOnHops.ipynb). Check out that notebook first for an introduction to Apache Hudi and how you can leverage it for dataset upserts and time travel.

This notebook will walk you through how to achieve the same results in PySpark and how to use the feature store Python API to create and update Hudi feature groups.

In [1]:
spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
18,application_1574727804458_0028,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.
<pyspark.sql.session.SparkSession object at 0x7fba7607ddd8>

## Setup

For the feature store we are going to leverage Hudi's capability of synchronizing Hudi datasets with Apache Hive external tables. To do so, we need to do some setup.
First we need to build the JDBC string which Hudi is going to use to synchronize the dataset and its partitions with Hive.

Please note that in the following example you should replace the IP with the IP/hostname of your instance.

In [16]:
from hops import featurestore, hdfs, tls
jdbc_conn = ("jdbc:hive2://10.0.2.15:9085/{};auth=noSasl;ssl=true;twoWay=true;" + \
            "sslTrustStore=t_certificate;trustStorePassword={};" + \
            "sslKeyStore=k_certificate;keyStorePassword={}").format(featurestore.project_featurestore(), tls.get_key_store_pwd(), tls.get_key_store_pwd())
project_name = hdfs.project_name()
feature_group_name = "hudi_fg_1"
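To make the structure of the JDBC string easier to see, here is a minimal sketch that assembles the same URL from its parts. The helper name `build_hive_jdbc_url` and the database name and passwords used in the call are placeholders chosen for illustration; in the notebook the real values come from `featurestore.project_featurestore()` and the `tls` module.

```python
def build_hive_jdbc_url(host, port, database, trust_store_pwd, key_store_pwd):
    """Assemble a Hive JDBC URL with two-way TLS, mirroring the string built above.

    Hypothetical helper: it only formats a string and contacts no server.
    """
    params = [
        "auth=noSasl",
        "ssl=true",
        "twoWay=true",
        "sslTrustStore=t_certificate",
        "trustStorePassword={}".format(trust_store_pwd),
        "sslKeyStore=k_certificate",
        "keyStorePassword={}".format(key_store_pwd),
    ]
    return "jdbc:hive2://{}:{}/{};{}".format(host, port, database, ";".join(params))


# Placeholder values, for illustration only
url = build_hive_jdbc_url("10.0.2.15", 9085, "demo_featurestore", "secret", "secret")
print(url)
```

Keeping the host and port as parameters makes it harder to forget the replacement of the IP mentioned above.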

## Bulk insert

In this step we are going to generate a Spark dataframe containing some sample data and persist it as a Hudi dataset. We are then going to register this dataset as a feature group with the feature store.

In [11]:
bulkInsertData = [
    (1, "2019-02-30", 0.4151, "Sweden"),
    (2, "2019-05-01", 1.2151, "Ireland"),
    (3, "2019-08-06", 0.2151, "Belgium"),
    (4, "2019-08-06", 0.8151, "Russia")
]

columns = ['id', 'date', 'value', 'country']
bulkInsertDf = spark.createDataFrame(bulkInsertData, columns)
bulkInsertDf.show(4)

+---+----------+------+-------+
| id|      date| value|country|
+---+----------+------+-------+
|  1|2019-02-30|0.4151| Sweden|
|  2|2019-05-01|1.2151|Ireland|
|  3|2019-08-06|0.2151|Belgium|
|  4|2019-08-06|0.8151| Russia|
+---+----------+------+-------+

You can find the meaning of the options specified in the other notebook. As a reminder, Hudi detects duplicate entries based on the `HoodieKey`, which is the combination of `hoodie.datasource.write.recordkey.field` and `hoodie.datasource.write.partitionpath.field`. In future upserts, entries whose `HoodieKey` already exists in the dataset will be updated.

In [17]:
bulkInsertDf.write.format("org.apache.hudi") \
            .option("hoodie.table.name", feature_group_name) \
            .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE") \
            .option("hoodie.datasource.write.operation", "bulk_insert") \
            .option("hoodie.datasource.write.recordkey.field","id") \
            .option("hoodie.datasource.write.partitionpath.field", "date") \
            .option("hoodie.datasource.write.precombine.field", "value") \
            .option("hoodie.datasource.hive_sync.enable", "true") \
            .option("hoodie.datasource.hive_sync.table", feature_group_name) \
            .option("hoodie.datasource.hive_sync.database", featurestore.project_featurestore()) \
            .option("hoodie.datasource.hive_sync.jdbcurl", jdbc_conn) \
            .option("hoodie.datasource.hive_sync.partition_fields", "date") \
            .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor") \
            .mode("overwrite") \
            .save("hdfs:///Projects/{}/Resources/{}".format(project_name, feature_group_name))
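The same set of options is repeated for every write in this notebook, with only the operation (and, for deletes, the payload class) changing. As a sketch, the options could be collected in a dictionary by a small helper. The function `hudi_write_options` is a hypothetical name introduced here, and the database name and JDBC URL in the call are placeholders.

```python
def hudi_write_options(table_name, database, jdbc_url, operation,
                       record_key="id", partition_field="date",
                       precombine_field="value", extra=None):
    """Return the Hudi write and Hive sync options used in this notebook.

    Hypothetical helper: `extra` lets a caller add or override options,
    for example the payload class used for deletes.
    """
    options = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.operation": operation,
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.partitionpath.field": partition_field,
        "hoodie.datasource.write.precombine.field": precombine_field,
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.table": table_name,
        "hoodie.datasource.hive_sync.database": database,
        "hoodie.datasource.hive_sync.jdbcurl": jdbc_url,
        "hoodie.datasource.hive_sync.partition_fields": partition_field,
        "hoodie.datasource.hive_sync.partition_extractor_class":
            "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    }
    if extra:
        options.update(extra)
    return options


# Placeholder database name and JDBC URL, for illustration only
opts = hudi_write_options("hudi_fg_1", "demo_featurestore",
                          "jdbc:hive2://host:9085/demo_featurestore", "bulk_insert")
for key in sorted(opts):
    print(key, "=", opts[key])
```

In a Spark session the dictionary could then be passed in one call, for example `bulkInsertDf.write.format("org.apache.hudi").options(**opts).mode("overwrite").save(path)`, where `path` is the HDFS location used above.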

In the following step we are going to register the Hudi dataset created above with the feature store and compute the statistics for it.

In [18]:
featurestore.sync_hive_table_with_featurestore("hudi_fg", featuregroup_version=1)

Synchronizing Hive Table: hudi_fg with Feature Store: demo_featurestore_admin000_featurestore
Hive Table: hudi_fg was successfully synchronized with Feature Store: demo_featurestore_admin000_featurestore

In [19]:
featurestore.update_featuregroup_stats("hudi_fg", featuregroup_version=1, descriptive_statistics=True,feature_correlation=True, \
                                       feature_histograms=True, cluster_analysis=True, stat_columns=None)

Running sql: use demo_featurestore_admin000_featurestore against offline feature store
SQL string for the query created successfully
Running sql: SELECT * FROM hudi_fg_1 against offline feature store
computing descriptive statistics for : hudi_fg, version: 1
computing feature correlation for: hudi_fg, version: 1
computing feature histograms for: hudi_fg, version: 1
computing cluster analysis for: hudi_fg, version: 1

## Upsert

In this step we simulate the arrival of new data. Some records are new (inserts), while others modify existing records (updates). We are going to upsert them into the feature group created above.

In [21]:
upsertData = [
    (5, "2019-02-30", 0.7921, "Northern Ireland"), # Insert
    (1, "2019-02-30", 1.151, "Norway"), # Update
    (3, "2019-08-06", 0.999, "Belgium"), # Update
    (6, "2019-08-01", 0.0151, "France") # Insert
]

upsertDf = spark.createDataFrame(upsertData, columns)
upsertDf.show(4)

+---+----------+------+----------------+
| id|      date| value|         country|
+---+----------+------+----------------+
|  5|2019-02-30|0.7921|Northern Ireland|
|  1|2019-02-30| 1.151|          Norway|
|  3|2019-08-06| 0.999|         Belgium|
|  6|2019-08-01|0.0151|          France|
+---+----------+------+----------------+

In [22]:
upsertDf.write.format("org.apache.hudi") \
            .option("hoodie.table.name", feature_group_name) \
            .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE") \
            .option("hoodie.datasource.write.operation", "upsert") \
            .option("hoodie.datasource.write.recordkey.field","id") \
            .option("hoodie.datasource.write.partitionpath.field", "date") \
            .option("hoodie.datasource.write.precombine.field", "value") \
            .option("hoodie.datasource.hive_sync.enable", "true") \
            .option("hoodie.datasource.hive_sync.table", feature_group_name) \
            .option("hoodie.datasource.hive_sync.database", featurestore.project_featurestore()) \
            .option("hoodie.datasource.hive_sync.jdbcurl", jdbc_conn) \
            .option("hoodie.datasource.hive_sync.partition_fields", "date") \
            .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor") \
            .mode("append") \
            .save("hdfs:///Projects/{}/Resources/{}".format(project_name, feature_group_name))
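To illustrate what the upsert does with this sample data, the following plain-Python sketch merges the two batches using the pair (`id`, `date`) as the key, which corresponds to the record key and partition path configured above. It is a simplified model, not Hudi's implementation: it ignores the precombine field and simply lets each incoming record replace the stored record with the same key. The function names are hypothetical.

```python
def hoodie_key(row):
    """Key of a row: (record key, partition path), here (id, date)."""
    record_id, date, _value, _country = row
    return (record_id, date)


def upsert(table, incoming):
    """Return a new table where incoming rows insert or replace by key."""
    merged = dict(table)
    for row in incoming:
        merged[hoodie_key(row)] = row
    return merged


bulk_insert_data = [
    (1, "2019-02-30", 0.4151, "Sweden"),
    (2, "2019-05-01", 1.2151, "Ireland"),
    (3, "2019-08-06", 0.2151, "Belgium"),
    (4, "2019-08-06", 0.8151, "Russia"),
]
upsert_data = [
    (5, "2019-02-30", 0.7921, "Northern Ireland"),  # new key: insert
    (1, "2019-02-30", 1.151, "Norway"),             # existing key: update
    (3, "2019-08-06", 0.999, "Belgium"),            # existing key: update
    (6, "2019-08-01", 0.0151, "France"),            # new key: insert
]

table = upsert({}, bulk_insert_data)
table = upsert(table, upsert_data)
for row in sorted(table.values()):
    print(row)
```

The result contains six records: two inserted, two updated, and two left untouched from the bulk insert.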

Let's also refresh the statistics to consider the new data:

In [23]:
featurestore.update_featuregroup_stats("hudi_fg", featuregroup_version=1, descriptive_statistics=True,feature_correlation=True, \
                                       feature_histograms=True, cluster_analysis=True, stat_columns=None)

Running sql: use demo_featurestore_admin000_featurestore against offline feature store
SQL string for the query created successfully
Running sql: SELECT * FROM hudi_fg_1 against offline feature store
computing descriptive statistics for : hudi_fg, version: 1
computing feature correlation for: hudi_fg, version: 1
computing feature histograms for: hudi_fg, version: 1
computing cluster analysis for: hudi_fg, version: 1

## Read the content of the feature group

Using the Apache Hudi API you will be able to read the latest version of the feature group content or travel back in time and generate a training dataset from a previous version of the data.

In [49]:
df = spark.read.format("org.apache.hudi")\
          .load("hdfs:///Projects/demo_featurestore_admin000/Resources/hudi_fg_1/*/*")
df.registerTempTable("snapshot")

In [37]:
spark.sql("describe snapshot").show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
| _hoodie_commit_time|   string|   null|
|_hoodie_commit_seqno|   string|   null|
|  _hoodie_record_key|   string|   null|
|_hoodie_partition...|   string|   null|
|   _hoodie_file_name|   string|   null|
|                  id|   bigint|   null|
|                date|   string|   null|
|               value|   double|   null|
|             country|   string|   null|
+--------------------+---------+-------+

As you can see above, the table contains some Hudi internal fields used to track commits and allow time travel. We should remove them before generating the training dataset.

In [80]:
clean_df = spark.sql("select id, date, value, country from snapshot")

In [51]:
clean_df.show()

+---+----------+------+----------------+
| id|      date| value|         country|
+---+----------+------+----------------+
|  1|2019-02-30| 1.151|          Norway|
|  5|2019-02-30|0.7921|Northern Ireland|
|  6|2019-08-01|0.0151|          France|
|  3|2019-08-06| 0.999|         Belgium|
|  2|2019-05-01|1.2151|         Ireland|
|  4|2019-08-06|0.8151|          Russia|
+---+----------+------+----------------+
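Instead of listing the data columns by hand, the Hudi internal fields could also be filtered out by their `_hoodie_` prefix. The sketch below shows the idea on the column names from the `describe snapshot` output; the helper `drop_hudi_metadata` is a hypothetical name, and the approach assumes that no user column starts with the same prefix.

```python
# Column names of the snapshot table, as shown by "describe snapshot"
snapshot_columns = [
    "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
    "_hoodie_partition_path", "_hoodie_file_name",
    "id", "date", "value", "country",
]


def drop_hudi_metadata(columns, prefix="_hoodie_"):
    """Keep only the columns that do not carry the Hudi metadata prefix."""
    return [c for c in columns if not c.startswith(prefix)]


data_columns = drop_hudi_metadata(snapshot_columns)
print(data_columns)  # ['id', 'date', 'value', 'country']
```

With a Spark dataframe, the same list could be used as `df.select(*drop_hudi_metadata(df.columns))`.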

In [43]:
featurestore.create_training_dataset(clean_df, "HudiTrainingDs", description="", \
                                     featurestore=featurestore.project_featurestore(), data_format="csv", \
                                     training_dataset_version=1, descriptive_statistics=False, feature_correlation=False, \
                                     feature_histograms=False, cluster_analysis=False, stat_columns=None)

Training Dataset created successfully

## Time Travel

Hudi in PySpark does not have an easy way of accessing the timeline to retrieve all the commits. However, you can see the list of commits by querying the dataset as shown below:

In [64]:
tt = spark.read.format("org.apache.hudi") \
          .load("hdfs:///Projects/demo_featurestore_admin000/Resources/hudi_fg_1/*/*")
tt.registerTempTable("tt")
spark.sql("select _hoodie_commit_time from tt").show()

+-------------------+
|_hoodie_commit_time|
+-------------------+
|     20191126204338|
|     20191126204338|
|     20191126204338|
|     20191126204338|
|     20191126203237|
|     20191126203237|
+-------------------+
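The query returns one commit time per record, so the same value appears several times. As a small sketch, the distinct commits can be extracted and turned into readable timestamps. It assumes the commit times follow the `yyyyMMddHHmmss` layout seen in the output above; the function names are hypothetical.

```python
from datetime import datetime

# Values of _hoodie_commit_time copied from the output above
commit_times = [
    "20191126204338", "20191126204338", "20191126204338", "20191126204338",
    "20191126203237", "20191126203237",
]


def distinct_commits(values):
    """Return the unique commit times in chronological order."""
    return sorted(set(values))


def parse_commit_time(value):
    """Parse a commit time, assuming the yyyyMMddHHmmss layout."""
    return datetime.strptime(value, "%Y%m%d%H%M%S")


commits = distinct_commits(commit_times)
for commit in commits:
    print(commit, "->", parse_commit_time(commit).isoformat())
```

In Spark the deduplication could equally be done in the query itself, for example with `select distinct _hoodie_commit_time from tt`.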

The options `hoodie.datasource.read.begin.instanttime` and `hoodie.datasource.read.end.instanttime` let you specify the time frame for which to retrieve the data. The values should be timestamps, but they don't have to correspond to specific commit times. In the example below we retrieve the entries that changed between `0` (the beginning) and `20191126204000`; the last commit included in that range is `20191126203237`.

Please note that if you re-run the notebook, the timestamps will differ.

In [67]:
incrementalDf = spark.read.format("org.apache.hudi") \
                     .option("hoodie.datasource.view.type", "incremental") \
                     .option("hoodie.datasource.read.begin.instanttime", "0") \
                     .option("hoodie.datasource.read.end.instanttime", "20191126204000") \
                     .load("hdfs:///Projects/demo_featurestore_admin000/Resources/hudi_fg_1")
incrementalDf.registerTempTable("incremental_df")
incrementalDf = spark.sql("select id, value, date, country from incremental_df")

As before, we can generate a training dataset out of this dataframe:

In [68]:
featurestore.create_training_dataset(incrementalDf, "HudiTrainingDs_timetravel", description="", \
                                     featurestore=featurestore.project_featurestore(), data_format="csv", \
                                     training_dataset_version=1, descriptive_statistics=False, feature_correlation=False, \
                                     feature_histograms=False, cluster_analysis=False, stat_columns=None)

Training Dataset created successfully
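The way the begin and end instant times select commits in the incremental read above can be sketched in plain Python. The sketch assumes that the begin time is exclusive, that the end time is inclusive, and that instant times are compared as strings; these are assumptions about Hudi's behaviour made for illustration, and `commits_in_range` is a hypothetical helper.

```python
def commits_in_range(commits, begin, end):
    """Select commits with begin < commit <= end, compared as strings.

    Assumption: begin is exclusive and end is inclusive.
    """
    return [c for c in sorted(commits) if begin < c <= end]


# The two commits of this notebook run
all_commits = ["20191126203237", "20191126204338"]

selected = commits_in_range(all_commits, "0", "20191126204000")
print(selected)  # ['20191126203237']
```

Only the bulk insert commit falls inside the window, which is why the incremental dataframe contains the data as it was before the upsert.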

## Deletes

If you need to delete some records from a Hudi dataset, you have two options:
- Upsert the records you want to delete with all their fields set to NULL.
- Issue a delete of the records by setting the option `.option("hoodie.datasource.write.payload.class", "org.apache.hudi.EmptyHoodieRecordPayload")`.

The cells below illustrate the second option:

In [71]:
russiaDf = spark.sql('SELECT * FROM snapshot WHERE country="Russia"')

In [72]:
russiaDf.show()

+-------------------+--------------------+------------------+----------------------+--------------------+---+----------+------+-------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|      date| value|country|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----------+------+-------+
|     20191126203237|  20191126203237_3_8|                 4|            2019-08-06|971eed02-334c-4e5...|  4|2019-08-06|0.8151| Russia|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----------+------+-------+

In [74]:
russiaDf.write.format("org.apache.hudi") \
            .option("hoodie.table.name", feature_group_name) \
            .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE") \
            .option("hoodie.datasource.write.operation", "upsert") \
            .option("hoodie.datasource.write.recordkey.field","id") \
            .option("hoodie.datasource.write.partitionpath.field", "date") \
            .option("hoodie.datasource.write.precombine.field", "value") \
            .option("hoodie.datasource.hive_sync.enable", "true") \
            .option("hoodie.datasource.hive_sync.table", feature_group_name) \
            .option("hoodie.datasource.hive_sync.database", featurestore.project_featurestore()) \
            .option("hoodie.datasource.hive_sync.jdbcurl", jdbc_conn) \
            .option("hoodie.datasource.hive_sync.partition_fields", "date") \
            .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor") \
            .option("hoodie.datasource.write.payload.class", "org.apache.hudi.EmptyHoodieRecordPayload") \
            .mode("append") \
            .save("hdfs:///Projects/{}/Resources/{}".format(project_name, feature_group_name))
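Conceptually, the write above removes every stored record whose key matches a record in `russiaDf`. The following plain-Python sketch models that effect on the sample data, again using (`id`, `date`) as the key. It is a simplified illustration rather than Hudi's implementation, and the function name `delete_by_key` is hypothetical.

```python
def delete_by_key(table, to_delete):
    """Return a copy of the table without the rows whose (id, date) key matches."""
    remaining = dict(table)
    for row in to_delete:
        remaining.pop((row[0], row[1]), None)  # ignore keys that are not present
    return remaining


# State of the feature group after the upsert, keyed by (id, date)
current_rows = [
    (1, "2019-02-30", 1.151, "Norway"),
    (2, "2019-05-01", 1.2151, "Ireland"),
    (3, "2019-08-06", 0.999, "Belgium"),
    (4, "2019-08-06", 0.8151, "Russia"),
    (5, "2019-02-30", 0.7921, "Northern Ireland"),
    (6, "2019-08-01", 0.0151, "France"),
]
table = {(row[0], row[1]): row for row in current_rows}

# Records selected for deletion, as in russiaDf
russia_rows = [row for row in current_rows if row[3] == "Russia"]

table_after_delete = delete_by_key(table, russia_rows)
for row in sorted(table_after_delete.values()):
    print(row)
```

Note that record 3 shares the partition `2019-08-06` with the deleted record but has a different `id`, so it is kept.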

If we get the current snapshot of the dataset, we see that the record was removed:

In [77]:
df = spark.read.format("org.apache.hudi")\
          .load("hdfs:///Projects/demo_featurestore_admin000/Resources/hudi_fg_1/*/*")
df.registerTempTable("snapshot")
spark.sql("SELECT * FROM snapshot").show()

+-------------------+--------------------+------------------+----------------------+--------------------+---+----------+------+----------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|      date| value|         country|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----------+------+----------------+
|     20191126204338| 20191126204338_1_10|                 1|            2019-02-30|8a90bc02-03fe-459...|  1|2019-02-30| 1.151|          Norway|
|     20191126204338| 20191126204338_1_11|                 5|            2019-02-30|8a90bc02-03fe-459...|  5|2019-02-30|0.7921|Northern Ireland|
|     20191126204338| 20191126204338_2_12|                 6|            2019-08-01|5652d001-3d1d-48b...|  6|2019-08-01|0.0151|          France|
|     20191126204338|  20191126204338_0_9|                 3|            2019-08-06|480b1cd9-4644-42c...|  3|2019-08-06| 0.999|   

However, you can still travel back in time and retrieve the record if you need to:

In [79]:
incrementalDf = spark.read.format("org.apache.hudi") \
                     .option("hoodie.datasource.view.type", "incremental") \
                     .option("hoodie.datasource.read.begin.instanttime", "0") \
                     .option("hoodie.datasource.read.end.instanttime", "20191126204000") \
                     .load("hdfs:///Projects/demo_featurestore_admin000/Resources/hudi_fg_1")
incrementalDf.registerTempTable("incremental_df")
spark.sql("SELECT * FROM incremental_df").show()

+-------------------+--------------------+------------------+----------------------+--------------------+---+----------+------+-------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|      date| value|country|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----------+------+-------+
|     20191126203237|  20191126203237_1_6|                 2|            2019-05-01|919dd34c-1ba7-4e1...|  2|2019-05-01|1.2151|Ireland|
|     20191126203237|  20191126203237_2_7|                 3|            2019-08-06|480b1cd9-4644-42c...|  3|2019-08-06|0.2151|Belgium|
|     20191126203237|  20191126203237_0_5|                 1|            2019-02-30|8a90bc02-03fe-459...|  1|2019-02-30|0.4151| Sweden|
|     20191126203237|  20191126203237_3_8|                 4|            2019-08-06|971eed02-334c-4e5...|  4|2019-08-06|0.8151| Russia|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----------+------+-------+

More documentation, including how to compact commits and reduce storage consumption, is available in the [Hudi configuration reference](https://hudi.apache.org/configurations.html).