
[HUDI-9527] Switch to HoodieFileGroupReader in HoodieTableMetadataUtil #13445


Closed
HoodieInternalRow.java
@@ -58,6 +58,8 @@ public class HoodieInternalRow extends InternalRow {
*/
private final UTF8String[] metaFields;
private final InternalRow sourceRow;
// indicates whether this row represents a delete operation. Used in the CDC read.
private boolean isDeleteOperation;

/**
* Specifies whether source {@link #sourceRow} contains meta-fields
@@ -71,24 +73,39 @@ public HoodieInternalRow(UTF8String commitTime,
UTF8String fileName,
InternalRow sourceRow,
boolean sourceContainsMetaFields) {
this.metaFields = new UTF8String[] {
commitTime,
commitSeqNumber,
recordKey,
partitionPath,
fileName
};

this.sourceRow = sourceRow;
this.sourceContainsMetaFields = sourceContainsMetaFields;
this(new UTF8String[] {commitTime, commitSeqNumber, recordKey, partitionPath, fileName}, sourceRow, sourceContainsMetaFields, false);
}

public HoodieInternalRow(UTF8String[] metaFields,
InternalRow sourceRow,
boolean sourceContainsMetaFields) {
this(metaFields, sourceRow, sourceContainsMetaFields, false);
}

private HoodieInternalRow(UTF8String[] metaFields,
InternalRow sourceRow,
boolean sourceContainsMetaFields,
boolean isDeleteOperation) {
this.metaFields = metaFields;
this.sourceRow = sourceRow;
this.sourceContainsMetaFields = sourceContainsMetaFields;
this.isDeleteOperation = isDeleteOperation;
}

public static HoodieInternalRow createDeleteRow(UTF8String recordKey, UTF8String partitionPath, InternalRow sourceRow) {
if (sourceRow instanceof HoodieInternalRow) {
HoodieInternalRow hoodieInternalRow = (HoodieInternalRow) sourceRow;
hoodieInternalRow.isDeleteOperation = true;
return hoodieInternalRow;
}
UTF8String[] metaFields = sourceRow != null ? new UTF8String[0] : new UTF8String[] {
null,
null,
recordKey,
partitionPath,
null
};
return new HoodieInternalRow(metaFields, sourceRow, false, true);
}

@Override
@@ -130,7 +147,7 @@ public boolean isNullAt(int ordinal) {
if (ordinal < metaFields.length) {
return metaFields[ordinal] == null;
}
return sourceRow.isNullAt(rebaseOrdinal(ordinal));
return sourceRow == null || sourceRow.isNullAt(rebaseOrdinal(ordinal));
}

@Override
@@ -254,4 +271,8 @@ private void ruleOutMetaFieldsAccess(int ordinal, Class<?> expectedDataType) {
throw new ClassCastException(String.format("Can not cast meta-field of type UTF8String at (%d) as %s", ordinal, expectedDataType.getName()));
}
}

public boolean isDeleteOperation() {
return isDeleteOperation;
}
}
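
To make the delete-row path above concrete, here is a minimal sketch (not part of the diff; the key and partition values are illustrative) of how createDeleteRow, isDeleteOperation, and the null-safe isNullAt interact when no payload row is available:

// Build a delete-only row: no payload, so only the key and partition meta-fields are populated.
HoodieInternalRow deleteOnly = HoodieInternalRow.createDeleteRow(
    UTF8String.fromString("key-1"),   // record key
    UTF8String.fromString("p1"),      // partition path
    null);                            // no source row for this delete

deleteOnly.isDeleteOperation();       // true
deleteOnly.isNullAt(2);               // false: the record-key meta-field is populated
deleteOnly.isNullAt(7);               // true: sourceRow is null, so data fields read as null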
@@ -110,6 +110,11 @@ public HoodieRecord<InternalRow> constructHoodieRecord(BufferedRecord<InternalRo
return new HoodieSparkRecord(hoodieKey, row, HoodieInternalRowUtils.getCachedSchema(schema), false);
}

@Override
public boolean isDeleteOperation(InternalRow record) {
return record instanceof HoodieInternalRow && ((HoodieInternalRow) record).isDeleteOperation();
}

@Override
public InternalRow seal(InternalRow internalRow) {
return internalRow.copy();
@@ -171,9 +176,6 @@ public Comparable convertValueToEngineType(Comparable value) {

@Override
public InternalRow getDeleteRow(InternalRow record, String recordKey) {
if (record != null) {
return record;
}
return new HoodieInternalRow(null, null, UTF8String.fromString(recordKey), UTF8String.fromString(partitionPath), null, null, false);
return HoodieInternalRow.createDeleteRow(UTF8String.fromString(recordKey), UTF8String.fromString(partitionPath), record);
}
}

Review thread on getDeleteRow:

Contributor: Why remove this? The record should already be marked as a delete in this case.

Contributor Author: The Spark row doesn't have an operation type field like the Flink row data does. Now this will always use an internal row to represent the row.

Contributor: Are you saying the records written to the log data block from Spark are never deletes? That is not true when a customized delete marker is present. Can we return a HoodieInternalRow with isDelete set up in there?

Contributor Author: When we read the data from the log block, it is not going to have an operation type set. We can determine it is a delete by inspecting the record for _hoodie_is_delete, by a field used to mark deletes, or, with custom mergers and payloads, the merger will output an empty Option to signal a delete.

Contributor (@danny0405, Jun 18, 2025): Returning an InternalRow with just the record key and partition path set will lose the other payload fields; not sure if it works for CDC read scenarios (the deletes for retraction).

Contributor Author: I updated the handling here to simply wrap the row if it is non-null, so we can still detect that it is a delete but otherwise do not modify it.
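
Following that discussion, a hedged sketch (assuming hypothetical variables rowFromLogBlock and sparkReaderContext; this is not code from the PR) of how the wrapping is consumed on the read path:

// Wrap a possibly-null payload row so downstream code can still recognize it as a delete.
InternalRow deleteRow = HoodieInternalRow.createDeleteRow(
    UTF8String.fromString("key-1"),
    UTF8String.fromString("2025/06/18"),
    rowFromLogBlock);                 // may be null when only the key and partition are known

// The Spark reader context detects deletes without needing an operation-type field:
boolean isDelete = sparkReaderContext.isDeleteOperation(deleteRow);
// which is equivalent to:
// deleteRow instanceof HoodieInternalRow && ((HoodieInternalRow) deleteRow).isDeleteOperation()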
AvroRecordSizeEstimator.java
@@ -31,20 +31,29 @@
* Avro record excluding the internal {@link Schema}.
*/
public class AvroRecordSizeEstimator implements SizeEstimator<BufferedRecord<IndexedRecord>> {
private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AvroRecordSizeEstimator.class);
private final Schema recordSchema;
private final long sizeOfSchema;

public AvroRecordSizeEstimator(Schema recordSchema) {
this.recordSchema = recordSchema;
sizeOfSchema = ObjectSizeCalculator.getObjectSize(recordSchema);
}

@Override
public long sizeEstimate(BufferedRecord<IndexedRecord> record) {
long sizeOfRecord = ObjectSizeCalculator.getObjectSize(record);
// generated record do not contain Schema field, so do not need minus size of Schema.
if (record.getRecord() instanceof SpecificRecord) {
return sizeOfRecord;
try {
long sizeOfRecord = ObjectSizeCalculator.getObjectSize(record);

// generated record do not contain Schema field, so do not need minus size of Schema.
if (record.getRecord() instanceof SpecificRecord) {
return sizeOfRecord;
}
// do not contain size of Avro schema as the schema is reused among records
return sizeOfRecord - sizeOfSchema + 8;
} catch (Throwable e) {
LOG.error("Failed to estimate size of Avro record: {} with schema:{}", record, recordSchema, e);
throw new RuntimeException("Failed to estimate size of Avro record: " + record, e);
}
// do not contain size of Avro schema as the schema is reused among records
return sizeOfRecord - sizeOfSchema + 8;
}
}
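
For context, a brief usage sketch (recordSchema and bufferedRecord are assumed to exist in the caller; this is not from the PR) showing why the schema is measured once at construction and excluded from each per-record estimate:

SizeEstimator<BufferedRecord<IndexedRecord>> estimator = new AvroRecordSizeEstimator(recordSchema);
long approxBytes = estimator.sizeEstimate(bufferedRecord);
// For generic records this is roughly the object size minus the shared schema size plus 8 bytes
// for the retained schema reference, matching the arithmetic in the method above.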
HoodieAvroReaderContext.java
@@ -23,7 +23,9 @@
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -37,7 +39,6 @@
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -68,15 +69,13 @@
* This implementation does not rely on a specific engine and can be used in any JVM environment as a result.
*/
public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord> {
private final String payloadClass;

public HoodieAvroReaderContext(
StorageConfiguration<?> storageConfiguration,
HoodieTableConfig tableConfig,
Option<InstantRange> instantRangeOpt,
Option<Predicate> filterOpt) {
super(storageConfiguration, tableConfig, instantRangeOpt, filterOpt);
this.payloadClass = tableConfig.getPayloadClass();
}

@Override
@@ -120,7 +119,7 @@ public GenericRecord convertToAvroRecord(IndexedRecord record, Schema schema) {

@Override
public IndexedRecord getDeleteRow(IndexedRecord record, String recordKey) {
throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
return new IndexedDeleteRecord(recordKey, partitionPath, record);
}

@Override
@@ -153,17 +152,18 @@ public String getMetaFieldValue(IndexedRecord record, int pos) {

@Override
public HoodieRecord<IndexedRecord> constructHoodieRecord(BufferedRecord<IndexedRecord> bufferedRecord) {
HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
if (bufferedRecord.isDelete()) {
return SpillableMapUtils.generateEmptyPayload(
bufferedRecord.getRecordKey(),
partitionPath,
bufferedRecord.getOrderingValue(),
payloadClass);
return new HoodieAvroRecord(hoodieKey, new EmptyHoodieRecordPayload());
}
HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
return new HoodieAvroIndexedRecord(hoodieKey, bufferedRecord.getRecord());
}

@Override
public boolean isDeleteOperation(IndexedRecord record) {
return record instanceof IndexedDeleteRecord;
}

@Override
public IndexedRecord seal(IndexedRecord record) {
return record;
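
As a hedged illustration of the new delete handling in constructHoodieRecord (deleteRecord, orderingValue, schemaId, and avroReaderContext are assumed names, not from the PR):

BufferedRecord<IndexedRecord> deleted = BufferedRecord.forDeleteRecord(deleteRecord, orderingValue, schemaId);
HoodieRecord<IndexedRecord> hoodieRecord = avroReaderContext.constructHoodieRecord(deleted);
// For deletes this now yields a HoodieAvroRecord carrying EmptyHoodieRecordPayload,
// instead of reflectively instantiating the table's configured payload class.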
IndexedDeleteRecord.java (new file)
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;

public class IndexedDeleteRecord implements IndexedRecord {
private final String[] metaFields;
private final IndexedRecord record;

public IndexedDeleteRecord(String recordKey, String partitionPath, IndexedRecord record) {
this.metaFields = new String[] {
null,
null,
recordKey,
partitionPath,
null
};
this.record = record;
}

@Override
public void put(int i, Object v) {
throw new UnsupportedOperationException("IndexedDeleteRecord does not support put operation");
}

@Override
public Object get(int i) {
if (i < metaFields.length) {
return metaFields[i];
}
return record == null ? null : record.get(i);
}

@Override
public Schema getSchema() {
return record == null ? Schema.create(Schema.Type.NULL) : record.getSchema();
}
}
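
A small behavioral sketch of the new marker class (ordinals follow the Hudi meta-field layout, where position 2 is the record key and 3 is the partition path; the values are illustrative):

IndexedRecord marker = new IndexedDeleteRecord("key-1", "2025/06/18", null);
marker.get(2);        // "key-1"
marker.get(3);        // "2025/06/18"
marker.get(7);        // null: no wrapped record, so non-meta positions resolve to null
marker.getSchema();   // Schema.create(Schema.Type.NULL) when there is no wrapped record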
HoodieReaderContext.java
@@ -24,10 +24,10 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.serialization.CustomSerializer;
import org.apache.hudi.common.serialization.DefaultSerializer;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.table.read.BufferedRecordSerializer;
import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.LocalAvroSchemaCache;
@@ -192,7 +192,7 @@ public SizeEstimator<BufferedRecord<T>> getRecordSizeEstimator() {
}

public CustomSerializer<BufferedRecord<T>> getRecordSerializer() {
return new DefaultSerializer<>();
return new BufferedRecordSerializer<>();
}

Review comment on the serializer change:

Contributor: Why this change? It seems unnecessary based on the benchmark: #13408 (comment)

/**
@@ -491,4 +491,12 @@ public Integer encodeAvroSchema(Schema schema) {
protected Schema decodeAvroSchema(Object versionId) {
return this.localAvroSchemaCache.getSchema((Integer) versionId).orElse(null);
}

/**
* Checks if the record is a delete operation.
*
* @param record The record to check.
* @return true if the record is a delete operation, false otherwise.
*/
public abstract boolean isDeleteOperation(T record);
}
BufferedRecord.java
@@ -24,6 +24,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.util.Lazy;

import org.apache.avro.Schema;

@@ -41,12 +42,12 @@
*/
public class BufferedRecord<T> implements Serializable {
private final String recordKey;
private final Comparable orderingValue;
private final Lazy<Comparable> orderingValue;

Review thread on the Lazy orderingValue field (a short usage sketch follows at the end of this section):

Contributor Author: When deriving the BufferedRecord from the record with emitDeletes, we can run into issues trying to extract the ordering value since the record is null. This lets us perform that evaluation only when it is required.

Contributor: We already cache the orderingVal in HoodieRecord, and the Lazy wrapper will increase the bytes to serialize for the spillable map too.

private T record;
private final Integer schemaId;
private final boolean isDelete;

public BufferedRecord(String recordKey, Comparable orderingValue, T record, Integer schemaId, boolean isDelete) {
public BufferedRecord(String recordKey, Lazy<Comparable> orderingValue, T record, Integer schemaId, boolean isDelete) {
this.recordKey = recordKey;
this.orderingValue = orderingValue;
this.record = record;
@@ -64,26 +65,26 @@ public static <T> BufferedRecord<T> forRecordWithContext(HoodieRecord<T> record,
} catch (IOException e) {
throw new HoodieException("Failed to get isDelete from record.", e);
}
return new BufferedRecord<>(recordKey, record.getOrderingValue(schema, props), record.getData(), schemaId, isDelete);
return new BufferedRecord<>(recordKey, Lazy.lazily(() -> record.getOrderingValue(schema, props)), record.getData(), schemaId, isDelete);
}

public static <T> BufferedRecord<T> forRecordWithContext(T record, Schema schema, HoodieReaderContext<T> readerContext, Option<String> orderingFieldName, boolean isDelete) {
String recordKey = readerContext.getRecordKey(record, schema);
Integer schemaId = readerContext.encodeAvroSchema(schema);
Comparable orderingValue = readerContext.getOrderingValue(record, schema, orderingFieldName);
Lazy<Comparable> orderingValue = Lazy.lazily(() -> readerContext.getOrderingValue(record, schema, orderingFieldName));
return new BufferedRecord<>(recordKey, orderingValue, record, schemaId, isDelete);
}

public static <T> BufferedRecord<T> forDeleteRecord(DeleteRecord deleteRecord, Comparable orderingValue) {
return new BufferedRecord<>(deleteRecord.getRecordKey(), orderingValue, null, null, true);
public static <T> BufferedRecord<T> forDeleteRecord(DeleteRecord deleteRecord, Comparable orderingValue, Integer schemaId) {
return new BufferedRecord<>(deleteRecord.getRecordKey(), Lazy.eagerly(orderingValue), null, schemaId, true);
}

public String getRecordKey() {
return recordKey;
}

public Comparable getOrderingValue() {
return orderingValue;
return orderingValue.get();
}

public T getRecord() {
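
As discussed in the review thread above, a minimal sketch (readerContext, record, schema, orderingFieldName, and orderingValue are assumed names) of the two ways an ordering value is now supplied:

// Value already known, e.g. for a DeleteRecord: wrap it eagerly.
Lazy<Comparable> eager = Lazy.eagerly(orderingValue);

// Value derived from the payload: defer it, so a null record on the emit-deletes path
// is never inspected unless getOrderingValue() is actually called.
Lazy<Comparable> deferred = Lazy.lazily(() -> readerContext.getOrderingValue(record, schema, orderingFieldName));
Comparable resolved = deferred.get();   // evaluation happens here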