From 4256ebf8ffb4cd2b6c3b24f174315196d49029dd Mon Sep 17 00:00:00 2001
From: Michael Wei
Date: Wed, 6 Dec 2017 11:39:43 -0800
Subject: [PATCH] Implement scanAndVerify, relieves memory pressure on startup

---
 format/proto/types.proto                           | 17 ++++++
 .../infrastructure/log/StreamLogFiles.java         | 47 ++++++++++++----
 .../util/MappedByteBufInputStream.java             | 55 +++++++++++++++++++
 .../log/StreamLogFilesTest.java                    |  2 +-
 4 files changed, 110 insertions(+), 11 deletions(-)
 create mode 100644 runtime/src/main/java/org/corfudb/util/MappedByteBufInputStream.java

diff --git a/format/proto/types.proto b/format/proto/types.proto
index 40456cb0f1b..93820beb624 100644
--- a/format/proto/types.proto
+++ b/format/proto/types.proto
@@ -37,6 +37,23 @@ message LogEntry {
     optional int64 checkpointedStreamStartLogAddress = 15;
 }
 
+message LogEntryMetadataOnly {
+    optional DataType data_type = 1;
+    // don't deserialize bytes
+    optional int64 global_address = 3;
+    repeated string streams = 6;
+    map<string, int64> logical_addresses = 7;
+    map<string, int64> backpointers = 8;
+    optional DataRank rank = 9;
+    optional CheckpointEntryType checkpointEntryType = 10;
+    optional int64 checkpointId_most_significant = 11;
+    optional int64 checkpointId_least_significant = 12;
+    optional int64 checkpointedStreamId_most_significant = 13;
+    optional int64 checkpointedStreamId_least_significant = 14;
+    // Tail of the stream at the time of taking the checkpoint snapshot.
+    optional int64 checkpointedStreamStartLogAddress = 15;
+}
+
 message LogHeader {
     optional int32 version = 1;
     optional bool verify_checksum = 2;
diff --git a/infrastructure/src/main/java/org/corfudb/infrastructure/log/StreamLogFiles.java b/infrastructure/src/main/java/org/corfudb/infrastructure/log/StreamLogFiles.java
index 47f672549e3..9928be10bd8 100644
--- a/infrastructure/src/main/java/org/corfudb/infrastructure/log/StreamLogFiles.java
+++ b/infrastructure/src/main/java/org/corfudb/infrastructure/log/StreamLogFiles.java
@@ -49,6 +49,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAccumulator;
 
 import javax.annotation.Nullable;
 import lombok.Data;
@@ -58,6 +59,7 @@
 import org.apache.commons.io.FileUtils;
 import org.corfudb.format.Types;
 import org.corfudb.format.Types.LogEntry;
+import org.corfudb.format.Types.LogEntryMetadataOnly;
 import org.corfudb.format.Types.LogHeader;
 import org.corfudb.format.Types.Metadata;
 import org.corfudb.format.Types.TrimEntry;
@@ -68,6 +70,7 @@
 import org.corfudb.runtime.exceptions.DataCorruptionException;
 import org.corfudb.runtime.exceptions.OverwriteException;
 import org.corfudb.util.AutoCleanableMappedBuffer;
+import org.corfudb.util.MappedByteBufInputStream;
 
 
 /**
@@ -93,7 +96,7 @@ public class StreamLogFiles implements StreamLog, StreamLogWithRankedAddressSpac
     public final String logDir;
     private final boolean noVerify;
     private final ServerContext serverContext;
-    private final AtomicLong globalTail = new AtomicLong(0L);
+    private final LongAccumulator globalTail = new LongAccumulator(Long::max, -1L);
     private Map<String, SegmentHandle> writeChannels;
     private Set<FileChannel> channelsToSync;
     private MultiReadWriteLock segmentLocks = new MultiReadWriteLock();
@@ -214,7 +217,7 @@ private void syncTailSegment(long address) {
         // TODO(Maithem) since writing a record and setting the tail segment is not
         // an atomic operation, it is possible to set an incorrect tail segment. In
         // that case we will need to scan more than one segment
-        globalTail.getAndUpdate(maxTail -> address > maxTail ? address : maxTail);
+        globalTail.accumulate(address);
         long segment = address / RECORDS_PER_LOG_FILE;
         if (lastSegment < segment) {
             serverContext.setTailSegment(segment);
@@ -255,14 +258,7 @@ private void initializeMaxGlobalAddress() {
             long addressInTailSegment = (tailSegment * RECORDS_PER_LOG_FILE) + 1;
             SegmentHandle sh = getSegmentHandleForAddress(addressInTailSegment);
             try {
-                Collection<LogEntry> segmentEntries = (Collection<LogEntry>)
-                        getCompactedEntries(sh.getFileName(), new HashSet<>()).getEntries();
-
-                for (LogEntry entry : segmentEntries) {
-                    long currentAddress = entry.getGlobalAddress();
-                    globalTail.getAndUpdate(maxTail -> currentAddress > maxTail
-                            ? currentAddress : maxTail);
-                }
+                scanAndVerifyEntries(sh.fileName);
             } catch (IOException e) {
                 throw new RuntimeException(e.getMessage(), e);
             } finally {
@@ -514,6 +510,37 @@ private void trimLogFile(String filePath, Set<Long> pendingTrim) throws IOExcept
         writeChannels.remove(filePath);
     }
 
+    /** Scan all entries, updating the LogUnit structures and verifying the log file integrity.
+     *
+     * @param filePath The log file (segment) to scan
+     */
+    private void scanAndVerifyEntries(String filePath) throws IOException {
+        FileChannel fc = getChannel(filePath, true);
+        try (MappedByteBufInputStream stream = new MappedByteBufInputStream(fc)) {
+            Metadata headerMetadata = Metadata.parseFrom(stream.limited(METADATA_SIZE));
+            LogHeader header = LogHeader.parseFrom(stream.limited(headerMetadata.getLength()));
+
+            while (stream.available() > 0) {
+                // Skip delimiter
+                stream.readShort();
+                Metadata logMetadata = Metadata.parseFrom(stream.limited(METADATA_SIZE));
+                if (noVerify) {
+                    LogEntryMetadataOnly logEntry =
+                        LogEntryMetadataOnly.parseFrom(stream.limited(logMetadata.getLength()));
+                    globalTail.accumulate(logEntry.getGlobalAddress());
+                } else {
+                    LogEntry logEntry = LogEntry.parseFrom(stream.limited(logMetadata.getLength()));
+                    if (logMetadata.getChecksum() != getChecksum(logEntry.toByteArray())) {
+                        log.error("Checksum mismatch detected while trying to read address {}",
+                                logEntry.getGlobalAddress());
+                        throw new DataCorruptionException();
+                    }
+                    globalTail.accumulate(logEntry.getGlobalAddress());
+                }
+            }
+        }
+    }
+
     private CompactedEntry getCompactedEntries(String filePath,
                                                Set<Long> pendingTrim) throws IOException {
 
diff --git a/runtime/src/main/java/org/corfudb/util/MappedByteBufInputStream.java b/runtime/src/main/java/org/corfudb/util/MappedByteBufInputStream.java
new file mode 100644
index 00000000000..dedff1a82fd
--- /dev/null
+++ b/runtime/src/main/java/org/corfudb/util/MappedByteBufInputStream.java
@@ -0,0 +1,55 @@
+package org.corfudb.util;
+
+import com.google.common.io.ByteStreams;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import javax.annotation.Nonnull;
+
+public class MappedByteBufInputStream extends
+        ByteBufInputStream implements AutoCloseable {
+
+    final AutoCleanableMappedBuffer buffer;
+
+    /** Generate a new {@link MappedByteBufInputStream} over the entire file.
+     *
+     * @param fc The FileChannel to map the stream over.
+     * @throws IOException If the stream cannot be mapped.
+     */
+    public MappedByteBufInputStream(@Nonnull FileChannel fc)
+            throws IOException {
+        this(new AutoCleanableMappedBuffer(fc.map(MapMode.READ_ONLY, 0, fc.size())));
+    }
+
+    /** Given an existing {@link AutoCleanableMappedBuffer}, generate a new
+     * {@link MappedByteBufInputStream} over it.
+     * @param buf The {@link AutoCleanableMappedBuffer} to base this stream off of.
+     */
+    private MappedByteBufInputStream(@Nonnull AutoCleanableMappedBuffer buf) {
+        super(Unpooled.wrappedBuffer(buf.getBuffer()));
+        this.buffer = buf;
+    }
+
+    /** Return a limited version of this {@link MappedByteBufInputStream} which only returns
+     * {@code limit} bytes.
+     *
+     * @param limit The number of bytes to limit to.
+     * @return An {@link InputStream} which only permits reading up to the
+     *         given number of bytes.
+     */
+    public InputStream limited(int limit) {
+        return ByteStreams.limit(this, limit);
+    }
+
+    /** {@inheritDoc}
+     *
+     * Also closes the underlying {@link AutoCleanableMappedBuffer}.
+     */
+    @Override
+    public void close() {
+        buffer.close();
+    }
+}
diff --git a/test/src/test/java/org/corfudb/infrastructure/log/StreamLogFilesTest.java b/test/src/test/java/org/corfudb/infrastructure/log/StreamLogFilesTest.java
index 436aead7931..cceb01c2618 100644
--- a/test/src/test/java/org/corfudb/infrastructure/log/StreamLogFilesTest.java
+++ b/test/src/test/java/org/corfudb/infrastructure/log/StreamLogFilesTest.java
@@ -520,7 +520,7 @@ public void testWritingFileHeader() throws Exception {
     public void testGetGlobalTail() {
         StreamLogFiles log = new StreamLogFiles(getContext(), false);
 
-        assertThat(log.getGlobalTail()).isEqualTo(0);
+        assertThat(log.getGlobalTail()).isEqualTo(-1L);
 
         // Write to multiple segments
         final int segments = 3;