[HUDI-1720] when query incr view of mor table which has many delete records use sparksql/hive-beeline, StackOverflowError #2721

merged 1 commit into from Apr 13, 2021
@@ -77,6 +77,14 @@ private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOExcept
         .build();
   }
 
+  private Option<GenericRecord> buildGenericRecordwithCustomPayload(HoodieRecord record) throws IOException {
+    if (usesCustomPayload) {
+      return record.getData().getInsertValue(getWriterSchema());
+    } else {
+      return record.getData().getInsertValue(getReaderSchema());
+    }
+  }
+
   @Override
   public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOException {
     // Call the underlying parquetReader.next - which may replace the passed in ArrayWritable
@@ -95,15 +103,24 @@ public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOEx
         // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
         // deltaRecord may not be a full record and needs values of columns from the parquet
         Option<GenericRecord> rec;
-        if (usesCustomPayload) {
-          rec = deltaRecordMap.get(key).getData().getInsertValue(getWriterSchema());
-        } else {
-          rec = deltaRecordMap.get(key).getData().getInsertValue(getReaderSchema());
+        rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
+        // If the record is not present, this is a delete record using an empty payload so skip this base record
+        // and move to the next record
+        while (!rec.isPresent()) {
+          // if current parquet reader has no record, return false
+          if (!this.parquetReader.next(aVoid, arrayWritable)) {
+            return false;
+          }
+          String tempKey = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
+          if (deltaRecordMap.containsKey(tempKey)) {
+            rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(tempKey));
+          } else {
+            // need to return true, since now log file does not contain tempKey, but parquet file contains tempKey
+            return true;
+          }
         }
         if (!rec.isPresent()) {
-          // If the record is not present, this is a delete record using an empty payload so skip this base record
-          // and move to the next record
-          return next(aVoid, arrayWritable);
+          return false;
         }
         GenericRecord recordToReturn = rec.get();
         if (usesCustomPayload) {
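The change above replaces the recursive `return next(aVoid, arrayWritable)` with a loop, so a long run of consecutive delete records no longer adds one stack frame per skipped record. The following is a minimal, standalone sketch of that idea, not the Hudi code itself: the class `SkipDeletesSketch`, the method `nextLive`, and the use of plain `String` keys with `java.util.Optional` are all hypothetical stand-ins, where an empty `Optional` plays the role of a delete record with an empty payload.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class SkipDeletesSketch {

  /**
   * Hypothetical stand-in for the merged reader: returns the next surviving value,
   * or null once the base iterator is exhausted. Deleted keys are skipped in a loop,
   * so stack depth stays constant no matter how many deletes are adjacent.
   */
  static String nextLive(Iterator<String> baseKeys, Map<String, Optional<String>> deltaRecords) {
    while (baseKeys.hasNext()) {
      String key = baseKeys.next();
      if (!deltaRecords.containsKey(key)) {
        return key; // base record has no entry in the log, return it unchanged
      }
      Optional<String> merged = deltaRecords.get(key);
      if (merged.isPresent()) {
        return merged.get(); // log holds an update for this key
      }
      // Empty payload means the key was deleted: keep looping instead of recursing.
    }
    return null; // base file exhausted
  }

  public static void main(String[] args) {
    List<String> base = new ArrayList<>();
    Map<String, Optional<String>> delta = new HashMap<>();
    // 200,000 consecutive deletes followed by one untouched base record.
    for (int i = 0; i < 200_000; i++) {
      base.add("k" + i);
      delta.put("k" + i, Optional.empty());
    }
    base.add("live");

    Iterator<String> it = base.iterator();
    System.out.println(nextLive(it, delta)); // prints "live"
    System.out.println(nextLive(it, delta)); // prints "null"
  }
}
```

A recursive version of `nextLive` would need one frame per deleted key before reaching `live`, which is the pattern the PR title associates with the `StackOverflowError`.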