From fb529dbf8af00b69cbaeebf4ec5077b6f4b4e481 Mon Sep 17 00:00:00 2001
From: xiarixiaoyao
Date: Thu, 25 Mar 2021 19:30:09 +0800
Subject: [PATCH] [HUDI-1720] Fix StackOverflowError when querying the
 incremental view of a MOR table with many delete records via SparkSQL or
 Hive beeline

---
 .../RealtimeCompactedRecordReader.java        | 31 ++++++++++++++-----
 1 file changed, 24 insertions(+), 7 deletions(-)

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index a98a23010297..1ae25f80ed7d 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -77,6 +77,14 @@ private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOExcept
         .build();
   }
 
+  private Option<GenericRecord> buildGenericRecordwithCustomPayload(HoodieRecord record) throws IOException {
+    if (usesCustomPayload) {
+      return record.getData().getInsertValue(getWriterSchema());
+    } else {
+      return record.getData().getInsertValue(getReaderSchema());
+    }
+  }
+
   @Override
   public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOException {
     // Call the underlying parquetReader.next - which may replace the passed in ArrayWritable
@@ -95,15 +103,24 @@ public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOEx
         // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
         // deltaRecord may not be a full record and needs values of columns from the parquet
         Option<GenericRecord> rec;
-        if (usesCustomPayload) {
-          rec = deltaRecordMap.get(key).getData().getInsertValue(getWriterSchema());
-        } else {
-          rec = deltaRecordMap.get(key).getData().getInsertValue(getReaderSchema());
+        rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
+        // If the record is not present, this is a delete record using an empty payload, so skip this base
+        // record and move on to the next one
+        while (!rec.isPresent()) {
+          // if the underlying parquet reader has no more records, we are done
+          if (!this.parquetReader.next(aVoid, arrayWritable)) {
+            return false;
+          }
+          String tempKey = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
+          if (deltaRecordMap.containsKey(tempKey)) {
+            rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(tempKey));
+          } else {
+            // return true: the log files contain no entry for tempKey, so the parquet record stands as-is
+            return true;
+          }
         }
         if (!rec.isPresent()) {
-          // If the record is not present, this is a delete record using an empty payload so skip this base record
-          // and move to the next record
-          return next(aVoid, arrayWritable);
+          return false;
         }
         GenericRecord recordToReturn = rec.get();
         if (usesCustomPayload) {
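
The heart of the change is replacing self-recursion over delete records with a loop, so that stack depth no longer grows with the number of consecutive deletes. Below is a minimal, self-contained Java sketch of that pattern; the Iterator-based reader and the use of null as a delete marker are illustrative stand-ins, not Hudi's actual RecordReader API.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SkipDeletesSketch {

  // Recursive form, analogous to the old "return next(aVoid, arrayWritable)":
  // each consecutive delete record adds a stack frame, so a long enough run
  // of deletes throws StackOverflowError.
  static String nextRecursive(Iterator<String> it) {
    if (!it.hasNext()) {
      return null;
    }
    String rec = it.next();
    return rec != null ? rec : nextRecursive(it);
  }

  // Iterative form, analogous to the new while loop in the patch: constant
  // stack depth no matter how many delete records appear in a row.
  static String nextIterative(Iterator<String> it) {
    while (it.hasNext()) {
      String rec = it.next();
      if (rec != null) {
        return rec; // live record found
      }
      // delete record (empty payload): advance to the next base record
    }
    return null; // base file exhausted
  }

  public static void main(String[] args) {
    // null models a record whose merged payload is empty, i.e. a delete
    List<String> base = Arrays.asList("a", null, null, null, "b");
    Iterator<String> it = base.iterator();
    for (String rec = nextIterative(it); rec != null; rec = nextIterative(it)) {
      System.out.println(rec); // prints: a, b
    }
  }
}

With tens of thousands of consecutive deletes, the recursive form overflows the stack while the iterative form runs in constant stack space, which is why the patch converts next()'s self-call into a while loop.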