
[SUPPORT] PreCombineAndUpdate in Payload #1582

Closed
nandini57 opened this issue May 1, 2020 · 11 comments

@nandini57

@vinothchandar @bvaradar

In continuation to the recently raised issue #1569: for custom merge logic, is there a way to preserve the currentValue on disk? It seems that in HoodieMergeHandle the copyOldRecord flag is false, so the current value is lost.

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {

@bvaradar
Contributor

bvaradar commented May 2, 2020

@nandini57 : The flag is for internal Hudi logic to preserve the old record when Hudi is not able to create a valid updated record to write.

I am not sure I am following your use-case. From #1569 , if you are using unique keys per batch, you should not be seeing merges anyways.

@bvaradar bvaradar self-assigned this May 2, 2020
@nandini57
Author

My apologies, let me try to explain. If I don't upsert the data with each batch where applicable, then when I query the table back it will have duplicates, because batch "n" needs to carry data from batches "n-1", "n-2", ... I would need to do a group by upsertKey .. max(commit_time) to get the latest view of the data, and doing a group by on every read won't scale.

Instead of this, if I can preserve the current_val with a deleted identifier in the CustomPayload and also return both the incoming and current payloads from combineAndGetUpdateValue, I can preserve the required data for audit, and reads can filter out records with the deleted identifier.

Does this make sense? Any other ideas? Possibly making copyOldRecord a configurable property, with a default of false, if that doesn't impact anything else.

@bvaradar
Contributor

bvaradar commented May 2, 2020

Thanks for the details. One of the primary contracts within Hudi is the uniqueness of the record key within a partition/dataset. Instead, can you materialize the grouping within the record? To elaborate: can you create a nested array-of-struct field "audit_log" (the inner struct having the same structure as the top-level struct, without audit_log) in your schema, which would contain the list of record images at each ingest time, and have your custom payload append all previous images as part of combineAndGetUpdateValue and preCombine? This way, if you want the latest image, you simply skip projecting "audit_log" in your query and don't have to deal with a reduce-by.
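
A minimal sketch of such a payload, assuming a schema that already carries an "audit_log" array field and using Hudi's OverwriteWithLatestAvroPayload as the base class. The class name AuditLogPayload and the audit_log field are hypothetical, the history entry is left as the raw previous record for brevity (a real implementation would project it onto the inner struct), and package locations can differ across Hudi versions:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;

public class AuditLogPayload extends OverwriteWithLatestAvroPayload {

  public AuditLogPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
      throws IOException {
    // Deserialize the incoming image carried by this payload.
    Option<IndexedRecord> incoming = getInsertValue(schema);
    if (!incoming.isPresent()) {
      return Option.empty();
    }
    GenericRecord merged = (GenericRecord) incoming.get();
    GenericRecord current = (GenericRecord) currentValue;

    // Carry forward any history already on disk, then append the previous
    // top-level image as one more entry in the "audit_log" array.
    List<Object> history = new ArrayList<>();
    Object existing = current.get("audit_log");
    if (existing instanceof List) {
      history.addAll((List<?>) existing);
    }
    history.add(current); // simplified: a real schema would use the inner struct type
    merged.put("audit_log", history);

    return Option.of(merged);
  }
}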

@nandini57
Author

I did think about this, but our schemas are heavily nested and contain more than 5000 columns, even for a fairly modest one. I need to think more about it.

If I rethink the problem as figuring out my state of data as of business day <= X, it is possible to track if I tag the record key while inserting with _X, _X-1, _X-2, etc. Does that sound like a logical thing to do?

@bvaradar
Contributor

bvaradar commented May 5, 2020

Sorry, not following your solution. Are you referring to creating unique record keys per batch and treating them as inserts?

@nandini57
Author

nandini57 commented May 6, 2020

Hi Balaji,
If I rephrase the problem: at commit X, I have 3 records. I do an upsert and change 1 record at commit X+1. As of commit X+1 I have the latest view of the data, which is fine. But as of commit X, I don't get the view of the data at commit X (the original 3 records). When I do a Spark read with the parquet format I get a full view, which includes both X and X+1, but how do I filter as of X?

+-------------------+--------------------+------------------+----------------------+---------------------------------------------------------------------+---+----------------+----+---+----------+-------+--------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                    |id |name            |team|ts |recordKey |batchId|uniqueCd|
+-------------------+--------------------+------------------+----------------------+---------------------------------------------------------------------+---+----------------+----+---+----------+-------+--------+
|20200506122159     |20200506122159_0_4 |5                 |-464907625            |c7fd919f-0152-438f-b4b6-882464570f67-0_0-34-36_20200506122159.parquet|5 |sandeep-upserted|hudi|2 |-464907625|18     |18_5    |
|20200506122143     |20200506122143_0_2 |1                 |-464907625            |c7fd919f-0152-438f-b4b6-882464570f67-0_0-5-5_20200506122143.parquet  |1 |mandeep         |hudi|1 |-464907625|15     |15_1    |
|20200506122143     |20200506122143_0_3 |2                 |-464907625            |c7fd919f-0152-438f-b4b6-882464570f67-0_0-5-5_20200506122143.parquet  |2 |jhandeep        |modi|2 |-464907625|15     |15_2    |
|20200506122143     |20200506122143_0_1 |5                 |-464907625            |c7fd919f-0152-438f-b4b6-882464570f67-0_0-5-5_20200506122143.parquet  |5 |sandeep         |hudi|1 |-464907625|15     |15_5    |
|20200506122143     |20200506122143_0_2 |1                 |-464907625            |c7fd919f-0152-438f-b4b6-882464570f67-0_0-5-5_20200506122143.parquet  |1 |mandeep         |hudi|1 |-464907625|15     |15_1    |
|20200506122143     |20200506122143_0_3 |2                 |-464907625            |c7fd919f-0152-438f-b4b6-882464570f67-0_0-5-5_20200506122143.parquet  |2 |jhandeep        |modi|2 |-464907625|15     |15_2    |
+-------------------+--------------------+------------------+----------------------+---------------------------------------------------------------------+---+----------------+----+---+----------+-------+--------+


@nandini57
Author

nandini57 commented May 6, 2020

Probably switching to the parquet format instead of hudi and doing a spark.read.parquet(partitionPath).dropDuplicates filtered on commit_time = X is an option? The following works if I want to go back to commit X and have a view of the data. However, the same with the hudi format doesn't give me the right view as of commit X.

def audit(spark: SparkSession, partitionPath: String, tablePath: String, commitTime: String): Unit = {
  // Read the raw parquet files under the partition, bypassing the Hudi datasource
  val hoodieROViewDF = spark.read.option("inferSchema", true).parquet(tablePath + "/" + partitionPath)
  hoodieROViewDF.createOrReplaceTempView("hoodie_ro")
  spark.sql("select * from hoodie_ro where _hoodie_commit_time = '" + commitTime + "'").dropDuplicates().show()
}

I did a little digging, and the following code in HoodieROTablePathFilter seems to take only the latest base file and thus drops the other files. The impact in my case is that I get an incorrect view as of time X, because it reads the latest file, which has 2 of the records as of time X while 1 was upserted and got a new commit time. Is that understanding correct?

How do I get around this? Can I use a custom path filter?

HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
    metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), fs.listStatus(folder));
List<HoodieBaseFile> latestFiles = fsView.getLatestBaseFiles().collect(Collectors.toList());
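
For what it's worth, this is roughly how a path filter gets picked up by a plain Spark parquet read; the registration key is the standard Hadoop FileInputFormat mechanism that HoodieROTablePathFilter itself uses. CommitTimePathFilter here is purely hypothetical and its accept() body is only a placeholder for whatever commit-time logic one would want:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

// Hypothetical custom filter; real logic would consult the Hudi timeline /
// file-system view to decide which base files belong to the commit of interest.
public class CommitTimePathFilter implements PathFilter {
  @Override
  public boolean accept(Path path) {
    return path.getName().endsWith(".parquet"); // placeholder only
  }
}

// Registration in the Spark driver before spark.read().parquet(...):
// spark.sparkContext().hadoopConfiguration().setClass(
//     "mapreduce.input.pathFilter.class", CommitTimePathFilter.class, PathFilter.class);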

@nandini57
Author

Turns out, an incremental query is what can get me the data back in time. Thanks again.

public static void audit(SparkSession spark, String tablePath, Long commitTime) {
  spark.read().format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), commitTime - 1)
      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY(), commitTime)
      .load(tablePath).show();
}
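
For illustration, a hypothetical call against the table shown earlier (the table path and instant value are placeholders):

// Hypothetical usage: show only the records written by commit 20200506122159
// (the upserted row above); the table path is a placeholder.
audit(spark, "/tmp/hudi/audit_table", 20200506122159L);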

@nsivabalan
Contributor

Can we close if the issue is resolved?

@nandini57
Author

Can I get a list of open issues with the incremental query option, so I'm aware of anything that could hit my job?
You can close this otherwise. Thanks in advance.

@bvaradar
Contributor

Jira (https://jira.apache.org/jira/projects/HUDI/summary) would be a good place to look. For Copy-on-Write tables, you should not see any surprises w.r.t. query engine support. Spark DataSource support for the incremental view over Merge-on-Read tables is an open item.
