Commit 123da02

- Fixing memory leak due to HoodieLogFileReader holding on to a logblock
- Removed inMemory HashMap usage in merge(..) code in LogScanner

n3nash authored and vinothchandar committed Mar 16, 2018
1 parent d3df32f commit 123da02
Showing 4 changed files with 27 additions and 41 deletions.
@@ -240,14 +240,12 @@ private boolean isNewInstantBlock(HoodieLogBlock logBlock) {

/**
* Iterate over the GenericRecord in the block, read the hoodie key and partition path and merge
- * with the application specific payload if the same key was found before Sufficient to just merge
- * the log records since the base data is merged on previous compaction
+ * with the application specific payload if the same key was found before. Sufficient to just merge
+ * the log records since the base data is merged on previous compaction.
+ * Finally, merge this log block with the accumulated records
*/
-  private Map<String, HoodieRecord<? extends HoodieRecordPayload>> loadRecordsFromBlock(
+  private Map<String, HoodieRecord<? extends HoodieRecordPayload>> merge(
HoodieAvroDataBlock dataBlock) throws IOException {
-    // TODO (NA) - Instead of creating a new HashMap use the spillable map
-    Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordsFromLastBlock = Maps
-        .newHashMap();
+    // TODO (NA) - Implement getRecordItr() in HoodieAvroDataBlock and use that here
List<IndexedRecord> recs = dataBlock.getRecords();
totalLogRecords.addAndGet(recs.size());
@@ -256,19 +254,19 @@ private Map<String, HoodieRecord<? extends HoodieRecordPayload>> loadRecordsFromBlock(
.toString();
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord =
SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN);
-      if (recordsFromLastBlock.containsKey(key)) {
+      if (records.containsKey(key)) {
// Merge and store the merged record
-        HoodieRecordPayload combinedValue = recordsFromLastBlock.get(key).getData()
+        HoodieRecordPayload combinedValue = records.get(key).getData()
.preCombine(hoodieRecord.getData());
-        recordsFromLastBlock
+        records
.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()),
combinedValue));
} else {
// Put the record as is
-        recordsFromLastBlock.put(key, hoodieRecord);
+        records.put(key, hoodieRecord);
}
});
-    return recordsFromLastBlock;
+    return records;
}
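The shape of the change is easier to see outside the diff: each block's records are now folded straight into the single accumulated map through the payload's preCombine, instead of being staged in a fresh per-block HashMap. A minimal sketch of that merge-by-key fold, with hypothetical names and a plain HashMap standing in for the spillable map:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only: incoming records are combined into the one
// accumulated map via preCombine, so no per-block HashMap is allocated.
class MergeByKeySketch {

  interface Payload {
    // picks or merges the winning value when the same key is seen twice
    Payload preCombine(Payload other);
  }

  // stand-in for the scanner's spillable map
  private final Map<String, Payload> records = new HashMap<>();

  void merge(String key, Payload incoming) {
    Payload existing = records.get(key);
    if (existing != null) {
      // Merge and store the merged record
      records.put(key, existing.preCombine(incoming));
    } else {
      // Put the record as is
      records.put(key, incoming);
    }
  }
}
```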

/**
@@ -277,11 +275,12 @@ private Map<String, HoodieRecord<? extends HoodieRecordPayload>> loadRecordsFromBlock(
private void merge(Map<String, HoodieRecord<? extends HoodieRecordPayload>> records,
Deque<HoodieLogBlock> lastBlocks) throws IOException {
while (!lastBlocks.isEmpty()) {
log.info("Number of remaining logblocks to merge " + lastBlocks.size());
// poll the element at the bottom of the stack since that's the order it was inserted
HoodieLogBlock lastBlock = lastBlocks.pollLast();
switch (lastBlock.getBlockType()) {
case AVRO_DATA_BLOCK:
-          merge(records, loadRecordsFromBlock((HoodieAvroDataBlock) lastBlock));
+          merge((HoodieAvroDataBlock) lastBlock);
break;
case DELETE_BLOCK:
// TODO : If delete is the only block written and/or records are present in parquet file
@@ -295,25 +294,6 @@ private void merge(Map<String, HoodieRecord<? extends HoodieRecordPayload>> records,
}
}
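One subtlety in the loop above: blocks were pushed onto the deque as the log was scanned, and pollLast() takes from the bottom of that stack, so they are merged in the order they were written. A self-contained illustration (hypothetical block names):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative only: pollLast() on a Deque used as a stack drains elements
// in insertion order, i.e. "the element at the bottom of the stack" first.
public class DequeOrderDemo {
  public static void main(String[] args) {
    Deque<String> lastBlocks = new ArrayDeque<>();
    lastBlocks.push("first-block-seen");   // push adds at the head
    lastBlocks.push("second-block-seen");
    while (!lastBlocks.isEmpty()) {
      // prints first-block-seen, then second-block-seen
      System.out.println(lastBlocks.pollLast());
    }
  }
}
```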

-/**
- * Merge the records read from a single data block with the accumulated records
- */
-private void merge(Map<String, HoodieRecord<? extends HoodieRecordPayload>> records,
-    Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordsFromLastBlock) {
-  recordsFromLastBlock.forEach((key, hoodieRecord) -> {
-    if (records.containsKey(key)) {
-      // Merge and store the merged record
-      HoodieRecordPayload combinedValue = records.get(key).getData()
-          .preCombine(hoodieRecord.getData());
-      records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()),
-          combinedValue));
-    } else {
-      // Put the record as is
-      records.put(key, hoodieRecord);
-    }
-  });
-}

@Override
public Iterator<HoodieRecord<? extends HoodieRecordPayload>> iterator() {
return records.iterator();
@@ -56,7 +56,6 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
private static final byte[] oldMagicBuffer = new byte[4];
private static final byte[] magicBuffer = new byte[6];
private final Schema readerSchema;
-  private HoodieLogBlock nextBlock = null;
private LogFormatVersion nextBlockVersion;
private boolean readBlockLazily;
private long reverseLogFilePosition;
@@ -271,8 +270,8 @@ public boolean hasNext() {
if (isEOF) {
return false;
}
-      this.nextBlock = readBlock();
-      return nextBlock != null;
+      // If there were no next block, we would either have hit EOF above or thrown on an invalid magic header
+      return true;
} catch (IOException e) {
throw new HoodieIOException("IOException when reading logfile " + logFile, e);
}
@@ -322,11 +321,12 @@ private boolean readMagic() throws IOException {

@Override
public HoodieLogBlock next() {
-    if (nextBlock == null) {
-      // may be hasNext is not called
-      hasNext();
-    }
-    return nextBlock;
+    try {
+      // hasNext() must be called before next()
+      return readBlock();
+    } catch (IOException io) {
+      throw new HoodieIOException("IOException when reading logblock from log file " + logFile, io);
+    }
}
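This is the heart of the leak fix: hasNext() no longer calls readBlock() and stashes the result in a field, so the reader never pins a HoodieLogBlock between calls. A hedged sketch of the resulting iterator contract, with assumed helper names (seekToNextBlock, readBlock):

```java
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical sketch of the revised contract: hasNext() only probes for the
// next block boundary and next() decodes the block on demand, so no block
// reference is held in a field between calls.
abstract class LazyBlockReader<T> implements Iterator<T> {

  // e.g. scan forward for the magic bytes; false at EOF
  protected abstract boolean seekToNextBlock() throws IOException;

  // decode the block at the current position
  protected abstract T readBlock() throws IOException;

  @Override
  public boolean hasNext() {
    try {
      return seekToNextBlock(); // no block is materialized or cached here
    } catch (IOException e) {
      throw new IllegalStateException("failed to probe for next block", e);
    }
  }

  @Override
  public T next() {
    try {
      return readBlock(); // callers must pair this with a successful hasNext()
    } catch (IOException e) {
      throw new NoSuchElementException("failed to read block: " + e.getMessage());
    }
  }
}
```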

/**
@@ -378,7 +378,7 @@ public HoodieLogBlock prev() throws IOException {
boolean hasNext = hasNext();
reverseLogFilePosition -= blockSize;
lastReverseLogFilePosition = reverseLogFilePosition;
-    return this.nextBlock;
+    return next();
}

/**
@@ -24,6 +24,8 @@

import java.io.IOException;
import java.util.List;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;

public class HoodieLogFormatReader implements HoodieLogFormat.Reader {

@@ -34,6 +36,8 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
private final boolean readBlocksLazily;
private final boolean reverseLogReader;

+  private final static Logger log = LogManager.getLogger(HoodieLogFormatReader.class);

HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles,
Schema readerSchema, boolean readBlocksLazily, boolean reverseLogReader) throws IOException {
this.logFiles = logFiles;
@@ -77,15 +81,15 @@ else if (logFiles.size() > 0) {
} catch (IOException io) {
throw new HoodieIOException("unable to initialize read with log file ", io);
}
log.info("Moving to the next reader for logfile " + currentReader.getLogFile());
return this.currentReader.hasNext();
}
return false;
}

@Override
public HoodieLogBlock next() {
-    HoodieLogBlock block = currentReader.next();
-    return block;
+    return currentReader.next();
}

@Override
@@ -374,13 +374,15 @@ public void testBasicAppendAndRead()
assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords1,
dataBlockRead.getRecords());

+    reader.hasNext();
nextBlock = reader.next();
dataBlockRead = (HoodieAvroDataBlock) nextBlock;
assertEquals("Read records size should be equal to the written records size",
copyOfRecords2.size(), dataBlockRead.getRecords().size());
assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords2,
dataBlockRead.getRecords());

+    reader.hasNext();
nextBlock = reader.next();
dataBlockRead = (HoodieAvroDataBlock) nextBlock;
assertEquals("Read records size should be equal to the written records size",
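The added reader.hasNext() calls reflect the new usage contract: next() reads a block on demand and must be preceded by a successful hasNext(). A minimal sketch of the required pairing (hypothetical drain helper):

```java
import java.util.Iterator;
import java.util.function.Consumer;

// Illustrative usage under the new contract, mirroring the updated test:
// every next() is preceded by a successful hasNext(), since the reader no
// longer caches a peeked block on behalf of the caller.
final class ReaderUsage {
  static <T> void drain(Iterator<T> reader, Consumer<T> process) {
    while (reader.hasNext()) {       // positions/validates the next block
      process.accept(reader.next()); // decodes it on demand
    }
  }
}
```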
