Skip to content

Commit

Permalink
Store snapshots in chunks via the snapshot log manager in order to en…
Browse files Browse the repository at this point in the history
…sure that snapshots can be efficiently replicated via AppendEntries without preventing leader->follower heartbeats.
  • Loading branch information
kuujo committed Feb 4, 2015
1 parent 4541e75 commit 7ccddd1
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 15 deletions.
Expand Up @@ -26,7 +26,10 @@


import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer; import java.util.function.Consumer;
Expand All @@ -42,11 +45,16 @@
public class DefaultStateLog<T> extends AbstractResource<StateLog<T>> implements StateLog<T> { public class DefaultStateLog<T> extends AbstractResource<StateLog<T>> implements StateLog<T> {
private static final int SNAPSHOT_ENTRY = 0; private static final int SNAPSHOT_ENTRY = 0;
private static final int COMMAND_ENTRY = 1; private static final int COMMAND_ENTRY = 1;
private static final int SNAPSHOT_CHUNK_SIZE = 1024 * 1024;
private static final int SNAPSHOT_INFO = 0;
private static final int SNAPSHOT_CHUNK = 1;
private final Map<Integer, OperationInfo> operations = new ConcurrentHashMap<>(128); private final Map<Integer, OperationInfo> operations = new ConcurrentHashMap<>(128);
private final Consistency defaultConsistency; private final Consistency defaultConsistency;
private final SnapshottableLogManager log; private final SnapshottableLogManager log;
private Supplier snapshotter; private Supplier snapshotter;
private Consumer installer; private Consumer installer;
private SnapshotInfo snapshotInfo;
private List<ByteBuffer> snapshotChunks;


public DefaultStateLog(ResourceContext context) { public DefaultStateLog(ResourceContext context) {
super(context); super(context);
Expand Down Expand Up @@ -200,25 +208,133 @@ private void takeSnapshot(long index) {
Object snapshot = snapshotter != null ? snapshotter.get() : null; Object snapshot = snapshotter != null ? snapshotter.get() : null;
ByteBuffer snapshotBuffer = serializer.writeObject(snapshot); ByteBuffer snapshotBuffer = serializer.writeObject(snapshot);
snapshotBuffer.flip(); snapshotBuffer.flip();
ByteBuffer snapshotEntry = ByteBuffer.allocate(snapshotBuffer.limit() + 4);
snapshotEntry.putInt(SNAPSHOT_ENTRY); // Create a unique snapshot ID and calculate the number of chunks for the snapshot.
snapshotEntry.put(snapshotBuffer); byte[] snapshotId = UUID.randomUUID().toString().getBytes();
snapshotEntry.flip(); int numChunks = (int) Math.ceil(snapshotBuffer.limit() / SNAPSHOT_CHUNK_SIZE);
List<ByteBuffer> chunks = new ArrayList<>(numChunks);

// The first entry in the snapshot is snapshot metadata.
ByteBuffer info = ByteBuffer.allocate(20 + snapshotId.length);
info.putInt(SNAPSHOT_ENTRY);
info.putInt(SNAPSHOT_INFO);
info.putInt(snapshotId.length);
info.put(snapshotId);
info.putInt(snapshotBuffer.limit());
info.putInt(numChunks);

// Now we append a list of snapshot chunks. This ensures that snapshots can be easily replicated in chunks.
int i = 0;
int position = 0;
while (position < snapshotBuffer.limit()) {
byte[] bytes = new byte[Math.min(snapshotBuffer.limit() - position, SNAPSHOT_CHUNK_SIZE)];
snapshotBuffer.get(bytes);
ByteBuffer chunk = ByteBuffer.allocate(16 + bytes.length);
chunk.putInt(SNAPSHOT_ENTRY); // Indicates the entry is a snapshot entry.
chunk.putInt(SNAPSHOT_CHUNK);
chunk.putInt(i++); // The position of the chunk in the snapshot.
chunk.putInt(bytes.length); // The snapshot chunk length.
chunk.put(bytes); // The snapshot chunk bytes.
chunk.flip();
chunks.add(chunk);
position += bytes.length;
}

try { try {
log.appendSnapshot(index, snapshotEntry); log.appendSnapshot(index, chunks);
} catch (IOException e) { } catch (IOException e) {
throw new CopycatException("Failed to compact state log", e); throw new CopycatException("Failed to compact state log", e);
} }
} }


/** /**
* Installs a snapshot. * Installs a snapshot.
*
* This method operates on the assumption that snapshots will always be replicated with an initial metadata entry.
* The metadata entry specifies a set of chunks that complete the entire snapshot.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void installSnapshot(ByteBuffer snapshot) { private void installSnapshot(ByteBuffer snapshotChunk) {
if (installer != null) { // Get the snapshot entry type.
Object value = serializer.readObject(snapshot); int type = snapshotChunk.getInt();
installer.accept(value); if (type == SNAPSHOT_INFO) {
// The snapshot info defines the snapshot ID and number of chunks. When a snapshot info entry is processed,
// reset the current snapshot chunks and store the snapshot info. The next entries to be applied should be the
// snapshot chunks themselves.
int idLength = snapshotChunk.getInt();
byte[] idBytes = new byte[idLength];
snapshotChunk.get(idBytes);
String id = new String(idBytes);
int size = snapshotChunk.getInt();
int numChunks = snapshotChunk.getInt();
if (snapshotInfo == null || !snapshotInfo.id.equals(id)) {
snapshotInfo = new SnapshotInfo(id, size, numChunks);
snapshotChunks = new ArrayList<>(numChunks);
}
} else if (type == SNAPSHOT_CHUNK && snapshotInfo != null) {
// When a chunk is received, use the chunk's position in the snapshot to ensure consistency. Extract the chunk
// bytes and only append the the chunk if it matches the expected position in the local chunks list.
int index = snapshotChunk.getInt();
int chunkLength = snapshotChunk.getInt();
byte[] chunkBytes = new byte[chunkLength];
snapshotChunk.get(chunkBytes);
if (snapshotChunks.size() == index) {
snapshotChunks.add(ByteBuffer.wrap(chunkBytes));

// Once the number of chunks has grown to the complete expected chunk count, combine and install the snapshot.
if (snapshotChunks.size() == snapshotInfo.chunks) {
if (installer != null) {
// Calculate the total aggregated size of the snapshot.
int size = 0;
for (ByteBuffer chunk : snapshotChunks) {
size += chunk.limit();
}

// Make sure the combined snapshot size is equal to the expected snapshot size.
Assert.state(size == snapshotInfo.size, "Received inconsistent snapshot");

// Create a complete view of the snapshot by appending all chunks to each other.
ByteBuffer completeSnapshot = ByteBuffer.allocate(size);
for (ByteBuffer chunk : snapshotChunks) {
completeSnapshot.put(chunk);
}

// Once a view of the snapshot has been created, deserialize and install the snapshot.
Object value = serializer.readObject(completeSnapshot);
installer.accept(value);
}
snapshotInfo = null;
snapshotChunks = null;
}
}
}
}

/**
* Current snapshot info.
*/
private static class SnapshotInfo {
private final String id;
private final int size;
private final int chunks;

private SnapshotInfo(String id, int size, int chunks) {
this.id = id;
this.size = size;
this.chunks = chunks;
}
}

/**
* Snapshot chunk.
*/
private static class SnapshotChunk {
private final int index;
private final ByteBuffer chunk;

private SnapshotChunk(int index, ByteBuffer chunk) {
this.index = index;
this.chunk = chunk;
} }
} }


Expand Down
Expand Up @@ -15,14 +15,15 @@
*/ */
package net.kuujo.copycat.state.internal; package net.kuujo.copycat.state.internal;


import net.kuujo.copycat.util.internal.Assert;
import net.kuujo.copycat.log.LogConfig; import net.kuujo.copycat.log.LogConfig;
import net.kuujo.copycat.log.LogManager; import net.kuujo.copycat.log.LogManager;
import net.kuujo.copycat.log.LogSegment; import net.kuujo.copycat.log.LogSegment;
import net.kuujo.copycat.util.internal.Assert;


import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;


Expand Down Expand Up @@ -122,7 +123,7 @@ public boolean isSnapshottable(long index) {
* @return The index at which the snapshot was written. * @return The index at which the snapshot was written.
* @throws IOException If the log could not be rolled over. * @throws IOException If the log could not be rolled over.
*/ */
public long appendSnapshot(long index, ByteBuffer snapshot) throws IOException { public long appendSnapshot(long index, List<ByteBuffer> snapshot) throws IOException {
LogSegment segment = logManager.segment(index); LogSegment segment = logManager.segment(index);
if (segment == null) { if (segment == null) {
throw new IndexOutOfBoundsException("Invalid snapshot index " + index); throw new IndexOutOfBoundsException("Invalid snapshot index " + index);
Expand All @@ -134,10 +135,10 @@ public long appendSnapshot(long index, ByteBuffer snapshot) throws IOException {


// When appending a snapshot, force the snapshot log manager to roll over to a new segment, append the snapshot // When appending a snapshot, force the snapshot log manager to roll over to a new segment, append the snapshot
// to the log, and then compact the log once the snapshot has been appended. // to the log, and then compact the log once the snapshot has been appended.
ByteBuffer entry = ByteBuffer.allocate(8 + snapshot.limit()); snapshotManager.rollOver(index - snapshot.size() + 1);
entry.put(snapshot); for (ByteBuffer entry : snapshot) {
snapshotManager.rollOver(index); snapshotManager.appendEntry(entry);
snapshotManager.appendEntry(entry); }
compact(snapshotManager); compact(snapshotManager);
compact(logManager); compact(logManager);
return index; return index;
Expand Down

0 comments on commit 7ccddd1

Please sign in to comment.