[HUDI-9527] Switch to HoodieFileGroupReader in HoodieTableMetadataUtil #13445
Conversation
HoodieTableMetaClient datasetMetaClient,
Option<Schema> writerSchemaOpt,
String latestCommitTimestamp,
Incorrect indentation.
@@ -171,9 +183,6 @@ public Comparable convertValueToEngineType(Comparable value) {

@Override
public InternalRow getDeleteRow(InternalRow record, String recordKey) {
  if (record != null) {
Why remove this? The record should already be marked as a delete in this case.
The Spark row doesn't have an operation type field like the Flink row data does. Now this will always use an internal row to represent the row.
Are you saying the records written to the log data block from Spark are never deletes? That is not true when a customized delete marker is present. Can we return a HoodieInternalRow with isDelete set in it?
When we read the data from the log block, it is not going to have an operation type set. We can determine it is a delete by inspecting the record for _hoodie_is_deleted or another field used to mark deletes, or, in the case of custom mergers and payloads, the merger will output an empty option to signal a delete.
Returning an InternalRow with just the record key and partition path set will lose the other payload fields; not sure whether that works for CDC read scenarios (the deletes for retraction).
I updated the handling here to simply wrap the row if it is non-null so we can still detect it is a delete but otherwise do not modify it.
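A rough sketch of that approach (the HoodieInternalRow constructor arguments here are simplified assumptions for illustration, not the PR's exact signature):

```java
@Override
public InternalRow getDeleteRow(InternalRow record, String recordKey) {
  // Wrap rather than rebuild: downstream code can check isDeleteOperation() while
  // CDC reads still see the original payload columns.
  // (HoodieInternalRow constructor shown with illustrative, simplified arguments)
  return new HoodieInternalRow(UTF8String.fromString(recordKey), record, /* isDeleteOperation */ true);
}
```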
public HoodieRecord<InternalRow> constructHoodieRecord(InternalRow row, Schema schema, Option<String> orderingFieldName) {
  if (row instanceof HoodieInternalRow && ((HoodieInternalRow) row).isDeleteOperation()) {
    return new HoodieEmptyRecord<>(
        new HoodieKey(row.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_ORD).toString(), partitionPath),
If the row has already been marked as a delete, can we just construct a HoodieSparkRecord?
The HoodieSparkRecord will be marked as a delete if the row is null. Using an empty record is the common pattern adopted across the engines.
@the-other-tim-brown, @danny0405, should we fix HoodieDeleteHelper as well, since for Avro we use HoodieAvroRecord with EmptyHoodieRecordPayload? What about Flink and Hive?
@linliu-code what is the impact of using EmptyHoodieRecordPayload vs HoodieEmptyRecord?
@@ -120,7 +118,7 @@ public GenericRecord convertToAvroRecord(IndexedRecord record, Schema schema) {

@Override
public IndexedRecord getDeleteRow(IndexedRecord record, String recordKey) {
  throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
  return new DeleteRecord(recordKey, record);
The API is designed to return a record with the record key already set in the record fields instead of a wrapper. Something like DeleteRecord.getRecordKey is not a general API for readers.
Updated this to match the pattern with HoodieInternalRow
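For illustration, a hedged sketch of what that Avro-side pattern can look like: the record key and partition path are exposed through the normal get(ordinal) path instead of a wrapper-only accessor like DeleteRecord.getRecordKey. The ordinal layout and null handling below are assumptions, not the PR's exact code:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;

// Key-carrying delete row for the Avro reader context (sketch).
public class DeleteIndexedRecord implements IndexedRecord {
  private final String[] metaFields;
  private final IndexedRecord record;

  public DeleteIndexedRecord(String recordKey, String partitionPath, IndexedRecord record) {
    this.metaFields = new String[] {recordKey, partitionPath};
    this.record = record;
  }

  @Override
  public Object get(int i) {
    // Assumed layout: meta fields occupy the leading ordinals, everything else
    // delegates to the wrapped payload when one is present.
    if (i < metaFields.length) {
      return metaFields[i];
    }
    return record == null ? null : record.get(i);
  }

  @Override
  public void put(int i, Object v) {
    throw new UnsupportedOperationException("Delete rows are read-only");
  }

  @Override
  public Schema getSchema() {
    return record == null ? null : record.getSchema();
  }
}
```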
 * @param orderingFieldName The name of the ordering field, if any.
 * @return A new instance of {@link HoodieRecord}.
 */
public abstract HoodieRecord<T> constructHoodieRecord(T record, Schema schema, Option<String> orderingFieldName);
I don't think we need this.
Without this, we will need a way to determine whether the record is a delete record even when the data may not be set. If there is a way to do this across all engine types (not just Flink) then we can use that. Without it, you will get runtime errors when trying to read as a HoodieRecord.
> If there is a way to do this across all engine types (not just Flink) then we can use that.

Let's figure out a solution for it.
the solution is already proposed here and it works
The only other option I see is to make the HoodieReaderContext have a method for isDeleteOperation(T record), and then we can use that to set the value for isDelete in this line, but that is just a roundabout way to get back to the same result.
Let's give it a try. In the end, I think we should introduce our engine-agnostic data structures there to unify all the discrepancies.
Another option is to have the FGReader return BufferedRecords directly, which already carry the context for whether the row represents a delete. There are some other changes that may need to come with that, like making the ordering value lookup lazy to avoid extra overhead.
The tricky part here is that we only have a BufferedRecord when there are log files or merging.
Pushed an update to add a check in the reader context for whether the record is a delete operation.
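A minimal sketch of that check on the Spark side, based on the isDeleteOperation flag shown in the HoodieInternalRow diffs below (exact placement of the method in the reader context is an assumption):

```java
public boolean isDeleteOperation(InternalRow record) {
  // Rows produced by getDeleteRow are HoodieInternalRow instances flagged as deletes;
  // anything else coming out of the reader is a regular data row.
  return record instanceof HoodieInternalRow && ((HoodieInternalRow) record).isDeleteOperation();
}
```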
  this.isDeleteOperation = false;
}

private HoodieInternalRow(UTF8String recordKey,
If this is private anyway, can you take in isDeleteOperation as an argument so we can set it to true in L114? Just that, looking at the constructor arguments, it's not very apparent that this is for a delete operation.
Yes, makes sense. I've cleaned up the constructors to all route through a common all-args constructor.
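Roughly the shape of that cleanup (argument list trimmed heavily for illustration; the real HoodieInternalRow also carries commit time, sequence number, partition path, and file name meta fields, and the factory name below is hypothetical):

```java
// Public constructors default the flag to false; only the delete path opts in,
// and everything funnels through the single private all-args constructor.
public HoodieInternalRow(UTF8String recordKey, InternalRow sourceRow, boolean containsMetaFields) {
  this(recordKey, sourceRow, containsMetaFields, /* isDeleteOperation */ false);
}

public static HoodieInternalRow createDeleteRow(UTF8String recordKey, InternalRow sourceRow) {
  // hypothetical factory, used here only to make the delete intent explicit
  return new HoodieInternalRow(recordKey, sourceRow, true, /* isDeleteOperation */ true);
}

private HoodieInternalRow(UTF8String recordKey,
                          InternalRow sourceRow,
                          boolean containsMetaFields,
                          boolean isDeleteOperation) {
  this.recordKey = recordKey;
  this.sourceRow = sourceRow;
  this.containsMetaFields = containsMetaFields;
  this.isDeleteOperation = isDeleteOperation;
}
```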
private final String[] metaFields;
private final IndexedRecord record;

public DeleteIndexedRecord(String recordKey, String partitionPath, IndexedRecord record) {
IndexedDeleteRecord
@@ -58,6 +58,8 @@ public class HoodieInternalRow extends InternalRow {
 */
private final UTF8String[] metaFields;
private final InternalRow sourceRow;
// indicates whether this row represents a delete operation. Used in the CDC read.
private final boolean isDeleteOperation;
why not just isDeleted
This was to better align with the operation type used in Flink
@@ -107,7 +107,7 @@ public int hashCode() {

@Override
public String toString() {
  return "DeleteRecord {"
  return "DeleteIndexedRecord {"
should this be reverted?
Yes, this was a bad find and replace on my part
    HoodieReaderContext<T> readerContext) {
  if (writerSchemaOpt.isPresent() && !logFilePaths.isEmpty()) {
    List<HoodieLogFile> logFiles = logFilePaths.stream().map(HoodieLogFile::new).collect(Collectors.toList());
    FileSlice fileSlice = new FileSlice(partitionPath, logFiles.get(0).getFileId(), logFiles.get(0).getDeltaCommitTime());
The delta commit time of the first log file may not be the file slice's base instant time, right? But I see here we are setting it so. Can you confirm this does not have any other side effects?
Right now we don't rely on the instant time anywhere in the FileGroupReader, but we can change this if there is a way to set it properly.
.withFileSlice(fileSlice)
.withDataSchema(tableSchema)
.withRequestedSchema(HoodieAvroUtils.getRecordKeySchema())
.withLatestCommitTime(metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime).orElse(""))
Can we compute the latestInstant in the driver and let the executors access it directly?
Yes, updated
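The gist of that change, using only the calls already visible in the builder snippet above: resolve the latest completed instant once on the driver and hand the resulting string to the executors, instead of re-reading the active timeline inside every task.

```java
// Computed once on the driver; a plain String is cheap to capture in the closure
// that builds the HoodieFileGroupReader on each executor.
String latestCommitTime = metaClient.getActiveTimeline()
    .filterCompletedInstants()
    .lastInstant()
    .map(HoodieInstant::requestedTime)
    .orElse("");

// ...then, inside the per-file-slice function running on the executors:
// builder.withLatestCommitTime(latestCommitTime)
```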
@@ -850,6 +846,7 @@ public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(Hoodi
StorageConfiguration storageConfiguration = dataTableMetaClient.getStorageConf();
Option<Schema> writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
ReaderContextFactory<T> readerContextFactory = engineContext.getReaderContextFactory(dataTableMetaClient);
Curious, should we consider initializing InstantRange?
I don't see it set in the current code, what would be the side-effect of not adding it?
if (operation == HoodieOperation.DELETE) {
  return new HoodieEmptyRecord<>(hoodieKey, HoodieOperation.DELETE, getOrderingValue(rowData, schema, orderingFieldName), HoodieRecord.HoodieRecordType.FLINK);
}
We probably can use the HoodieDeleteHelper.createDeleteRecord function.
@@ -384,10 +384,8 @@ public ClosableIterator<T> getClosableIterator() throws IOException {
 * @return An iterator over the records that wraps the engine-specific record in a HoodieRecord.
 */
public ClosableIterator<HoodieRecord<T>> getClosableHoodieRecordIterator() throws IOException {
  return new CloseableMappingIterator<>(getClosableIterator(), nextRecord -> {
    BufferedRecord<T> bufferedRecord = BufferedRecord.forRecordWithContext(nextRecord, readerContext.getSchemaHandler().getRequestedSchema(), readerContext, orderingFieldName, false);
If we fix the isDelete flag here, the newly introduced #constructHoodieRecord can be eliminated?
yes, just needs some way to set this properly
This is what I recommended to address this: https://github.com/apache/hudi/pull/13445/files/11a4df01b69b973f5fce11fbb574b1edba4ca19c#r2151183064
@@ -41,12 +42,12 @@
 */
public class BufferedRecord<T> implements Serializable {
  private final String recordKey;
  private final Comparable orderingValue;
  private final Lazy<Comparable> orderingValue;
When deriving the BufferedRecord from the record with emitDeletes, we can run into issues trying to extract the ordering value since the record is null. This allows us to only perform that evaluation if required.
We already cache the orderingVal in HoodieRecord, and the Lazy wrapper will increase the bytes to serialize for the spillable map too.
byte[] avroBytes = recordSerializer.serialize(record.getRecord());
output.writeInt(avroBytes.length);
output.writeBytes(avroBytes);
byte[] recordBytes = record.getRecord() == null ? new byte[0] : recordSerializer.serialize(record.getRecord());
Previously, this would fail for deletes where the record is null
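For context, a sketch of the matching read path (schematic only; the real RecordSerializer deserialize call may take additional arguments such as the schema): a zero-length payload marks a delete whose engine-specific record is null.

```java
// Mirror of the write path above: length 0 means no record bytes were written.
int length = input.readInt();
T record = null;
if (length > 0) {
  byte[] recordBytes = input.readBytes(length);        // Kryo Input
  record = recordSerializer.deserialize(recordBytes);  // schematic call, see note above
}
```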
@@ -192,7 +192,7 @@ public SizeEstimator<BufferedRecord<T>> getRecordSizeEstimator() {
}

public CustomSerializer<BufferedRecord<T>> getRecordSerializer() {
  return new DefaultSerializer<>();
  return new BufferedRecordSerializer<>();
Why this change? It seems unnecessary based on the benchmark: #13408 (comment)
private final ByteArrayOutputStream baos;
private final RecordSerializer<T> recordSerializer;
// Caching kryo serializer to avoid creating kryo instance for every serde operation
private static final ThreadLocal<InternalSerializerInstance> SERIALIZER_REF =
The Kryo creation happens once for each file group reader, so the cost should be low?
@@ -389,7 +392,8 @@ public ClosableIterator<T> getClosableIterator() throws IOException {
 */
public ClosableIterator<HoodieRecord<T>> getClosableHoodieRecordIterator() throws IOException {
  return new CloseableMappingIterator<>(getClosableIterator(), nextRecord -> {
    BufferedRecord<T> bufferedRecord = BufferedRecord.forRecordWithContext(nextRecord, readerContext.getSchemaHandler().getRequestedSchema(), readerContext, orderingFieldName, false);
    boolean isDelete = emitDelete && readerContext.isDeleteOperation(nextRecord);
A delete is a DELETE; you cannot emit a delete when emitDelete is false.
@@ -128,6 +130,7 @@ private HoodieFileGroupReader(HoodieReaderContext<T> readerContext, HoodieStorag
  this.props = props;
  this.start = start;
  this.length = length;
  this.emitDelete = emitDelete;
I don't think we need this.
@the-other-tim-brown I think the … Also, can we revert the changes for size estimation into a separate PR to make the review of the current one easier?
@danny0405 I don't understand what you are recommending here. Streaming write to the MDT is only used for Spark, and I don't think there are plans to use it for other engines.
Started a new PR with the same end result of moving off the deprecated code but with a smaller changeset: #13470
That's true. For Flink and Java, there is not even a solution/plan to support the RLI there; that is why I said code like constructing the RLI from files should be deemed legacy.
Change Logs
Remove the use of HoodieMergedLogRecordScanner in HoodieTableMetadataUtil and replace it with the HoodieFileGroupReader, reading the results as HoodieRecord.
Impact
Risk level (write none, low medium or high below)
Low
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here and follow the instruction to make changes to the website.
Contributor's checklist