Skip to content

Commit

Permalink
Memory mapped read/address-space implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
no2chem committed Dec 5, 2017
1 parent cbb2c2a commit 7bd4da3
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package org.corfudb.infrastructure.log;

import com.esotericsoftware.kryo.io.ByteBufferInputStream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.io.ByteStreams;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.ByteString;
import com.google.protobuf.ExtensionRegistryLite;
import com.google.protobuf.InvalidProtocolBufferException;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;

import java.io.File;
Expand All @@ -16,10 +20,13 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Paths;
Expand Down Expand Up @@ -60,6 +67,7 @@
import org.corfudb.protocols.wireprotocol.LogData;
import org.corfudb.runtime.exceptions.DataCorruptionException;
import org.corfudb.runtime.exceptions.OverwriteException;
import org.corfudb.util.AutoCleanableMappedBuffer;


/**
Expand Down Expand Up @@ -611,7 +619,6 @@ private LogData getLogData(LogEntry entry) {
private void readAddressSpace(SegmentHandle sh) throws IOException {
long logFileSize;


try (MultiReadWriteLock.AutoCloseableLock ignored =
segmentLocks.acquireReadLock(sh.getSegment())) {
logFileSize = sh.logChannel.size();
Expand All @@ -624,61 +631,46 @@ private void readAddressSpace(SegmentHandle sh) throws IOException {
return;
}

// Skip the header
ByteBuffer headerMetadataBuf = ByteBuffer.allocate(METADATA_SIZE);
fc.read(headerMetadataBuf);
headerMetadataBuf.flip();

Metadata headerMetadata = Metadata.parseFrom(headerMetadataBuf.array());

fc.position(fc.position() + headerMetadata.getLength());
long channelOffset = fc.position();
ByteBuffer o = ByteBuffer.allocate((int) logFileSize - (int) fc.position());
fc.read(o);
fc.close();
o.flip();

while (o.hasRemaining()) {

short magic = o.getShort();
channelOffset += Short.BYTES;

if (magic != RECORD_DELIMITER) {
log.error("Expected a delimiter but found something else while "
+ "trying to read file {}", sh.fileName);
throw new DataCorruptionException();
}

byte[] metadataBuf = new byte[METADATA_SIZE];
o.get(metadataBuf);
channelOffset += METADATA_SIZE;

try {
Metadata metadata = Metadata.parseFrom(metadataBuf);

byte[] logEntryBuf = new byte[metadata.getLength()];

o.get(logEntryBuf);

LogEntry entry = LogEntry.parseFrom(logEntryBuf);
try (AutoCleanableMappedBuffer logBuf =
new AutoCleanableMappedBuffer(fc.map(MapMode.READ_ONLY, 0, logFileSize))) {
final ByteBuf buf = Unpooled.wrappedBuffer(logBuf.getBuffer());

try (ByteBufInputStream bbis = new ByteBufInputStream(buf)) {
Metadata headerMetadata = Metadata
.parseFrom(ByteStreams.limit(bbis, METADATA_SIZE));
bbis.skipBytes(headerMetadata.getLength());

while (bbis.available() > 0) {
try {
final short magic = bbis.readShort();
if (magic != RECORD_DELIMITER) {
log.error("Expected a delimiter but found something else while "
+ "trying to read file {}", sh.fileName);
throw new DataCorruptionException();
}

if (!noVerify) {
if (metadata.getChecksum() != getChecksum(entry.toByteArray())) {
log.error("Checksum mismatch detected while trying to read file {}",
sh.fileName);
Metadata metadata = Metadata
.parseFrom(ByteStreams.limit(bbis, METADATA_SIZE));
int index = buf.readerIndex();
LogEntry entry = LogEntry.parseFrom(ByteStreams.limit(bbis,
metadata.getLength()));
if (!noVerify) {
if (metadata.getChecksum() != getChecksum(entry.toByteArray())) {
log.error("Checksum mismatch detected while trying to read file {}",
sh.fileName);
throw new DataCorruptionException();
}
}
sh.knownAddresses.put(entry.getGlobalAddress(),
new AddressMetaData(metadata.getChecksum(),
metadata.getLength(), index));
} catch (InvalidProtocolBufferException e) {
throw new DataCorruptionException();
}
}

sh.knownAddresses.put(entry.getGlobalAddress(),
new AddressMetaData(metadata.getChecksum(),
metadata.getLength(), channelOffset));

channelOffset += metadata.getLength();

} catch (InvalidProtocolBufferException e) {
throw new DataCorruptionException();
}
} finally {
fc.close();
}
}

Expand All @@ -691,23 +683,24 @@ private void readAddressSpace(SegmentHandle sh) throws IOException {
*/
private LogData readRecord(SegmentHandle sh, long address)
throws IOException {
FileChannel fc = null;
FileChannel fc = getChannel(sh.fileName, true);
try {
fc = getChannel(sh.fileName, true);
AddressMetaData metaData = sh.getKnownAddresses().get(address);
if (metaData == null) {
return null;
}

fc.position(metaData.offset);

try {
ByteBuffer entryBuf = ByteBuffer.allocate(metaData.length);
fc.read(entryBuf);
return getLogData(LogEntry.parseFrom(entryBuf.array()));
} catch (InvalidProtocolBufferException e) {
throw new DataCorruptionException();
try (AutoCleanableMappedBuffer logBuf =
new AutoCleanableMappedBuffer(
fc.map(MapMode.READ_ONLY, metaData.offset, metaData.length))) {
try (ByteBufInputStream bbis = new ByteBufInputStream(
Unpooled.wrappedBuffer(logBuf.getBuffer()))) {
return getLogData(LogEntry.parseFrom(ByteStreams.limit(bbis, metaData.length)));
}
}

} catch (InvalidProtocolBufferException e) {
throw new DataCorruptionException();
} finally {
if (fc != null) {
fc.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.corfudb.util;

import java.nio.MappedByteBuffer;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import sun.nio.ch.DirectBuffer;

/** {@link this} wraps around a {@link MappedByteBuffer}, implementing the
* {@link java.lang.AutoCloseable} interface which enables a {@link MappedByteBuffer} to be
* automatically cleaned via a try-resources block.
*
* <p>Once the buffer is closed it should no longer be used.
*/
@Slf4j
public class AutoCleanableMappedBuffer implements AutoCloseable {

/** The {@link java.nio.MappedByteBuffer} being wrapped. */
final MappedByteBuffer buffer;

/** Construct a new {@link org.corfudb.util.AutoCleanableMappedBuffer}.
*
* @param buffer The {@link MappedByteBuffer} to wrap.
*/
public AutoCleanableMappedBuffer(@Nonnull MappedByteBuffer buffer) {
this.buffer = buffer;
}

/** Get the underlying buffer. It is recommended that a reference NOT be saved (e.g, call
* {@link this#getBuffer()} each time the buffer is needed).
*
* @return The underlying {@link MappedByteBuffer}
*/
public MappedByteBuffer getBuffer() {
return buffer;
}

/** {@inheritDoc}
*
* @throws UnsupportedOperationException If the buffer could not be auto-cleaned.
*/
@Override
public void close() {
try {
((DirectBuffer) buffer).cleaner().clean();
} catch (Exception ex) {
throw new UnsupportedOperationException("Failed to autoclean buffer");
}
}
}

0 comments on commit 7bd4da3

Please sign in to comment.