[HUDI-3397] Guard repeated rdd triggers#6878
[HUDI-3397] Guard repeated rdd triggers#6878nsivabalan wants to merge 2 commits intoapache:masterfrom
Conversation
ffb92d0 to
536bae7
Compare
19d74d6 to
43193e5
Compare
43193e5 to
a4a7199
Compare
|
|
||
| @Override | ||
| public int getNumPartitions() { | ||
| return 1; |
| HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>(); | ||
| updateIndexAndCommitIfNeeded(writeStatuses, result); | ||
| // dereference rdd so that no double de-referencing can happen by mistake. | ||
| int numPartitions = Math.max(1, writeStatuses.getNumPartitions()); |
There was a problem hiding this comment.
don't think we need to guard it by min 1. the API getNumPartitions() should guarantee meaningful return value.
There was a problem hiding this comment.
@nsivabalan we just might need to guard against an empty RDD, but otherwise since we're working w/ an RDD in here i think we can assume that it shouldn't be returning an invalid value
| updateIndexAndCommitIfNeeded(writeStatuses, result); | ||
| // dereference rdd so that no double de-referencing can happen by mistake. | ||
| int numPartitions = Math.max(1, writeStatuses.getNumPartitions()); | ||
| HoodieData<WriteStatus> computedWriteStatus = HoodieJavaRDD.of(writeStatuses.collectAsList(), (HoodieSparkEngineContext) context, numPartitions); |
There was a problem hiding this comment.
If write status tracking success record, will collecting them in driver bring pressure on driver memory?
There was a problem hiding this comment.
+1 This may not work in case of large inserts with millions of inserts. Success record tracking will be enabled when certain indexes (e.g. record index) is enabled in the MDT.
|
@nsivabalan : Is this PR still relevant or can be closed ? |
Change Logs
We have had issues where stages involving writing data to disk were de-referenced multiple times which should not be happening. We are putting in guard rails so avoid such mis-steps.In this patch, we take control of the dag and trigger de-referencing of actual write and create Rdd again so that any downstream callers will never be able to trigger previous write stage by mistake.
Impact
We are changing our dag in the sense that, when exactly the write happens will change after this patch.
Risk level: high
We are changing the dag since we are explicitly de-referencing. For eg, incase of auto commit disable, after writeclient.insert() returns to the caller and if caller tries to dereference WriteStatus, the actual execution kicks in and the write gets triggered. But after this change, it may not be the case. Write will get triggered just before index update irrespective of whether auto commit is enabled or disabled.
Documentation Update
Not applicable.
Contributor's checklist