
[HUDI-9527] Switch to HoodieFileGroupReader in HoodieTableMetadataUtil #13445


Closed

Conversation

Contributor

@the-other-tim-brown the-other-tim-brown commented Jun 16, 2025

Change Logs

  • Removes usage of HoodieMergedLogRecordScanner in HoodieTableMetadataUtil and replaces it with HoodieFileGroupReader
  • Fixes handling of deleted records when reading as HoodieRecord

Impact

  • Uses the new standard way of reading

Risk level (write none, low, medium, or high below)

Low

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default value of the configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instruction to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:L PR with lines of changes in (300, 1000] label Jun 16, 2025
@the-other-tim-brown the-other-tim-brown marked this pull request as ready for review June 16, 2025 21:17
Comment on lines 1021 to 1023
HoodieTableMetaClient datasetMetaClient,
Option<Schema> writerSchemaOpt,
String latestCommitTimestamp,
Contributor:

Incorrect indentation.

@@ -171,9 +183,6 @@ public Comparable convertValueToEngineType(Comparable value) {

@Override
public InternalRow getDeleteRow(InternalRow record, String recordKey) {
if (record != null) {
Contributor:

Why remove this? The record should already have been marked as a delete in this case.

Contributor Author:

The Spark row doesn't have an operation type field like the Flink RowData does. Now this will always use an InternalRow to represent the row.

Contributor:

Are you saying the records written to a log data block from Spark are never deletes? That is not true when a customized delete marker is present.

Can we return a HoodieInternalRow with isDelete set up in there?

Contributor Author:

When we read the data from the log block, it is not going to have an operation type set. We can determine it is a delete by inspecting the record's _hoodie_is_deleted field, by a custom field used to mark deletes, or, in the case of custom mergers and payloads, by the merger outputting an empty Option to signal the delete.
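(For illustration only, not code from this PR: a minimal sketch of the three detection paths described above, using plain Avro types. The helper class and the custom-marker parameters are assumptions; only the _hoodie_is_deleted meta field and the empty-merge-result convention come from the discussion.)

import java.util.Optional;
import org.apache.avro.generic.GenericRecord;

// Sketch only: ways a log-block record can be recognized as a delete.
final class DeleteDetectionSketch {
  // 1. The standard meta field set to true when the payload flags a delete.
  static boolean deletedByMetaField(GenericRecord record) {
    return record.getSchema().getField("_hoodie_is_deleted") != null
        && Boolean.TRUE.equals(record.get("_hoodie_is_deleted"));
  }

  // 2. A table-specific delete marker, e.g. an "op" column equal to "d" (assumed names).
  static boolean deletedByCustomMarker(GenericRecord record, String markerField, Object markerValue) {
    return record.getSchema().getField(markerField) != null
        && markerValue.equals(record.get(markerField));
  }

  // 3. A custom merger/payload signalling the delete by producing no merged record at all.
  static boolean deletedByMergeResult(Optional<GenericRecord> mergeResult) {
    return !mergeResult.isPresent();
  }
}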

Contributor (@danny0405), Jun 18, 2025:

Returning an InternalRow with just the record key and partition path set up will lose the other payload fields; not sure if it works for CDC read scenarios (the deletes for retraction).

Contributor Author:

I updated the handling here to simply wrap the row if it is non-null, so we can still detect that it is a delete but otherwise do not modify it.

public HoodieRecord<InternalRow> constructHoodieRecord(InternalRow row, Schema schema, Option<String> orderingFieldName) {
if (row instanceof HoodieInternalRow && ((HoodieInternalRow) row).isDeleteOperation()) {
return new HoodieEmptyRecord<>(
new HoodieKey(row.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_ORD).toString(), partitionPath),
Contributor:

If the row has already been marked as a delete, can we just construct a HoodieSparkRecord?

Contributor Author:
The HoodieSparkRecord will be marked as a delete if the row is null. Using an empty record is the common pattern adopted across the engines.
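(Illustration only, with placeholder types rather than Hudi classes: the "empty record for deletes" dispatch the comment refers to. A delete keeps only the key; a non-delete wraps the engine row.)

// Sketch of the cross-engine pattern: key-only record for deletes, full row otherwise.
final class RecordDispatchSketch {
  interface RecordLike { String key(); }                    // stand-in for HoodieRecord<T>

  static final class EmptyRecord implements RecordLike {    // stand-in for HoodieEmptyRecord
    private final String key;
    EmptyRecord(String key) { this.key = key; }
    @Override public String key() { return key; }
  }

  static final class RowRecord<T> implements RecordLike {   // stand-in for the engine-specific record
    private final String key;
    private final T row;
    RowRecord(String key, T row) { this.key = key; this.row = row; }
    @Override public String key() { return key; }
    T row() { return row; }
  }

  static <T> RecordLike construct(String key, T row, boolean isDelete) {
    return isDelete ? new EmptyRecord(key) : new RowRecord<>(key, row);
  }
}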

Contributor:

@the-other-tim-brown, @danny0405, should we fix HoodieDeleteHelper as well, since for Avro we use HoodieAvroRecord with EmptyHoodieRecordPayload? What about Flink and Hive?

Contributor Author:
@linliu-code what is the impact of using EmptyHoodieRecordPayload vs HoodieEmptyRecord?

@@ -120,7 +118,7 @@ public GenericRecord convertToAvroRecord(IndexedRecord record, Schema schema) {

@Override
public IndexedRecord getDeleteRow(IndexedRecord record, String recordKey) {
- throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
+ return new DeleteRecord(recordKey, record);
Contributor:

The API is designed to return a record with the record key already set up in the record fields instead of a wrapper. Things like DeleteRecord.getRecordKey are not a general API for readers.

Contributor Author:
Updated this to match the pattern with HoodieInternalRow

* @param orderingFieldName The name of the ordering field, if any.
* @return A new instance of {@link HoodieRecord}.
*/
public abstract HoodieRecord<T> constructHoodieRecord(T record, Schema schema, Option<String> orderingFieldName);
Contributor:
I don't think we need this.

Contributor Author:

Without this, we will need a way to determine whether the record is a delete record even when the data may not be set. If there is a way to do this across all engine types (not just Flink), then we can use that. Without this, you will get runtime errors when trying to read as a HoodieRecord.

Contributor:

> If there is a way to do this across all engine types (not just Flink), then we can use that.

Let's figure out a solution for it.

Contributor Author:

The solution is already proposed here and it works.

Contributor Author:

The only other option I see is to make HoodieReaderContext expose an isDeleteOperation(T record) method and then use that to set the value of isDelete on this line, but that is just a roundabout way to get back to the same result.
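(Illustration only: roughly what such a hook could look like. The interface and row type below are placeholders standing in for HoodieReaderContext and the engine row; only the isDeleteOperation name and the emitDelete usage mirror the excerpts in this thread.)

// Sketch: an engine-level "is this record a delete?" hook.
interface ReaderContextSketch<T> {
  boolean isDeleteOperation(T record);
}

// Stand-in for an engine row that carries a delete flag (like HoodieInternalRow#isDeleteOperation).
final class FlaggedRow {
  final Object[] values;
  final boolean deleteOperation;
  FlaggedRow(Object[] values, boolean deleteOperation) {
    this.values = values;
    this.deleteOperation = deleteOperation;
  }
}

final class FlaggedRowContext implements ReaderContextSketch<FlaggedRow> {
  @Override
  public boolean isDeleteOperation(FlaggedRow record) {
    return record != null && record.deleteOperation;
  }
}

// Caller side, mirroring a later excerpt: boolean isDelete = emitDelete && context.isDeleteOperation(next);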

Contributor:

Let's give it a try. In the end, I think we should introduce our engine-agnostic data structures there to unify all the discrepancies.

Contributor Author:

Another option is to have the FGReader return BufferedRecords directly, which will already have the context for whether a row represents a delete. There are some other changes that may need to come with that, like making the ordering value lookup lazy to avoid extra overhead.

Contributor:

The tricky part here is that we could only have a BufferedRecord when there are log files or merging.

Contributor Author:
Pushed an update to add a check for whether the record is a delete operation in the reader context

this.isDeleteOperation = false;
}

private HoodieInternalRow(UTF8String recordKey,
Contributor:

If this is private anyway, can you take isDeleteOperation in as an argument so we can set it to true on line 114? Just that, looking at the constructor arguments, it's not very apparent that this is for a delete operation.

Contributor Author:
Yes, makes sense. I've cleaned up the constructors to all route through a common all-args constructor.
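(Illustration only, with a simplified field set rather than the real HoodieInternalRow: the "route everything through one all-args constructor" pattern, so flags such as isDeleteOperation are assigned in exactly one place.)

final class InternalRowSketch {
  private final String recordKey;
  private final String partitionPath;
  private final boolean isDeleteOperation;

  // Public constructor for a regular row: delegates with the flag defaulted to false.
  InternalRowSketch(String recordKey, String partitionPath) {
    this(recordKey, partitionPath, false);
  }

  // Factory for a delete row: delegates with the flag set to true.
  static InternalRowSketch deleteRow(String recordKey, String partitionPath) {
    return new InternalRowSketch(recordKey, partitionPath, true);
  }

  // The single all-args constructor: the only place fields are assigned.
  private InternalRowSketch(String recordKey, String partitionPath, boolean isDeleteOperation) {
    this.recordKey = recordKey;
    this.partitionPath = partitionPath;
    this.isDeleteOperation = isDeleteOperation;
  }

  String recordKey() { return recordKey; }
  String partitionPath() { return partitionPath; }
  boolean isDeleteOperation() { return isDeleteOperation; }
}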

private final String[] metaFields;
private final IndexedRecord record;

public DeleteIndexedRecord(String recordKey, String partitionPath, IndexedRecord record) {
Contributor:
IndexedDeleteRecord

@@ -58,6 +58,8 @@ public class HoodieInternalRow extends InternalRow {
*/
private final UTF8String[] metaFields;
private final InternalRow sourceRow;
// indicates whether this row represents a delete operation. Used in the CDC read.
private final boolean isDeleteOperation;
Contributor:

Why not just isDeleted?

Contributor Author:
This was to better align with the operation type used in Flink

@@ -107,7 +107,7 @@ public int hashCode() {

@Override
public String toString() {
return "DeleteRecord {"
return "DeleteIndexedRecord {"
Contributor:
should this be reverted?

Contributor Author:

Yes, this was a bad find-and-replace on my part.

HoodieReaderContext<T> readerContext) {
if (writerSchemaOpt.isPresent() && !logFilePaths.isEmpty()) {
List<HoodieLogFile> logFiles = logFilePaths.stream().map(HoodieLogFile::new).collect(Collectors.toList());
FileSlice fileSlice = new FileSlice(partitionPath, logFiles.get(0).getFileId(), logFiles.get(0).getDeltaCommitTime());
Contributor:

The delta commit time of the first log file may not be the file slice's base instant time, right? But I see here we are setting it so. Can you confirm this does not have any other side effects?

Contributor Author:

Right now we don't rely on the instant time anywhere in the FileGroupReader, but we can change this if there is a way to set it properly.

.withFileSlice(fileSlice)
.withDataSchema(tableSchema)
.withRequestedSchema(HoodieAvroUtils.getRecordKeySchema())
.withLatestCommitTime(metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime).orElse(""))
Contributor:

Can we compute the latestInstant in the driver and let the executors access it directly?

Contributor Author:
Yes, updated

@@ -850,6 +846,7 @@ public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(Hoodi
StorageConfiguration storageConfiguration = dataTableMetaClient.getStorageConf();
Option<Schema> writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
ReaderContextFactory<T> readerContextFactory = engineContext.getReaderContextFactory(dataTableMetaClient);
Contributor:

Curious, should we consider initializing InstantRange?

Contributor Author:

I don't see it set in the current code; what would be the side effect of not adding it?

Comment on lines 202 to 204
if (operation == HoodieOperation.DELETE) {
return new HoodieEmptyRecord<>(hoodieKey, HoodieOperation.DELETE, getOrderingValue(rowData, schema, orderingFieldName), HoodieRecord.HoodieRecordType.FLINK);
}
Contributor:

We can probably use the HoodieDeleteHelper.createDeleteRecord function.

@@ -384,10 +384,8 @@ public ClosableIterator<T> getClosableIterator() throws IOException {
* @return An iterator over the records that wraps the engine-specific record in a HoodieRecord.
*/
public ClosableIterator<HoodieRecord<T>> getClosableHoodieRecordIterator() throws IOException {
return new CloseableMappingIterator<>(getClosableIterator(), nextRecord -> {
BufferedRecord<T> bufferedRecord = BufferedRecord.forRecordWithContext(nextRecord, readerContext.getSchemaHandler().getRequestedSchema(), readerContext, orderingFieldName, false);
Contributor:

If we fix the isDelete flag here, can the newly introduced #constructHoodieRecord be eliminated?

Contributor Author:

Yes, it just needs some way to set this properly.

@@ -41,12 +42,12 @@
*/
public class BufferedRecord<T> implements Serializable {
private final String recordKey;
- private final Comparable orderingValue;
+ private final Lazy<Comparable> orderingValue;
Contributor Author:

When deriving the BufferedRecord from the record with emitDeletes, we can run into issues trying to extract the ordering value since the record is null. This allows us to only perform that evaluation if required.
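(Illustration only, a generic memoizing supplier rather than Hudi's Lazy type: the ordering value is computed at most once, and never at all for a delete whose payload is null unless something actually asks for it.)

import java.util.function.Supplier;

final class LazyValue<V> {
  private Supplier<V> supplier;   // dropped after the first evaluation
  private V value;
  private boolean evaluated;

  LazyValue(Supplier<V> supplier) {
    this.supplier = supplier;
  }

  synchronized V get() {
    if (!evaluated) {
      value = supplier.get();     // e.g. extract the ordering field from the engine row
      supplier = null;
      evaluated = true;
    }
    return value;
  }
}

// Usage sketch (extractOrderingValue is a hypothetical helper):
// new LazyValue<>(() -> extractOrderingValue(row, schema, orderingFieldName))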

Contributor:

We already cache the orderingVal in HoodieRecord; the Lazy wrapper will also increase the number of bytes to serialize for the spillable map.

byte[] avroBytes = recordSerializer.serialize(record.getRecord());
output.writeInt(avroBytes.length);
output.writeBytes(avroBytes);
byte[] recordBytes = record.getRecord() == null ? new byte[0] : recordSerializer.serialize(record.getRecord());
Contributor Author:

Previously, this would fail for deletes where the record is null.
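(Illustration only, using plain java.io streams instead of the record serializer and Kryo output in the actual code: a zero length on the wire stands for a delete whose payload is null, matching the excerpt above; the read side is an assumed counterpart.)

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class NullSafePayloadIo {
  // Write: a null payload is encoded as length 0 with no bytes.
  static void write(DataOutputStream out, byte[] recordBytes) throws IOException {
    byte[] safeBytes = recordBytes == null ? new byte[0] : recordBytes;
    out.writeInt(safeBytes.length);
    out.write(safeBytes);
  }

  // Read: length 0 is decoded back to a null payload (a delete).
  static byte[] read(DataInputStream in) throws IOException {
    int length = in.readInt();
    if (length == 0) {
      return null;
    }
    byte[] bytes = new byte[length];
    in.readFully(bytes);
    return bytes;
  }
}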


@@ -192,7 +192,7 @@ public SizeEstimator<BufferedRecord<T>> getRecordSizeEstimator() {
}

public CustomSerializer<BufferedRecord<T>> getRecordSerializer() {
- return new DefaultSerializer<>();
+ return new BufferedRecordSerializer<>();
Contributor:

Why this change? It seems unnecessary based on the benchmark: #13408 (comment)

private final ByteArrayOutputStream baos;
private final RecordSerializer<T> recordSerializer;
// Caching kryo serializer to avoid creating kryo instance for every serde operation
private static final ThreadLocal<InternalSerializerInstance> SERIALIZER_REF =
Contributor:

The Kryo creation happens once for each file group reader, so the cost should be low?
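(Illustration only, with a stand-in serializer type rather than the real Kryo instance: the ThreadLocal caching pattern in question, where each thread builds the serializer once and reuses it for every serde call.)

import java.nio.charset.StandardCharsets;

final class SerializerCacheSketch {
  // Stand-in for a serializer that is expensive to construct (e.g. a Kryo instance with registrations).
  static final class ExpensiveSerializer {
    byte[] serialize(Object value) {
      return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }
  }

  // One instance per thread, created lazily on first use and then reused.
  private static final ThreadLocal<ExpensiveSerializer> SERIALIZER_REF =
      ThreadLocal.withInitial(ExpensiveSerializer::new);

  static byte[] serialize(Object value) {
    return SERIALIZER_REF.get().serialize(value);
  }
}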

@@ -389,7 +392,8 @@ public ClosableIterator<T> getClosableIterator() throws IOException {
*/
public ClosableIterator<HoodieRecord<T>> getClosableHoodieRecordIterator() throws IOException {
return new CloseableMappingIterator<>(getClosableIterator(), nextRecord -> {
BufferedRecord<T> bufferedRecord = BufferedRecord.forRecordWithContext(nextRecord, readerContext.getSchemaHandler().getRequestedSchema(), readerContext, orderingFieldName, false);
boolean isDelete = emitDelete && readerContext.isDeleteOperation(nextRecord);
Contributor:

A delete is a DELETE; you cannot emit a delete when emitDelete is false.

@@ -128,6 +130,7 @@ private HoodieFileGroupReader(HoodieReaderContext<T> readerContext, HoodieStorag
this.props = props;
this.start = start;
this.length = length;
this.emitDelete = emitDelete;
Contributor:
I don't think we need this.

Contributor

@danny0405 danny0405 commented Jun 19, 2025

@the-other-tim-brown I think the emitDeletes support for the HoodieRecord iterator brings in more overhead than I thought. Can we drop it in this PR? The delete-key fetching should only be used in the legacy code path; now that we have streaming write to the MDT, that code should be removed in the future anyway (once the streaming write is stable).

emitDeletes is introduced mainly for streaming read scenarios with engine-specific rows.

Also, can we move the size estimation changes into a separate PR to make the review of the current one easier?

Contributor Author (@the-other-tim-brown):


@danny0405 I don't understand what you are recommending here. Streaming write to the MDT is only used for Spark, and I don't think there are plans to use it for other engines.

Contributor Author (@the-other-tim-brown):

Started a new PR with the same end result of moving off the deprecated code but with a smaller changeset: #13470

Contributor (@danny0405):

> Streaming write to the MDT is only used for Spark and I don't think there are plans to use it for other engines.

That's true. For Flink and Java, there is not even a solution or plan to support the RLI there; that is why I said code like constructing the RLI from files should be deemed legacy.

@the-other-tim-brown the-other-tim-brown deleted the HUDI-9527 branch June 23, 2025 00:04