[HUDI-8310] Fix the bug where the Flink table config hoodie.populate.meta.fields is not effective and optimize write performance#12065
Conversation
- HoodieRecord populatedRecord =
-     finalRecord.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, recordProperties);
+ if (!metadataValues.isEmpty()) {
+   populatedRecord = finalRecord.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, recordProperties);
In #11028, we already fixed the unnecessary rewrite when the schemas are exactly the same, is your benchmark based on the fix then?
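The guard being discussed can be illustrated in isolation. This is a simplified sketch, not the actual Hudi classes: `Record` and `populate` are hypothetical stand-ins that model the idea of skipping the record rewrite when there are no meta values to prepend.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch (hypothetical stand-ins, not Hudi's API): a record only
// pays the cost of being rewritten when there are meta values to prepend.
public class MetaFieldGuard {

    // Stand-in for HoodieRecord: just a map of field values.
    static class Record {
        final Map<String, String> fields;
        Record(Map<String, String> fields) { this.fields = fields; }

        // Stand-in for prependMetaFields: returns a rewritten copy.
        Record prependMetaFields(Map<String, String> metadataValues) {
            Map<String, String> merged = new HashMap<>(metadataValues);
            merged.putAll(fields);
            return new Record(merged);
        }
    }

    // The idea of the fix: skip the rewrite entirely when metadataValues is
    // empty, so hoodie.populate.meta.fields=false pays no per-record copy.
    static Record populate(Record record, Map<String, String> metadataValues) {
        if (metadataValues.isEmpty()) {
            return record; // same instance back, no extra allocation
        }
        return record.prependMetaFields(metadataValues);
    }

    public static void main(String[] args) {
        Record r = new Record(Map.of("id", "1"));
        // Empty meta values: the very same instance comes back.
        System.out.println(populate(r, Map.of()) == r); // true
        // Non-empty meta values: the meta field is merged in.
        System.out.println(populate(r, Map.of("_hoodie_record_key", "1")).fields.size()); // 2
    }
}
```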
// Check if operation metadata fields are allowed
if (writeConfig.allowOperationMetadataField()) {
  if (!writeConfig.populateMetaFields()) {
    throw new HoodieException("Operation metadata fields are allowed, but populateMetaFields is not enabled. "
Operation metadata fields are allowed -> Operation metadata field is allowed
Could you please help explain why it is possible to enable the 'allow operation meta field' when the 'populate meta field' is turned off?
The operation field is introduced to mark the row-level operation type. For the use cases where meta fields are disabled (pure inserts), I think it makes sense to also disable the operation field, because all the operation types should be I (insert).
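The config dependency described above can be expressed as an upfront check. A minimal sketch with illustrative names (`validate` and `IllegalStateException` here are stand-ins; the PR itself throws `HoodieException` from the write config):

```java
// Sketch of the config dependency discussed above: the per-row operation
// field only makes sense when meta fields are populated, so the
// inconsistent combination is rejected early. Names are illustrative.
public class ConfigCheck {

    static void validate(boolean allowOperationMetadataField, boolean populateMetaFields) {
        if (allowOperationMetadataField && !populateMetaFields) {
            throw new IllegalStateException(
                "Operation metadata field is allowed, but populateMetaFields is not enabled");
        }
    }

    public static void main(String[] args) {
        validate(false, false); // pure-insert use case: both disabled, fine
        validate(true, true);   // operation field with meta fields, fine
        try {
            validate(true, false); // inconsistent combination
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```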
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
.defaultValue(true)
.withDocumentation("When enabled, populates all meta fields. When disabled, no meta fields are populated "
-    + "and incremental queries will not be functional. This is only meant to be used for append only/immutable data for batch processing");
+    + "and incremental queries will not be functional. In the disabled state, the number of record key fields must be equal to one.");
Why change the limitations?
Because when querying with the log scanner, records must be merged based on the record key, and the original design only supports key generation via the SimpleKeyGenerator (a single key field). This PR temporarily keeps the original design; whether the ComplexKeyGenerator is also acceptable still needs verification, so for now we limit it to a single record key field.
The code for generating the record key is as follows:
// AbstractHoodieLogRecordReader#processDataBlock calls wrapIntoHoodieRecordPayloadWithParams to generate a HoodieRecord
HoodieRecord completedRecord = recordIterator.next()
.wrapIntoHoodieRecordPayloadWithParams(recordsIteratorSchemaPair.getRight(),
hoodieTableMetaClient.getTableConfig().getProps(),
recordKeyPartitionPathFieldPair,
this.withOperationField,
this.partitionNameOverrideOpt,
populateMetaFields,
Option.empty());
// HoodieAvroRecord#wrapIntoHoodieRecordPayloadWithParams; simpleKeyGenFieldsOpt indicates the fields from which the record key can be generated
public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(
Schema recordSchema, Properties props,
Option<Pair<String, String>> simpleKeyGenFieldsOpt,
Boolean withOperation,
Option<String> partitionNameOp,
Boolean populateMetaFields,
Option<Schema> schemaWithoutMetaFields) throws IOException {
IndexedRecord indexedRecord = (IndexedRecord) data.getInsertValue(recordSchema, props).get();
String payloadClass = ConfigUtils.getPayloadClass(props);
String preCombineField = ConfigUtils.getOrderingField(props);
return HoodieAvroUtils.createHoodieRecordFromAvro(indexedRecord, payloadClass, preCombineField, simpleKeyGenFieldsOpt, withOperation, partitionNameOp, populateMetaFields, schemaWithoutMetaFields);
}
Yeah, it looks like there is a limitation in the code:
public static <R> HoodieRecord<R> convertToHoodieRecordPayload(GenericRecord record, String payloadClazz,
String preCombineField,
Pair<String, String> recordKeyPartitionPathFieldPair,
boolean withOperationField,
Option<String> partitionName,
Option<Schema> schemaWithoutMetaFields) {
final String recKey = record.get(recordKeyPartitionPathFieldPair.getKey()).toString();
final String partitionPath = (partitionName.isPresent() ? partitionName.get() :
record.get(recordKeyPartitionPathFieldPair.getRight()).toString());

I think we should support composite record keys for non-metadata fields use cases in the near future.
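The single-field limitation comes from the one `record.get(...)` lookup above. A composite key could, in principle, be assembled by joining several configured key fields. A hypothetical sketch (the map stands in for reading fields from a GenericRecord; the `field:value` format follows what Hudi's ComplexKeyGenerator produces on the write path, but this is not the actual reader code):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of composite record-key generation: instead of reading
// a single key field from the record, join all configured key fields,
// mirroring the "field1:value1,field2:value2" shape of a ComplexKeyGenerator.
public class CompositeKeySketch {

    // The Map stands in for field access on an Avro GenericRecord.
    static String buildRecordKey(Map<String, Object> record, List<String> keyFields) {
        return keyFields.stream()
            .map(f -> f + ":" + record.get(f)) // one field:value pair per key field
            .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        Map<String, Object> record = Map.of("region", "eu", "id", 42);
        System.out.println(buildRecordKey(record, List.of("region", "id")));
        // region:eu,id:42
    }
}
```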
.setTableVersion(conf.getInteger(FlinkOptions.WRITE_TABLE_VERSION))
.setDatabaseName(conf.getString(FlinkOptions.DATABASE_NAME))
- .setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null))
+ .setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD))
Why change the default value? It may not work for the pk-less table scenario.
Change Logs
1. Fix the bug where hoodie.populate.meta.fields in the table config (hoodie.properties) is not effective
2. Optimize write performance
Impact
Improve write performance. After optimization, the write speed with hoodie.populate.meta.fields=false is 42.9% faster than with hoodie.populate.meta.fields=true.

Testing method
Consume from the earliest position in Kafka until all messages are consumed (Kafka Lag = 0), and compare the time taken for both.
1) populate meta fields

time taken: 21 hours and 25 mins
2) no meta fields

time taken: 12 hours and 14 mins
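The 42.9% figure follows from the two wall-clock times above. A quick check of the arithmetic:

```java
// Sanity-check the reported speedup: 21 h 25 min with meta fields
// vs 12 h 14 min without.
public class SpeedupCheck {
    public static void main(String[] args) {
        int withMeta = 21 * 60 + 25;    // 1285 minutes
        int withoutMeta = 12 * 60 + 14; //  734 minutes
        double speedup = 100.0 * (withMeta - withoutMeta) / withMeta;
        System.out.printf("%.1f%%%n", speedup); // prints 42.9%
    }
}
```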
Risk level (write none, low medium or high below)
medium
Documentation Update
none