Skip to content

Commit

Permalink
getCompactedEntries should also use a memory mapped file
Browse files · Browse the repository at this point in the history
  • Loading branch information
no2chem committed Dec 6, 2017
1 parent 7bd4da3 commit 8399417
Showing 1 changed file with 26 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -518,60 +518,41 @@ private CompactedEntry getCompactedEntries(String filePath,
Set<Long> pendingTrim) throws IOException {

FileChannel fc = getChannel(filePath, true);
Map<Long, LogEntry> compactedEntryMap = new HashMap<>();

// Skip the header
ByteBuffer headerMetadataBuf = ByteBuffer.allocate(METADATA_SIZE);
fc.read(headerMetadataBuf);
headerMetadataBuf.flip();

Metadata headerMetadata = Metadata.parseFrom(headerMetadataBuf.array());
ByteBuffer headerBuf = ByteBuffer.allocate(headerMetadata.getLength());
fc.read(headerBuf);
headerBuf.flip();

ByteBuffer o = ByteBuffer.allocate((int) fc.size() - (int) fc.position());
fc.read(o);
fc.close();
o.flip();

LinkedHashMap<Long, LogEntry> compacted = new LinkedHashMap<>();

while (o.hasRemaining()) {

//Skip delimiter
o.getShort();

byte[] metadataBuf = new byte[METADATA_SIZE];
o.get(metadataBuf);

try {
Metadata metadata = Metadata.parseFrom(metadataBuf);

byte[] logEntryBuf = new byte[metadata.getLength()];

o.get(logEntryBuf);

LogEntry entry = LogEntry.parseFrom(logEntryBuf);
try (AutoCleanableMappedBuffer logBuf = new AutoCleanableMappedBuffer(
fc.map(MapMode.READ_ONLY, 0, fc.size()))) {
try (ByteBufInputStream bbis = new ByteBufInputStream(
Unpooled.wrappedBuffer(logBuf.getBuffer()))) {
Metadata headerMetadata = Metadata
.parseFrom(ByteStreams.limit(bbis, METADATA_SIZE));
LogHeader header = LogHeader.parseFrom(ByteStreams.limit(bbis,
headerMetadata.getLength()));

if (!noVerify) {
if (metadata.getChecksum() != getChecksum(entry.toByteArray())) {
log.error("Checksum mismatch detected while trying to read address {}",
entry.getGlobalAddress());
throw new DataCorruptionException();
while (bbis.available() > 0) {
// Skip delimiter
bbis.readShort();
Metadata logMetadata = Metadata
.parseFrom(ByteStreams.limit(bbis, METADATA_SIZE));
LogEntry logEntry = LogEntry.parseFrom(ByteStreams.limit(bbis,
logMetadata.getLength()));
if (!noVerify) {
if (logMetadata.getChecksum() != getChecksum(logEntry.toByteArray())) {
log.error("Checksum mismatch detected while trying to read address {}",
logEntry.getGlobalAddress());
throw new DataCorruptionException();
}
}
}

if (!pendingTrim.contains(entry.getGlobalAddress())) {
compacted.put(entry.getGlobalAddress(), entry);
if (!pendingTrim.contains(logEntry.getGlobalAddress())) {
compactedEntryMap.put(logEntry.getGlobalAddress(), logEntry);
}
}

} catch (InvalidProtocolBufferException e) {
throw new DataCorruptionException();
return new CompactedEntry(header, compactedEntryMap.values());
}
}

LogHeader header = LogHeader.parseFrom(headerBuf.array());
return new CompactedEntry(header, compacted.values());
}

private LogData getLogData(LogEntry entry) {
Expand Down

0 comments on commit 8399417

Please sign in to comment.