
Support a method for querying deleted data in the incremental view #14517

@hudi-bot

Description


As we know, Hudi already supports many ways to query data in Spark, Hive, and Presto. It also provides a very good timeline concept for tracing changes in data, which can be used to query incremental data in the incremental view.
Previously we only had insert and update functions to upsert data, and now new functions have been added to delete existing data:

[HUDI-328] Adding delete api to HoodieWriteClient #1004
[HUDI-377] Adding Delete() support to DeltaStreamer #1073

So, now that we have a delete API, should we also add a method to retrieve deleted data in the incremental view?

I've looked at the methods for generating new parquet files. The main idea is to combine the old and new data and then filter out the records that need to be deleted, so that they do not appear in the new dataset. However, this means deleted records are not retained in the new dataset, so during data tracing in the incremental view only inserted or modified records can be found via the existing timestamp field.
If we want to support this, I see two ideas to consider:

  1. Trace the dataset in the same file at different time checkpoints along the timeline, compare the two datasets by key, and filter out the deleted records. This method costs nothing extra at write time, but it needs to run the comparison for each query, which is expensive.
  2. When writing data, record any deleted records in a file named something like .delete_filename_version_timestamp, so that we can answer queries immediately by time. This adds extra processing at write time.
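Idea 2's marker files would need a parseable naming scheme. Below is a minimal sketch of parsing the proposed `.delete_filename_version_timestamp` pattern, assuming the version and timestamp are the last two underscore-separated fields (the scheme and the class are hypothetical, not part of Hudi):

```java
// Sketch of idea 2: parsing a hypothetical delete-marker file name of the
// form ".delete_<filename>_<version>_<timestamp>". The naming scheme is the
// one proposed above, not an actual Hudi layout.
public class DeleteMarker {
    public final String fileName;
    public final String version;
    public final String timestamp;

    private DeleteMarker(String fileName, String version, String timestamp) {
        this.fileName = fileName;
        this.version = version;
        this.timestamp = timestamp;
    }

    /** Parses ".delete_<filename>_<version>_<timestamp>" (underscore-delimited). */
    public static DeleteMarker parse(String markerName) {
        if (!markerName.startsWith(".delete_")) {
            throw new IllegalArgumentException("not a delete marker: " + markerName);
        }
        String rest = markerName.substring(".delete_".length());
        // Version and timestamp are the last two underscore-separated fields;
        // the file name itself may contain underscores.
        int tsSep = rest.lastIndexOf('_');
        int verSep = rest.lastIndexOf('_', tsSep - 1);
        return new DeleteMarker(rest.substring(0, verSep),
                rest.substring(verSep + 1, tsSep),
                rest.substring(tsSep + 1));
    }

    public static void main(String[] args) {
        DeleteMarker m = parse(".delete_f1_1_20201013232359");
        System.out.println(m.fileName + " " + m.version + " " + m.timestamp);
    }
}
```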

 

JIRA info


Comments

30/Dec/19 18:12;vinoth;Do you think this is specific to deletes? Could we generalize this to a new config, say 'include.before.image=true', in incremental pull, where you get two values in incremental pull? Currently, you will only get one value per record upserted/deleted

 

 
||Operation||include.before.image=false||include.before.image=true||
|insert|new_value_inserted|[null, new_value_inserted]|
|update/soft delete|new_value_updated|[old_value, new_value]|
|hard delete|May not get anything today.|[deleted_value, null]|

 ;;;
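The table above can be read as a tiny function per operation: with include.before.image=true, each record comes back as a [before, after] pair. A minimal sketch of those semantics (class and method names are hypothetical, not Hudi APIs):

```java
import java.util.Arrays;
import java.util.List;

// Minimal sketch of the before/after-image semantics in the table above.
// The class and method names are illustrative only.
public class BeforeImage {

    /** insert: no prior value existed. */
    public static List<String> insert(String newValue) {
        return Arrays.asList(null, newValue);
    }

    /** update or soft delete: both images exist. */
    public static List<String> update(String oldValue, String newValue) {
        return Arrays.asList(oldValue, newValue);
    }

    /** hard delete: only the before image survives. */
    public static List<String> hardDelete(String deletedValue) {
        return Arrays.asList(deletedValue, null);
    }

    public static void main(String[] args) {
        System.out.println(insert("v1"));       // [null, v1]
        System.out.println(update("v1", "v2")); // [v1, v2]
        System.out.println(hardDelete("v2"));   // [v2, null]
    }
}
```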


14/Mar/20 09:01;yanghua;[~vinoth] IMO, supporting querying deleted data in the incremental view is a key feature. Compared with MySQL binlog CDC for incremental processing, we'd better provide deleted data for further processing. In our company's data warehouse scenarios, we have a strong business requirement for this.;;;


16/Mar/20 15:13;vinoth;[~yanghua] This is an interesting topic for sure. Let's discuss..

 

Are you interested in getting both the before and after images like above or just want a stream of deleted record keys? ;;;


17/Mar/20 03:31;yanghua;[~vinoth] Yes, I am absolutely interested in this topic. IMO, before/after is a good direction. Let's support binlog feature on Hadoop.;;;


23/Mar/20 02:32;vinoth;This needs an RFC to drive the design first.. At a high level, it seems like with some additional cleaner retention and some merging of old and current file slices, we should be able to do something of this sort. ;;;


21/Apr/20 00:33;chenxiang;[~vinoth] [~yanghua] Maybe I can open an RFC and write down my own thoughts, and then we can discuss the feasibility of the plan;;;


21/Apr/20 03:52;yanghua;[~chenxiang] Glad to hear this. We are busy with other things. Please go ahead!;;;


01/Jul/20 07:23;yanghua;[~chenxiang] What's the progress?;;;


01/Jul/20 07:28;yanghua;[~vinoth] For hard deletion, as the default behavior, can we log the row key list as the metadata of a commit?

Additionally, we can introduce an advanced config option. If users want to see the whole deleted row, they can enable it, and then we would log all the fields of the deleted row.;;;


02/Jul/20 21:16;vinoth;[~yanghua] for MOR, the list of keys is already recorded in the delta log files.. it's just a matter of reading it.. See HoodieDeleteBlock.

For COW, we may not want to support this initially?

 

 row key list as the metadata of a commit

This will become a scaling problem IMO.. storing data along with metadata.. wdyt

 ;;;


03/Jul/20 03:21;yanghua;{quote}
For COW, we may not want to support this initially?
{quote}

Actually, I am mainly thinking about the implementation of this feature for COW.

 {quote}
This will become a scaling problem IMO.. storing data along with metadata.. wdyt
{quote}

Yes, so I am also wondering if we can store only the row key, considering the major scenario for this feature is that downstream tables want to delete those deleted rows.
;;;


07/Jul/20 23:31;vinoth;even just storing the row_key, if there are millions of deletes in a batch (e.g GDPR), it will be problematic.. 

IMO we should focus on supporting this on MOR..

 

For COW, if you really want to go after this.. we can read the previous version of the base file and deduce what key was deleted? 

e.g. 

f1_commit1.parquet => k1, k2, k3

f1_commit2.parquet => k1, k3 (record in .hoodie/commit2.commit that there was a delete) 

 

Now, in incremental pull, when you supply `option("hoodie.include.deleted.keys", "true")`, we see that commit2 has deletes, then read all keys from f1_commit1.parquet as well, merge, and send them along with the data written into f1_commit2.parquet.. This would be a very cool thing to implement

 ;;;
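The f1_commit1 / f1_commit2 example above reduces to a set difference over record keys: whatever was in the previous base file version but not the current one was deleted in the current commit. A minimal sketch, with plain sets standing in for keys read from the two parquet versions:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of deducing deleted keys for COW, per the example above: keys
// present in the previous base file version but absent from the current
// one were deleted. Plain sets stand in for keys read from parquet.
public class DeletedKeys {

    public static Set<String> deducedDeletes(Set<String> prevKeys, Set<String> currKeys) {
        Set<String> deleted = new HashSet<>(prevKeys);
        deleted.removeAll(currKeys);
        return deleted;
    }

    public static void main(String[] args) {
        Set<String> commit1 = Set.of("k1", "k2", "k3"); // f1_commit1.parquet
        Set<String> commit2 = Set.of("k1", "k3");       // f1_commit2.parquet
        System.out.println(deducedDeletes(commit1, commit2)); // [k2]
    }
}
```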


08/Jul/20 05:39;yanghua;Yes, you provide a differentiated implementation of this feature based on different storage views.

My initial idea may have gone another way, which is a bit different from yours. It has two key points:

  1. At the feature level, both views (COW, MOR) should be supported. If we move towards a bifurcation in the feature, it will give users a bad experience in the future: they would have to choose a view based on the feature. A better experience is to choose the view according to their own scenario, while each view provides similar functional features.

  2. The implementation level is consistent, similar to our design for the two views on the index. We may introduce external storage to store this "metadata" (row keys), so it is more likely to be implemented as a plug-in, the metadata storage backend can be replaced, and it also serves as a lightweight metadata system for Hudi.

The above are some of my earlier thoughts, partly because my understanding of Hudi's core implementation is not thorough enough.

Of course, I also agree with focusing directly on Hudi's storage and views, which avoids introducing too many external dependencies. But we need to provide implementations for the two different storage views.;;;


13/Oct/20 13:03;chenxiang;[~vinoth]
[~yanghua]

Sorry for taking so long to come back to this topic. I have considered a general Spark-based implementation for incremental queries on deleted data:

  1. We need a new method that uses HoodieWriteStat.getNumDeletes() to filter the files which contain deleted rows.
  2. We also need a new method that uses HoodieWriteStat.getPrevCommit() to find the previous file with the same fileId.
  3. Create a new method that checks the current files against the previous files with the same fileId and filters out the deleted rows. The implementation is similar to Spark's merge method: it builds an ExternalSpillableMap (covering COW parquet and MOR log files) of the current file's dataset, and then checks whether each previous key exists in it. If an old key does not exist in the current ExternalSpillableMap, it is a deleted row.
  4. Add a new column named _hoodie_delete_ holding the commit time of the current files, used to indicate the deletion time.
  5. Add a new incremental view called DELETE; we can add a new type DataSourceReadOptions.QUERY_TYPE_DELETE_OPT_VAL

{code:java}
Dataset<Row> hudiIncQueryDF = spark.read()
    .format("org.apache.hudi")
    .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_DELETE_OPT_VAL())
    .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), beginTime) // commit time to start pulling from
    .load(tablePath);

hudiIncQueryDF.createOrReplaceTempView("hudi_trips_incremental");
spark.sql("select * from hudi_trips_incremental where fare > 20.0").show();
{code}
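Steps 3 and 4 above can be sketched as a key diff between the previous and current file slices, tagging each deleted row with the current commit time (the proposed _hoodie_delete_ column). In the sketch below, a plain HashMap stands in for the ExternalSpillableMap; everything here is illustrative, not Hudi code:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of steps 3-4: records from the previous file slice whose key is
// missing from the current one are deleted rows; tag each with the current
// commit time (the proposed "_hoodie_delete_" column). A HashMap stands in
// for the ExternalSpillableMap Hudi uses when merging.
public class DeleteView {

    /** key -> (payload plus delete commit time) for every deleted row. */
    public static Map<String, String> deletedRows(Map<String, String> prevRecords,
                                                  Map<String, String> currRecords,
                                                  String currentCommitTime) {
        Map<String, String> deleted = new HashMap<>();
        for (Map.Entry<String, String> e : prevRecords.entrySet()) {
            if (!currRecords.containsKey(e.getKey())) {
                deleted.put(e.getKey(), e.getValue() + ", _hoodie_delete_=" + currentCommitTime);
            }
        }
        return deleted;
    }
}
```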

I haven't figured out how to implement it in Hive yet, but I have figured out how it would be queried in Hive.
We can add a consume mode called 'DELETE'. The delete view in Hive could then be used like this:

{code:sql}
set hoodie.tableName.consume.mode=DELETE
set hoodie.tableName.consume.start.timestamp=20101013232359
set hoodie.tableName.consume.max.commits=1
{code}

;;;


14/Oct/20 06:32;yanghua;[~chenxiang] Thanks for raising up this feature again. For step 1, {{HoodieWriteStat.getNumDeletes()}} only returns a number, how to filter? Anyway, we may need a document to give a more detailed design. WDYT?;;;


14/Oct/20 07:03;chenxiang;[~yanghua]
In step 1, we just filter which files contain deleted rows, so we can use the delete count to check them. I've written some code like this:

{code:java}
HoodieInstant instant = timeline.filter(i -> i.getTimestamp().equals(commitTime)).firstInstant().get();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
    timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);

for (List<HoodieWriteStat> stats : commitMetadata.getPartitionToWriteStats().values()) {
  for (HoodieWriteStat stat : stats) {
    if (stat.getPrevCommit() != null && stat.getNumDeletes() > 0) {
      LOG.info("file name is {} in {} with partition {}, and prev commit is {}",
          stat.getFileId(), stat.getPath(), stat.getPartitionPath(), stat.getPrevCommit());
    }
  }
}
{code}

In step 2, we can take the files filtered out in step 1 and collect the corresponding files by prev commit. Steps 1 and 2 can be implemented together in one method.

I'll write some of my ideas later on for a more specific document.;;;
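Steps 1 and 2 combined can be sketched as: filter a commit's write stats down to files that recorded deletes, then map each fileId to its previous commit so the prior file slice can be located. In the sketch below, the Stat class is a stand-in for HoodieWriteStat, not the real Hudi type:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of steps 1 and 2 combined: keep only files with deletes, and map
// each fileId to the previous commit so the prior file slice can be found.
// Stat is a stand-in for org.apache.hudi.common.model.HoodieWriteStat.
public class DeleteFileFilter {

    public static class Stat {
        final String fileId;
        final String prevCommit;
        final long numDeletes;

        public Stat(String fileId, String prevCommit, long numDeletes) {
            this.fileId = fileId;
            this.prevCommit = prevCommit;
            this.numDeletes = numDeletes;
        }
    }

    /** fileId -> prevCommit for every file that recorded deletes. */
    public static Map<String, String> filesWithDeletes(List<Stat> stats) {
        Map<String, String> result = new HashMap<>();
        for (Stat s : stats) {
            if (s.prevCommit != null && s.numDeletes > 0) {
                result.put(s.fileId, s.prevCommit);
            }
        }
        return result;
    }
}
```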


21/Oct/20 02:20;vinoth;If we can start an RFC on this, that would be great;;;


08/Dec/20 14:55;chenxiang;[~vinoth] [~yanghua]
I opened a project at https://github.com/shangyuantech/hudi-delete-view
This project is based on some production environment scenarios; I think it can show some ideas about querying deleted rows.

;;;


11/Dec/20 08:39;vinoth;[~chenxiang] I could not understand it well. 

[~yanghua] thoughts? ;;;


13/Dec/20 12:56;chenxiang;[~vinoth]
Thanks for your reply ~. I have retranslated it into an English explanation; I hope the readme is easier to understand now.
The main idea is to extract the previous file according to prevCommit and then make the comparison.;;;


16/Mar/22 10:07;fnie;[~vinoth] [~chenxiang]

We also face a similar issue: our job's business logic needs to figure out a record's current and previous values so the developer can compare the two and then decide between different logic paths. So we are seriously considering the idea from the top comment you mentioned in this Jira:

{quote}
Do you think this is specific to deletes? Could we generalize this to a new config say 'include.before.image=true' in incremental pull where, you get two values in incremental pull. Currently, you will only get one value per record upserted/deleted

||Operation||include.before.image=false||include.before.image=true||
|insert|new_value_inserted|[null, new_value_inserted]|
|update/soft delete|new_value_updated|[old_value, new_value]|
|hard delete|May not get anything today.|[deleted_value, null]|
{quote}

We are thinking of developing a UDF to support this, like below:

{code:sql}
select status, pre_value(status) as pre_status
from table_xxxx where _hoodie_commit_time > xxxx;
{code}

because we haven't figured out how to present a single "old_value, new_value" column from SQL... Any thoughts? Or is there an ongoing feature that can meet our requirement? Thanks.

 ;;;
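One way to read the proposed pre_value(status) UDF: for each record key, return the value written by the immediately preceding commit. Below is a Spark-free sketch of that lookup, illustrating only the semantics; a real implementation would be a Spark UDF or window function, and all names here are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Spark-free sketch of the proposed pre_value() lookup: per record key,
// keep values ordered by commit time and return the value from the commit
// immediately before the given one. Illustrative only; a real version
// would be a Spark UDF/window over the table.
public class PreValue {

    // key -> (commitTime -> value), commit times kept sorted
    private final Map<String, TreeMap<String, String>> history = new HashMap<>();

    public void write(String key, String commitTime, String value) {
        history.computeIfAbsent(key, k -> new TreeMap<>()).put(commitTime, value);
    }

    /** Value the key held just before the given commit, or null if none. */
    public String preValue(String key, String commitTime) {
        TreeMap<String, String> versions = history.get(key);
        if (versions == null) {
            return null;
        }
        Map.Entry<String, String> prev = versions.lowerEntry(commitTime);
        return prev == null ? null : prev.getValue();
    }
}
```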


05/Jul/22 00:40;chenxiang;[fnie]
Hi,
What progress have you made so far? I haven't continued, as I haven't dealt with related matters since. If there is any progress on your side, I feel we can discuss it together (chen xiang on Slack) and see what can be done for the community.;;;
