[HUDI-9527] Switch to HoodieFileGroupReader in HoodieTableMetadataUtil #13445
Changes from all commits
@@ -0,0 +1,57 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hudi.avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;

public class IndexedDeleteRecord implements IndexedRecord {
  private final String[] metaFields;
  private final IndexedRecord record;

  public IndexedDeleteRecord(String recordKey, String partitionPath, IndexedRecord record) {
    this.metaFields = new String[] {
        null,
        null,
        recordKey,
        partitionPath,
        null
    };
    this.record = record;
  }

  @Override
  public void put(int i, Object v) {
    throw new UnsupportedOperationException("IndexedDeleteRecord does not support put operation");
  }

  @Override
  public Object get(int i) {
    if (i < metaFields.length) {
      return metaFields[i];
    }
    return record == null ? null : record.get(i);
  }

  @Override
  public Schema getSchema() {
    return record == null ? Schema.create(Schema.Type.NULL) : record.getSchema();
  }
}
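For orientation, the five metaFields slots line up with Hudi's standard meta columns (_hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key, _hoodie_partition_path, _hoodie_file_name), so positions 2 and 3 carry the key and partition path. The usage demo below is mine, not part of the PR; it shows the pure delete-marker case where no record is wrapped.

import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.IndexedDeleteRecord;

public class DeleteMarkerDemo {
  public static void main(String[] args) {
    // No wrapped record: only the record key and partition path are readable.
    IndexedRecord delete = new IndexedDeleteRecord("key-1", "2024/01/01", null);
    System.out.println(delete.get(0));                // null (no commit time)
    System.out.println(delete.get(2));                // key-1
    System.out.println(delete.get(3));                // 2024/01/01
    System.out.println(delete.getSchema().getType()); // NULL, since record is null
  }
}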
@@ -24,10 +24,10 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.serialization.CustomSerializer;
-import org.apache.hudi.common.serialization.DefaultSerializer;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordSerializer;
 import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.LocalAvroSchemaCache;

@@ -192,7 +192,7 @@ public SizeEstimator<BufferedRecord<T>> getRecordSizeEstimator() {
   }

   public CustomSerializer<BufferedRecord<T>> getRecordSerializer() {
-    return new DefaultSerializer<>();
+    return new BufferedRecordSerializer<>();
   }

   /**

Review comment (on the BufferedRecordSerializer line): why this change? It seems not necessary based on the benchmark: #13408 (comment)
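For background on what such a swap could buy (the sketch below is my illustration, not the PR's BufferedRecordSerializer, which may be implemented quite differently): a BufferedRecord has a small, fixed set of fields, so a purpose-built serializer can write them directly instead of going through generic Java object serialization when records spill to disk.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public final class FieldWiseSerializerSketch {
  private FieldWiseSerializerSketch() {}

  // Illustrative only: writes the handful of BufferedRecord fields directly;
  // the payload encoding is left engine-specific.
  public static byte[] serialize(String recordKey, int schemaId, boolean isDelete, byte[] payload) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bos)) {
      out.writeUTF(recordKey);    // record key
      out.writeInt(schemaId);     // encoded schema id
      out.writeBoolean(isDelete); // delete flag
      out.writeInt(payload.length);
      out.write(payload);         // pre-encoded record payload
    }
    return bos.toByteArray();
  }
}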
@@ -491,4 +491,12 @@ public Integer encodeAvroSchema(Schema schema) {
   protected Schema decodeAvroSchema(Object versionId) {
     return this.localAvroSchemaCache.getSchema((Integer) versionId).orElse(null);
   }
+
+  /**
+   * Checks if the record is a delete operation.
+   *
+   * @param record The record to check.
+   * @return true if the record is a delete operation, false otherwise.
+   */
+  public abstract boolean isDeleteOperation(T record);
 }
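As an illustration of how an engine-specific subclass might satisfy the new hook (this helper is hypothetical, not the PR's implementation), an Avro-based context could treat Hudi's standard _hoodie_is_deleted marker as the delete signal:

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;

public final class DeleteMarkerCheck {
  private DeleteMarkerCheck() {}

  // True when the record carries the delete-marker field set to true;
  // records whose schema lacks the field are treated as upserts.
  public static boolean isDeleteOperation(IndexedRecord record) {
    Schema.Field marker = record.getSchema().getField("_hoodie_is_deleted");
    return marker != null && Boolean.TRUE.equals(record.get(marker.pos()));
  }
}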
@@ -24,6 +24,7 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.util.Lazy;

 import org.apache.avro.Schema;

@@ -41,12 +42,12 @@
  */
 public class BufferedRecord<T> implements Serializable {
   private final String recordKey;
-  private final Comparable orderingValue;
+  private final Lazy<Comparable> orderingValue;

Review comment: When deriving the BufferedRecord from the record with …

Reply: We already cache the …

   private T record;
   private final Integer schemaId;
   private final boolean isDelete;

-  public BufferedRecord(String recordKey, Comparable orderingValue, T record, Integer schemaId, boolean isDelete) {
+  public BufferedRecord(String recordKey, Lazy<Comparable> orderingValue, T record, Integer schemaId, boolean isDelete) {
     this.recordKey = recordKey;
     this.orderingValue = orderingValue;
     this.record = record;

@@ -64,26 +65,26 @@ public static <T> BufferedRecord<T> forRecordWithContext(HoodieRecord<T> record,
     } catch (IOException e) {
       throw new HoodieException("Failed to get isDelete from record.", e);
     }
-    return new BufferedRecord<>(recordKey, record.getOrderingValue(schema, props), record.getData(), schemaId, isDelete);
+    return new BufferedRecord<>(recordKey, Lazy.lazily(() -> record.getOrderingValue(schema, props)), record.getData(), schemaId, isDelete);
   }

   public static <T> BufferedRecord<T> forRecordWithContext(T record, Schema schema, HoodieReaderContext<T> readerContext, Option<String> orderingFieldName, boolean isDelete) {
     String recordKey = readerContext.getRecordKey(record, schema);
     Integer schemaId = readerContext.encodeAvroSchema(schema);
-    Comparable orderingValue = readerContext.getOrderingValue(record, schema, orderingFieldName);
+    Lazy<Comparable> orderingValue = Lazy.lazily(() -> readerContext.getOrderingValue(record, schema, orderingFieldName));
     return new BufferedRecord<>(recordKey, orderingValue, record, schemaId, isDelete);
   }

-  public static <T> BufferedRecord<T> forDeleteRecord(DeleteRecord deleteRecord, Comparable orderingValue) {
-    return new BufferedRecord<>(deleteRecord.getRecordKey(), orderingValue, null, null, true);
+  public static <T> BufferedRecord<T> forDeleteRecord(DeleteRecord deleteRecord, Comparable orderingValue, Integer schemaId) {
+    return new BufferedRecord<>(deleteRecord.getRecordKey(), Lazy.eagerly(orderingValue), null, schemaId, true);
   }

   public String getRecordKey() {
     return recordKey;
   }

   public Comparable getOrderingValue() {
-    return orderingValue;
+    return orderingValue.get();
   }

   public T getRecord() {
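The switch from an eager Comparable to Lazy<Comparable> means the ordering value is only computed if some code path actually asks for it, for example a merge that compares ordering values, while Lazy.eagerly keeps the pre-computed value for delete records. A minimal sketch of the memoizing pattern follows; Hudi's real org.apache.hudi.util.Lazy may differ in detail:

import java.util.function.Supplier;

public final class LazySketch<T> {
  private Supplier<T> initializer; // null once the value is materialized
  private T value;

  private LazySketch(Supplier<T> initializer, T value) {
    this.initializer = initializer;
    this.value = value;
  }

  public static <T> LazySketch<T> lazily(Supplier<T> supplier) {
    return new LazySketch<>(supplier, null);
  }

  public static <T> LazySketch<T> eagerly(T value) {
    return new LazySketch<>(null, value);
  }

  public T get() {
    if (initializer != null) {
      value = initializer.get(); // compute once, on first access
      initializer = null;        // drop the supplier so it can be GC'd
    }
    return value;
  }
}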
Review comment: why remove this? The record should already have been marked as delete in this case.

Reply: The Spark row doesn't have an operation type field like the Flink row data does. Now this will always use an internal row to represent the row.

Reply: Are you saying the records written to a log data block from Spark are never deletes? That is not true when a customized delete marker is present. Can we return HoodieInternalRow with isDelete set up in there?

Reply: When we read the data from the log block, it is not going to have an operation type set. We can determine it is a delete by inspecting the record for _hoodie_is_deleted or another field used to mark deletes; in the case of custom mergers and payloads, the merger will output an empty Option to signal a delete.

Reply: Returning an InternalRow with just the record key and partition path set will lose the other payload fields; not sure if that works for CDC read scenarios (the deletes for retraction).

Reply: I updated the handling here to simply wrap the row if it is non-null, so we can still detect it is a delete but otherwise do not modify it.
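Tying the thread back to the new wrapper (the demo below is mine; the schema and values are made up): with a non-null row, IndexedDeleteRecord overrides only the meta-field positions, so payload fields and the schema still come from the wrapped row, which is what keeps CDC-style reads of retracted records intact.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.IndexedDeleteRecord;

public class WrapDeleteDemo {
  public static void main(String[] args) {
    // Toy schema: the five Hudi meta columns followed by one payload field.
    Schema schema = SchemaBuilder.record("Row").fields()
        .optionalString("_hoodie_commit_time")
        .optionalString("_hoodie_commit_seqno")
        .optionalString("_hoodie_record_key")
        .optionalString("_hoodie_partition_path")
        .optionalString("_hoodie_file_name")
        .optionalString("value")
        .endRecord();

    GenericData.Record row = new GenericData.Record(schema);
    row.put("value", "payload-to-retract");

    IndexedRecord wrapped = new IndexedDeleteRecord("k1", "2024/01/01", row);
    System.out.println(wrapped.get(2));                // k1 (from the wrapper)
    System.out.println(wrapped.get(5));                // payload-to-retract (delegated to the row)
    System.out.println(wrapped.getSchema().getName()); // Row (the wrapped row's schema)
  }
}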