Fixing memory leak due to HoodieLogFileReader holding on to a logblock #346

Merged
1 commit merged on Mar 16, 2018
@@ -240,14 +240,12 @@ private boolean isNewInstantBlock(HoodieLogBlock logBlock) {

/**
* Iterate over the GenericRecord in the block, read the hoodie key and partition path and merge
- * with the application specific payload if the same key was found before Sufficient to just merge
- * the log records since the base data is merged on previous compaction
+ * with the application specific payload if the same key was found before. Sufficient to just merge
+ * the log records since the base data is merged on previous compaction.
+ * Finally, merge this log block with the accumulated records
*/
- private Map<String, HoodieRecord<? extends HoodieRecordPayload>> loadRecordsFromBlock(
+ private Map<String, HoodieRecord<? extends HoodieRecordPayload>> merge(
HoodieAvroDataBlock dataBlock) throws IOException {
- // TODO (NA) - Instead of creating a new HashMap use the spillable map
- Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordsFromLastBlock = Maps
Member:
This intermediate map was causing the issue? I guess we don't need it anymore, given that we now read the metadata lazily and the content is only read when we are sure we want to merge?

Contributor Author:
There were 2 issues. This one is just extra overhead that we don't need and cannot afford. So yes, we merge only when we are sure, so this map is not needed.

- .newHashMap();
// TODO (NA) - Implement getRecordItr() in HoodieAvroDataBlock and use that here
List<IndexedRecord> recs = dataBlock.getRecords();
totalLogRecords.addAndGet(recs.size());
@@ -256,19 +254,19 @@ private Map<String, HoodieRecord<? extends HoodieRecordPayload>> loadRecordsFrom
.toString();
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord =
SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN);
- if (recordsFromLastBlock.containsKey(key)) {
+ if (records.containsKey(key)) {
// Merge and store the merged record
- HoodieRecordPayload combinedValue = recordsFromLastBlock.get(key).getData()
+ HoodieRecordPayload combinedValue = records.get(key).getData()
.preCombine(hoodieRecord.getData());
- recordsFromLastBlock
+ records
.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()),
combinedValue));
} else {
// Put the record as is
- recordsFromLastBlock.put(key, hoodieRecord);
+ records.put(key, hoodieRecord);
}
});
- return recordsFromLastBlock;
+ return records;
}
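To make the review discussion above concrete: the refactor merges each block's records straight into the accumulated records map instead of building an intermediate per-block map first. Below is a small, self-contained sketch of that merge-in-place pattern; it uses plain Java types and a stand-in combine function rather than the Hoodie classes, so every name in it is illustrative only.

import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Illustration of the merge-in-place pattern from this hunk: each record read from a
// block is combined directly into the accumulated map, so no intermediate per-block
// HashMap is allocated and every record is inserted exactly once.
public class MergeInPlaceSketch {

  // "accumulated" plays the role of the scanner's records map; "preCombine" stands in
  // for the payload's preCombine logic applied to (existing, incoming) values.
  static <V> void mergeBlock(Map<String, V> blockRecords, Map<String, V> accumulated,
      BinaryOperator<V> preCombine) {
    blockRecords.forEach((key, incoming) -> accumulated.merge(key, incoming, preCombine));
  }

  public static void main(String[] args) {
    Map<String, String> accumulated = new HashMap<>();
    accumulated.put("key1", "v1");

    Map<String, String> block = new HashMap<>();
    block.put("key1", "v2");
    block.put("key2", "v3");

    // "Last value wins" as a stand-in for the real preCombine semantics.
    mergeBlock(block, accumulated, (existing, incoming) -> incoming);
    System.out.println(accumulated); // key1 -> v2 (combined), key2 -> v3
  }
}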

/**
@@ -277,11 +275,12 @@ private Map<String, HoodieRecord<? extends HoodieRecordPayload>> loadRecordsFrom
private void merge(Map<String, HoodieRecord<? extends HoodieRecordPayload>> records,
Deque<HoodieLogBlock> lastBlocks) throws IOException {
while (!lastBlocks.isEmpty()) {
log.info("Number of remaining logblocks to merge " + lastBlocks.size());
// poll the element at the bottom of the stack since that's the order it was inserted
HoodieLogBlock lastBlock = lastBlocks.pollLast();
switch (lastBlock.getBlockType()) {
case AVRO_DATA_BLOCK:
- merge(records, loadRecordsFromBlock((HoodieAvroDataBlock) lastBlock));
+ merge((HoodieAvroDataBlock) lastBlock);
break;
case DELETE_BLOCK:
// TODO : If delete is the only block written and/or records are present in parquet file
@@ -295,25 +294,6 @@ private void merge(Map<String, HoodieRecord<? extends HoodieRecordPayload>> reco
}
}

- /**
- * Merge the records read from a single data block with the accumulated records
- */
- private void merge(Map<String, HoodieRecord<? extends HoodieRecordPayload>> records,
- Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordsFromLastBlock) {
- recordsFromLastBlock.forEach((key, hoodieRecord) -> {
- if (records.containsKey(key)) {
- // Merge and store the merged record
- HoodieRecordPayload combinedValue = records.get(key).getData()
- .preCombine(hoodieRecord.getData());
- records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()),
- combinedValue));
- } else {
- // Put the record as is
- records.put(key, hoodieRecord);
- }
- });
- }

@Override
public Iterator<HoodieRecord<? extends HoodieRecordPayload>> iterator() {
return records.iterator();
@@ -56,7 +56,6 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
private static final byte[] oldMagicBuffer = new byte[4];
private static final byte[] magicBuffer = new byte[6];
private final Schema readerSchema;
- private HoodieLogBlock nextBlock = null;
Member:
does this cause additional memory overhead?

Contributor Author (@n3nash, Mar 16, 2018):
For this issue, the following is my analysis. Essentially, we pass an input stream from HoodieLogFileReader to the log blocks, and an input stream can be shared by more than one log block. A list of log blocks is kept until they are merged, so until all log blocks holding the input stream created in HoodieLogFileReader are GC'd, they will continue to hold memory. On top of that, the last block read is kept in this nextBlock variable, so until the HoodieLogFileReader instance itself is GC'd this causes memory overhead.

Member:
Per the screenshot in the ticket, most retained memory comes from private List<IndexedRecord> records; in the AvroDataBlock, and not the inputStream itself? The input stream being held does not seem to be causing these issues. This might just be a large data block being loaded into memory?

Member:
But I think we will hold onto the last block in each file, and that's what you are seeing.
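To illustrate the retention pattern described in this thread, here is a minimal, self-contained Java sketch. None of these classes exist in Hoodie; they only mirror the object-retention shape: a reader that caches the block it just read in a field keeps that block reachable for as long as the reader itself is reachable, while a reader that returns the block directly leaves the block's lifetime to the caller.

import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not the Hoodie API.
public class BlockRetentionSketch {

  static class Block {
    // Stands in for a large data block (e.g. the records/content held by a log block).
    final byte[] content = new byte[64 * 1024 * 1024];
  }

  // Pattern before the fix: the reader caches the block it just read in a field,
  // so the block stays reachable until the reader itself becomes unreachable.
  static class CachingReader {
    private Block nextBlock;

    Block next() {
      nextBlock = new Block();
      return nextBlock;
    }
  }

  // Pattern after the fix: the block is returned directly; once the caller drops it,
  // it is eligible for collection even though the reader is still alive.
  static class StreamingReader {
    Block next() {
      return new Block();
    }
  }

  public static void main(String[] args) {
    List<CachingReader> openReaders = new ArrayList<>();
    CachingReader reader = new CachingReader();
    reader.next();           // the caller discards the returned block immediately...
    openReaders.add(reader); // ...but the reader (and therefore the cached block) stays reachable
  }
}

Dropping the nextBlock field moves HoodieLogFileReader from the first shape to the second, so a block read from the log is only retained by whoever is still holding it on the scanner side.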

private LogFormatVersion nextBlockVersion;
private boolean readBlockLazily;
private long reverseLogFilePosition;
@@ -271,8 +270,8 @@ public boolean hasNext() {
if (isEOF) {
return false;
}
- this.nextBlock = readBlock();
- return nextBlock != null;
+ // If not hasNext(), we either reach EOF or throw an exception on an invalid magic header
+ return true;
} catch (IOException e) {
throw new HoodieIOException("IOException when reading logfile " + logFile, e);
}
@@ -322,11 +321,12 @@ private boolean readMagic() throws IOException {

@Override
public HoodieLogBlock next() {
- if (nextBlock == null) {
- // may be hasNext is not called
- hasNext();
+ try {
+ // hasNext() must be called before next()
+ return readBlock();
+ } catch(IOException io) {
+ throw new HoodieIOException("IOException when reading logblock from log file " + logFile, io);
}
- return nextBlock;
}
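With nextBlock removed, hasNext() only establishes that another block starts at the current position, and next() performs the actual read, so hasNext() must be called before each next(). A hedged usage sketch of that contract follows; obtaining the reader instance and the surrounding imports are elided, and scanLogBlocks is just an illustrative name.

// Iteration contract after this change: call hasNext() before each next(),
// since next() now reads the block instead of returning a cached one.
static void scanLogBlocks(HoodieLogFormat.Reader reader) {
  while (reader.hasNext()) {              // positions at the next magic header; false at EOF
    HoodieLogBlock block = reader.next(); // reads the block at that position (lazily or eagerly)
    // ... hand the block off to the record scanner / merge logic
  }
}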

/**
@@ -378,7 +378,7 @@ public HoodieLogBlock prev() throws IOException {
boolean hasNext = hasNext();
reverseLogFilePosition -= blockSize;
lastReverseLogFilePosition = reverseLogFilePosition;
- return this.nextBlock;
+ return next();
}

/**
@@ -24,6 +24,8 @@

import java.io.IOException;
import java.util.List;
+ import org.apache.log4j.LogManager;
+ import org.apache.log4j.Logger;

public class HoodieLogFormatReader implements HoodieLogFormat.Reader {

@@ -34,6 +36,8 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
private final boolean readBlocksLazily;
private final boolean reverseLogReader;

+ private final static Logger log = LogManager.getLogger(HoodieLogFormatReader.class);

HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles,
Schema readerSchema, boolean readBlocksLazily, boolean reverseLogReader) throws IOException {
this.logFiles = logFiles;
@@ -77,15 +81,15 @@ else if (logFiles.size() > 0) {
} catch (IOException io) {
throw new HoodieIOException("unable to initialize read with log file ", io);
}
log.info("Moving to the next reader for logfile " + currentReader.getLogFile());
return this.currentReader.hasNext();
}
return false;
}

@Override
public HoodieLogBlock next() {
- HoodieLogBlock block = currentReader.next();
- return block;
+ return currentReader.next();
}

@Override
@@ -374,13 +374,15 @@ public void testBasicAppendAndRead()
assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords1,
dataBlockRead.getRecords());

+ reader.hasNext();
nextBlock = reader.next();
dataBlockRead = (HoodieAvroDataBlock) nextBlock;
assertEquals("Read records size should be equal to the written records size",
copyOfRecords2.size(), dataBlockRead.getRecords().size());
assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords2,
dataBlockRead.getRecords());

+ reader.hasNext();
nextBlock = reader.next();
dataBlockRead = (HoodieAvroDataBlock) nextBlock;
assertEquals("Read records size should be equal to the written records size",