[HUDI-5298] Optimize WriteStatus storing HoodieRecord#8472
[HUDI-5298] Optimize WriteStatus storing HoodieRecord#8472xiaochen-zhou wants to merge 11 commits intoapache:masterfrom xiaochen-zhou:HUDI-5298
Conversation
|
|
||
| public class HoodieRecordStatus implements Serializable, KryoSerializable { | ||
|
|
||
|
|
There was a problem hiding this comment.
key + location are actually an index item, just rename it to IndexItem ?
There was a problem hiding this comment.
key + location are actually an index item, just rename it to
IndexItem?
Thank you very much for your review, I have modified the code, can you re-review the code when you are free, and make some comments.
|
It is great if we can have numbers to illustrate the gains after the patch, like the cost reduction for memory or something. |
I would be happy to do it. |
# Conflicts: # hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/inmemory/HoodieInMemoryHashIndex.java
I did a test based on your suggestion: WriteStatus status = new WriteStatus(false, 1.0);
for (int i = 0; i < 1000 * 100; i++) {
status.markSuccess(mock(HoodieRecord.class), Option.empty());
status.markFailure(mock(HoodieRecord.class), t, Option.empty());
}
System.out.println("status memory: " + ObjectSizeCalculator.getObjectSize(status));The memory occupied by private final List<HoodieRecord> writtenRecords = new ArrayList<>();
private final List<HoodieRecord> failedRecords = new ArrayList<>();The memory occupied by private final List<IndexItem> writtenRecordIndexes = new ArrayList<>();
private final List<IndexItem> failedRecordIndexes = new ArrayList<>(); |
The memory occupied by |
bvaradar
left a comment
There was a problem hiding this comment.
We would need to keep failed records as is for logging the complete record elsewhere cc @nsivabalan
|
@prashantwason @nbalajee @suryaprasanna would this break you all in anyway? Do we need the record data anywhere for successful writes? cc @rmahindra123 as well. same question. |
|
|
||
| import java.io.Serializable; | ||
|
|
||
| public class IndexItem implements Serializable, KryoSerializable { |
There was a problem hiding this comment.
Give some doc to the class.
| * Identifies the record across the table. | ||
| */ | ||
| protected HoodieKey key; | ||
|
|
There was a problem hiding this comment.
Can we make all these members private and final?
There was a problem hiding this comment.
Can we make all these members private and final?
Thank you very much for review and sorry for the late response. I will try to modify the code according to your suggestions
There was a problem hiding this comment.
Can we make all these members private and final?
We may not be able to make all these members final because they need to be reassigned
@Override
public final void read(Kryo kryo, Input input) {
this.key = kryo.readObjectOrNull(input, HoodieKey.class);
this.currentLocation = (HoodieRecordLocation) kryo.readClassAndObject(input);
this.newLocation = (HoodieRecordLocation) kryo.readClassAndObject(input);
}
record index implementation requires the record key and the location to create the mapping in the index. This is similar requirement to other non-implicit indexes like HBaseIndex. |
|
@clownxc If I understand correctly, the memory savings are coming from dropping the "data" part of the HoodieRecord? I noticed that HoodieRecord has only 2 additional members - sealed (boolean) and data (t). Are the savings due to usage of the mock class (which may have bloating compared to the original HoodieRecord)? But hoodie write handles deflate the HoodieRecord after writing so the data portion should go away reducing the amount of savings possible. Can you run the test again with these changes:
I feel the above may give a more realistic view of savings. Also, how did you find this interesting optimization? I am interested as there may be other avenues of such savings within HUDI so if would be good to know how you track these. |
I would be happy to do it. |
this interesting optimization was reported by @nsivabalan HUDI-5298 and has not been implemented for a long time. so I try to complete it. |
|
According to the suggestion provided by @prashantwason , I did a test as follows: WriteStatus status = new WriteStatus(true, 1.0);
String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
dataGen = new HoodieTestDataGenerator(new String[] {partitionPath});
String newCommitTime = "001";
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 1000);
Throwable t = new Exception("some error in writing");
for (int i = 0; i < 1000 ; i++) {
HoodieRecord data1 = records.get(i);
status.markSuccess(data1, Option.empty());
data1.deflate();
HoodieRecord data2 = records.get(i++);
status.markFailure(data2, t, Option.empty());
data2.deflate();
}
System.out.println("status memory: " + ObjectSizeCalculator.getObjectSize(status));It was found that the memory space occupation before(status memory: 113048) and after optimization(status memory: 117032) basically did not change, The main reason is that I have a doubt that if there is some optimization needed for writeStatus.markSuccess(hoodieRecord, recordMetadata);
// deflate record payload after recording success. This will help users access payload as a
// part of marking
// record successful.
hoodieRecord.deflate();
return finalRecordOpt;
} catch (Exception e) {
LOG.error("Error writing record " + hoodieRecord, e);
writeStatus.markFailure(hoodieRecord, e, recordMetadata);
}or, In some places, there will be no if (indexedRecord.isPresent()) {
// Skip the ignored record.
try {
if (!indexedRecord.get().shouldIgnore(writeSchema, recordProperties)) {
recordList.add(indexedRecord.get());
}
} catch (IOException e) {
writeStatus.markFailure(record, e, record.getMetadata());
LOG.error("Error writing record " + indexedRecord.get(), e);
}
}Although the optimized effect may not have a large benefit public void markFailure(HoodieRecord record, Throwable t, Option<Map<String, String>> optionalRecordMetadata) {
if (failedRecords.isEmpty() || (random.nextDouble() <= failureFraction)) {
// Guaranteed to have at-least one error
failedRecords.add(record);
errors.put(record.getKey(), t);
}
totalRecords++;
totalErrorRecords++;
}hope you leave some comments in your free time. @prashantwason @danny0405 @vinothchandar |
|
@clownxc : For failed records, we need to have them logged elsewhere and so no need to deflate. For exception cases, the write status should be marked as failure. So, I don't see any reason to change this. |
I see, Thank you very much for review. |
|
Cool, I think we are good to close this issue.. |
Change Logs
WriteStatus stores the entire HoodieRecord. we can optimize it to store just the required info (record key, partition path, location).
Impact
Optimize
WriteStatusto store just the required info (record key, partition path, location).Risk level (write none, low medium or high below)
low
Documentation Update
none
Contributor's checklist