Skip to content

Commit

Permalink
[HUDI-1829] Use while loop instead of recursive call in MergeOnReadIn…
Browse files Browse the repository at this point in the history
…putFormat to avoid StackOverflow (#2862)

Recursive all is risky for StackOverflow when there are too many.
  • Loading branch information
danny0405 committed Apr 23, 2021
1 parent a1e636d commit a5789c4
Showing 1 changed file with 9 additions and 10 deletions.
Expand Up @@ -306,16 +306,17 @@ private Iterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {

@Override
public boolean hasNext() {
if (logRecordsKeyIterator.hasNext()) {
String curAvrokey = logRecordsKeyIterator.next();
while (logRecordsKeyIterator.hasNext()) {
String curAvroKey = logRecordsKeyIterator.next();
Option<IndexedRecord> curAvroRecord = null;
final HoodieRecord<?> hoodieRecord = logRecords.get(curAvrokey);
final HoodieRecord<?> hoodieRecord = logRecords.get(curAvroKey);
try {
curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
} catch (IOException e) {
throw new HoodieException("Get avro insert value error for key: " + curAvrokey, e);
throw new HoodieException("Get avro insert value error for key: " + curAvroKey, e);
}
if (!curAvroRecord.isPresent()) {
// delete record found
if (emitDelete && !pkSemanticLost) {
GenericRowData delete = new GenericRowData(tableState.getRequiredRowType().getFieldCount());

Expand All @@ -329,10 +330,9 @@ public boolean hasNext() {

this.currentRecord = delete;
return true;
} else {
// delete record found, skipping
return hasNext();
}
// skipping if the condition is unsatisfied
// continue;
} else {
// should improve the code when log scanner supports
// seeking by log blocks with commit time which is more
Expand All @@ -342,7 +342,7 @@ public boolean hasNext() {
String commitTime = curAvroRecord.get().get(HOODIE_COMMIT_TIME_COL_POS).toString();
if (!split.getInstantRange().get().isInRange(commitTime)) {
// filter out the records that are not in range
return hasNext();
continue;
}
}
GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
Expand All @@ -353,9 +353,8 @@ public boolean hasNext() {
currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
return true;
}
} else {
return false;
}
return false;
}

@Override
Expand Down

0 comments on commit a5789c4

Please sign in to comment.