Implement scanAndVerify, relieves memory pressure on startup
no2chem committed Dec 6, 2017
1 parent 8399417 commit 4256ebf
Showing 4 changed files with 110 additions and 11 deletions.
17 changes: 17 additions & 0 deletions format/proto/types.proto
@@ -37,6 +37,23 @@ message LogEntry {
optional int64 checkpointedStreamStartLogAddress = 15;
}

message LogEntryMetadataOnly {
optional DataType data_type = 1;
  // Field 2 (the entry payload bytes) is intentionally omitted so it is never deserialized.
optional int64 global_address = 3;
repeated string streams = 6;
map<string, int64> logical_addresses = 7;
map<string, int64> backpointers = 8;
optional DataRank rank = 9;
optional CheckpointEntryType checkpointEntryType = 10;
optional int64 checkpointId_most_significant = 11;
optional int64 checkpointId_least_significant = 12;
optional int64 checkpointedStreamId_most_significant = 13;
optional int64 checkpointedStreamId_least_significant = 14;
// Tail of the stream at the time of taking the checkpoint snapshot.
optional int64 checkpointedStreamStartLogAddress = 15;
}

message LogHeader {
optional int32 version = 1;
optional bool verify_checksum = 2;
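The new LogEntryMetadataOnly message above mirrors LogEntry's field numbers while omitting the payload field (field number 2), so the same wire bytes parse into a metadata-only view. A hedged sketch of the idea, where fullEntry is a hypothetical serialized LogEntry and the class names are the ones protoc generates from types.proto:

    // Sketch: field numbers match LogEntry, so bytes written as a full
    // LogEntry parse cleanly as LogEntryMetadataOnly; the undeclared
    // payload field is simply not surfaced through the accessors.
    byte[] wire = fullEntry.toByteArray();            // hypothetical serialized LogEntry
    LogEntryMetadataOnly meta = LogEntryMetadataOnly.parseFrom(wire);
    long tailCandidate = meta.getGlobalAddress();     // metadata fields still readable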
StreamLogFiles.java
@@ -49,6 +49,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

import java.util.concurrent.atomic.LongAccumulator;
import javax.annotation.Nullable;

import lombok.Data;
@@ -58,6 +59,7 @@
import org.apache.commons.io.FileUtils;
import org.corfudb.format.Types;
import org.corfudb.format.Types.LogEntry;
import org.corfudb.format.Types.LogEntryMetadataOnly;
import org.corfudb.format.Types.LogHeader;
import org.corfudb.format.Types.Metadata;
import org.corfudb.format.Types.TrimEntry;
@@ -68,6 +70,7 @@
import org.corfudb.runtime.exceptions.DataCorruptionException;
import org.corfudb.runtime.exceptions.OverwriteException;
import org.corfudb.util.AutoCleanableMappedBuffer;
import org.corfudb.util.MappedByteBufInputStream;


/**
@@ -93,7 +96,7 @@ public class StreamLogFiles implements StreamLog, StreamLogWithRankedAddressSpace
public final String logDir;
private final boolean noVerify;
private final ServerContext serverContext;
- private final AtomicLong globalTail = new AtomicLong(0L);
+ private final LongAccumulator globalTail = new LongAccumulator(Long::max, -1L);
private Map<String, SegmentHandle> writeChannels;
private Set<FileChannel> channelsToSync;
private MultiReadWriteLock segmentLocks = new MultiReadWriteLock();
@@ -214,7 +217,7 @@ private void syncTailSegment(long address) {
// TODO(Maithem) since writing a record and setting the tail segment is not
// an atomic operation, it is possible to set an incorrect tail segment. In
// that case we will need to scan more than one segment
- globalTail.getAndUpdate(maxTail -> address > maxTail ? address : maxTail);
+ globalTail.accumulate(address);
long segment = address / RECORDS_PER_LOG_FILE;
if (lastSegment < segment) {
serverContext.setTailSegment(segment);
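The tail tracker above drops the explicit CAS loop (getAndUpdate with a max lambda) in favor of a LongAccumulator folding Long::max, with -1 as the identity for an empty log. A standalone sketch of the semantics, using only java.util.concurrent.atomic and nothing Corfu-specific:

    LongAccumulator tail = new LongAccumulator(Long::max, -1L);  // -1 = no address seen yet
    tail.accumulate(7L);     // tail becomes max(-1, 7) = 7
    tail.accumulate(3L);     // smaller/stale addresses never lower the tail
    assert tail.get() == 7L; // current running maximum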
@@ -255,14 +258,7 @@ private void initializeMaxGlobalAddress() {
long addressInTailSegment = (tailSegment * RECORDS_PER_LOG_FILE) + 1;
SegmentHandle sh = getSegmentHandleForAddress(addressInTailSegment);
try {
- Collection<LogEntry> segmentEntries = (Collection<LogEntry>)
-         getCompactedEntries(sh.getFileName(), new HashSet()).getEntries();
-
- for (LogEntry entry : segmentEntries) {
-     long currentAddress = entry.getGlobalAddress();
-     globalTail.getAndUpdate(maxTail -> currentAddress > maxTail
-             ? currentAddress : maxTail);
- }
+ scanAndVerifyEntries(sh.fileName);
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
@@ -514,6 +510,37 @@ private void trimLogFile(String filePath, Set<Long> pendingTrim) throws IOException {
writeChannels.remove(filePath);
}

/** Scan all entries, updating the LogUnit structures and verifying the log file integrity.
*
* @param filePath The log file (segment) to scan
*/
private void scanAndVerifyEntries(String filePath) throws IOException {
FileChannel fc = getChannel(filePath, true);
try (MappedByteBufInputStream stream = new MappedByteBufInputStream(fc)) {
Metadata headerMetadata = Metadata.parseFrom(stream.limited(METADATA_SIZE));
LogHeader header = LogHeader.parseFrom(stream.limited(headerMetadata.getLength()));

while (stream.available() > 0) {
// Skip delimiter
stream.readShort();
Metadata logMetadata = Metadata.parseFrom(stream.limited(METADATA_SIZE));
if (noVerify) {
LogEntryMetadataOnly logEntry =
LogEntryMetadataOnly.parseFrom(stream.limited(logMetadata.getLength()));
globalTail.accumulate(logEntry.getGlobalAddress());
} else {
LogEntry logEntry = LogEntry.parseFrom(stream.limited(logMetadata.getLength()));
if (logMetadata.getChecksum() != getChecksum(logEntry.toByteArray())) {
log.error("Checksum mismatch detected while trying to read address {}",
logEntry.getGlobalAddress());
throw new DataCorruptionException();
}
globalTail.accumulate(logEntry.getGlobalAddress());
}
}
}
}
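For orientation, the segment framing that the loop above walks, reconstructed from the reads themselves (not an authoritative format spec):

    // [Metadata : METADATA_SIZE bytes][LogHeader : headerMetadata.getLength() bytes]
    // then, repeated until the stream is exhausted:
    //   [delimiter : 2 bytes][Metadata : METADATA_SIZE bytes][LogEntry : logMetadata.getLength() bytes]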

private CompactedEntry getCompactedEntries(String filePath,
Set<Long> pendingTrim) throws IOException {

55 changes: 55 additions & 0 deletions org/corfudb/util/MappedByteBufInputStream.java
@@ -0,0 +1,55 @@
package org.corfudb.util;

import com.google.common.io.ByteStreams;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import javax.annotation.Nonnull;

public class MappedByteBufInputStream extends
ByteBufInputStream implements AutoCloseable {

final AutoCleanableMappedBuffer buffer;

/** Generate a new {@link MappedByteBufInputStream} over the entire file.
*
* @param fc The FileChannel to map the stream over.
* @throws IOException If the stream cannot be mapped.
*/
public MappedByteBufInputStream(@Nonnull FileChannel fc)
throws IOException {
this(new AutoCleanableMappedBuffer(fc.map(MapMode.READ_ONLY, 0, fc.size())));
}

/** Given an existing {@link AutoCleanableMappedBuffer}, generate a new
* {@link MappedByteBufInputStream} over it.
* @param buf The {@link AutoCleanableMappedBuffer} to base this stream off of.
*/
private MappedByteBufInputStream(@Nonnull AutoCleanableMappedBuffer buf) {
super(Unpooled.wrappedBuffer(buf.getBuffer()));
this.buffer = buf;
}

/** Return a limited version of this {@link MappedByteBufInputStream} which only returns
* {@code limit} bytes.
*
* @param limit The number of bytes to limit to.
* @return An {@link InputStream} which only permits reading up to the
* given number of bytes.
*/
public InputStream limited(int limit) {
return ByteStreams.limit(this, limit);
}

/** {@inheritDoc}
*
* Also closes the underlying {@link AutoCleanableMappedBuffer}.
*/
@Override
public void close() {
buffer.close();
}
}
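Usage mirrors scanAndVerifyEntries above: map the file once, then feed bounded slices to the protobuf parser so no record read can run past its length prefix. A minimal sketch, assuming an open read-only FileChannel fc and the Metadata/LogHeader types and METADATA_SIZE constant used earlier:

    try (MappedByteBufInputStream in = new MappedByteBufInputStream(fc)) {
        Metadata md = Metadata.parseFrom(in.limited(METADATA_SIZE));
        LogHeader header = LogHeader.parseFrom(in.limited(md.getLength()));
        // further records are framed the same way; the mapped region is
        // released on close via the underlying AutoCleanableMappedBuffer
    }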
StreamLogFilesTest.java
@@ -520,7 +520,7 @@ public void testWritingFileHeader() throws Exception {
public void testGetGlobalTail() {
StreamLogFiles log = new StreamLogFiles(getContext(), false);

- assertThat(log.getGlobalTail()).isEqualTo(0);
+ assertThat(log.getGlobalTail()).isEqualTo(-1L);

// Write to multiple segments
final int segments = 3;
