From f42996b1f162a9e9eef143b9d57021d7e81d722f Mon Sep 17 00:00:00 2001 From: Allen George Date: Tue, 29 Apr 2014 08:23:51 -0700 Subject: [PATCH] Snapshot work --- .../libraft/agent/rpc/RaftNetworkClient.java | 2 +- .../io/libraft/algorithm/RPCReceiver.java | 2 +- .../java/io/libraft/algorithm/RPCSender.java | 2 +- .../io/libraft/algorithm/RaftAlgorithm.java | 202 +++++++++--------- .../io/libraft/algorithm/StoringSender.java | 2 +- 5 files changed, 107 insertions(+), 103 deletions(-) diff --git a/libraft-agent/src/main/java/io/libraft/agent/rpc/RaftNetworkClient.java b/libraft-agent/src/main/java/io/libraft/agent/rpc/RaftNetworkClient.java index d860512..8abbf15 100644 --- a/libraft-agent/src/main/java/io/libraft/agent/rpc/RaftNetworkClient.java +++ b/libraft-agent/src/main/java/io/libraft/agent/rpc/RaftNetworkClient.java @@ -472,7 +472,7 @@ public void snapshotChunk(String server, long term, long snapshotTerm, long snap } @Override - public void snapshotChunkReply(String server, long term, long snapshotTerm, long snapshotIndex, int nextExpectedSeqnum) throws RPCException { + public void snapshotChunkReply(String server, long term, long snapshotTerm, long snapshotIndex, int nextSeqnum) throws RPCException { // FIXME (AG): noop } } diff --git a/libraft-core/src/main/java/io/libraft/algorithm/RPCReceiver.java b/libraft-core/src/main/java/io/libraft/algorithm/RPCReceiver.java index eb0755d..6ecc8a1 100644 --- a/libraft-core/src/main/java/io/libraft/algorithm/RPCReceiver.java +++ b/libraft-core/src/main/java/io/libraft/algorithm/RPCReceiver.java @@ -114,5 +114,5 @@ public interface RPCReceiver { void onSnapshotChunk(String server, long term, long snapshotTerm, long snapshotIndex, int seqnum, @Nullable InputStream chunkInputStream); - void onSnapshotChunkReply(String server, long term, long snapshotTerm, long snapshotIndex, int nextExpectedSeqnum); + void onSnapshotChunkReply(String server, long term, long snapshotTerm, long snapshotIndex, int nextSeqnum); } diff --git 
a/libraft-core/src/main/java/io/libraft/algorithm/RPCSender.java b/libraft-core/src/main/java/io/libraft/algorithm/RPCSender.java index b01296f..bdf175a 100644 --- a/libraft-core/src/main/java/io/libraft/algorithm/RPCSender.java +++ b/libraft-core/src/main/java/io/libraft/algorithm/RPCSender.java @@ -114,5 +114,5 @@ public interface RPCSender { void snapshotChunk(String server, long term, long snapshotTerm, long snapshotIndex, int seqnum, @Nullable InputStream chunkInputStream) throws RPCException; - void snapshotChunkReply(String server, long term, long snapshotTerm, long snapshotIndex, int nextExpectedSeqnum) throws RPCException; + void snapshotChunkReply(String server, long term, long snapshotTerm, long snapshotIndex, int nextSeqnum) throws RPCException; } diff --git a/libraft-core/src/main/java/io/libraft/algorithm/RaftAlgorithm.java b/libraft-core/src/main/java/io/libraft/algorithm/RaftAlgorithm.java index 677b0fc..1e63f62 100644 --- a/libraft-core/src/main/java/io/libraft/algorithm/RaftAlgorithm.java +++ b/libraft-core/src/main/java/io/libraft/algorithm/RaftAlgorithm.java @@ -331,7 +331,7 @@ private final class IncomingSnapshot { private final long snapshotIndex; private final ExtendedSnapshotWriter snapshotWriter; - private int nextExpectedSeqnum = 0; + private int nextSeqnum = 0; private IncomingSnapshot(long snapshotTerm, long snapshotIndex, ExtendedSnapshotWriter snapshotWriter) { this.snapshotTerm = snapshotTerm; @@ -351,14 +351,14 @@ private long getIndex() { return snapshotIndex; } - private int getNextExpectedSeqnum() { - return nextExpectedSeqnum; + private int getNextSeqnum() { + return nextSeqnum; } public void readSnapshotChunk(long seqnum, InputStream chunkInputstream) throws IOException { - checkArgument(seqnum == nextExpectedSeqnum, "seqnum:%s nextExpectedSeqnum:%s", seqnum, nextExpectedSeqnum); + checkArgument(seqnum == nextSeqnum, "seqnum:%s nextSeqnum:%s", seqnum, nextSeqnum); ByteStreams.copy(chunkInputstream, 
snapshotWriter.getSnapshotOutputStream()); - nextExpectedSeqnum++; + nextSeqnum++; } public void closeSnapshotWriter() throws IOException { @@ -375,12 +375,12 @@ public boolean equals(@Nullable Object o) { return snapshotTerm == other.snapshotTerm && snapshotIndex == other.snapshotIndex && snapshotWriter.equals(other.snapshotWriter) - && nextExpectedSeqnum == other.nextExpectedSeqnum; + && nextSeqnum == other.nextSeqnum; } @Override public int hashCode() { - return Objects.hashCode(snapshotTerm, snapshotIndex, snapshotWriter, nextExpectedSeqnum); + return Objects.hashCode(snapshotTerm, snapshotIndex, snapshotWriter, nextSeqnum); } @Override @@ -390,17 +390,18 @@ public String toString() { .add("snapshotTerm", snapshotTerm) .add("snapshotIndex", snapshotIndex) .add("snapshotWriter", snapshotWriter) - .add("nextExpectedSeqnum", nextExpectedSeqnum) + .add("nextSeqnum", nextSeqnum) .toString(); } } + // FIXME (AG): iterator? private final class OutgoingSnapshot { private final ExtendedSnapshot snapshot; - private int seqnum = 0; - private @Nullable ByteArrayInputStream lastChunkSentInputStream = null; + private int seqnum = -1; + private @Nullable ByteArrayInputStream chunkInputStream = null; private OutgoingSnapshot(ExtendedSnapshot snapshot) { this.snapshot = snapshot; @@ -418,23 +419,23 @@ private int getSeqnum() { return seqnum; } - private @Nullable ByteArrayInputStream getChunkInputStream() { - return lastChunkSentInputStream; + private @Nullable ByteArrayInputStream getChunk() { + return chunkInputStream; } - private @Nullable ByteArrayInputStream getNextChunkInputStream() throws IOException { + private @Nullable ByteArrayInputStream nextChunk() throws IOException { ByteArrayOutputStream chunkOutputStream = new ByteArrayOutputStream(RaftConstants.MAX_CHUNK_SIZE); ByteStreams.copy(snapshot.getSnapshotInputStream(), chunkOutputStream); if (chunkOutputStream.size() != 0) { - lastChunkSentInputStream = new ByteArrayInputStream(chunkOutputStream.toByteArray()); + 
chunkInputStream = new ByteArrayInputStream(chunkOutputStream.toByteArray()); } else { - lastChunkSentInputStream = null; + chunkInputStream = null; } seqnum++; - return lastChunkSentInputStream; + return chunkInputStream; } @Override @@ -443,12 +444,15 @@ public boolean equals(@Nullable Object o) { if (o == null || getClass() != o.getClass()) return false; OutgoingSnapshot other = (OutgoingSnapshot) o; - return snapshot.equals(other.snapshot) && lastChunkSentInputStream == null ? other.lastChunkSentInputStream == null : lastChunkSentInputStream.equals(other.lastChunkSentInputStream); + + return snapshot.equals(other.snapshot) + && seqnum == other.seqnum + && (chunkInputStream == null ? other.chunkInputStream == null : chunkInputStream.equals(other.chunkInputStream)); } @Override public int hashCode() { - return Objects.hashCode(snapshot, lastChunkSentInputStream); + return Objects.hashCode(snapshot, chunkInputStream); } @Override @@ -456,7 +460,8 @@ public String toString() { return Objects .toStringHelper(this) .add("snapshot", snapshot) - .add("lastChunkSentInputStream", lastChunkSentInputStream) + .add("seqnum", seqnum) + .add("chunkInputStream", chunkInputStream) .toString(); } } @@ -1015,14 +1020,16 @@ private void handleElectionTimeout() throws StorageException { } } + // use this _only_ for local IOExceptions or StorageExceptions + // // bail // if there's something wrong with the // persistence layer, I don't know, and don't want to know // how to deal with it.
the system should not attempt to continue // because we have no idea exactly what happened and whether // it's recoverable or not - private void handleStorageException(StorageException e) { - throw new RaftError(e); + private void throwRaftError(Throwable cause) { + throw new RaftError(cause); } private void beginElection() throws StorageException { @@ -1425,7 +1432,7 @@ public synchronized void onRequestVote(String server, long term, long lastLogTer sendRequestVoteReply(server, currentTerm, currentTerm, voteGranted); } catch (StorageException e) { - handleStorageException(e); + throwRaftError(e); } } @@ -1485,7 +1492,7 @@ public synchronized void onRequestVoteReply(String server, long term, boolean vo } } } catch (StorageException e) { - handleStorageException(e); + throwRaftError(e); } } @@ -1627,7 +1634,7 @@ public synchronized void onAppendEntries(String server, long term, long commitIn setCommandFuturesAndNotifyClient(originalSelfCommitIndex + 1, selfCommitIndex); } } catch (StorageException e) { - handleStorageException(e); + throwRaftError(e); } } @@ -1804,7 +1811,7 @@ public synchronized void onAppendEntriesReply(String server, long term, long pre // TODO (AG): catch up servers that have responded } catch (StorageException e) { - handleStorageException(e); + throwRaftError(e); } } @@ -1853,7 +1860,7 @@ public synchronized void onSnapshotChunk(String server, long term, long snapTerm if (term < currentTerm) { LOGGER.warn("{}: SnapshotChunk from {}: ignore: term:{} currentTerm:{}", self, server, term, currentTerm); - sendSnapshotChunkReply(server, currentTerm, snapTerm, snapIndex, 0); // nextExpectedSeqNum ignored + sendSnapshotChunkReply(server, currentTerm, snapTerm, snapIndex, 0); // nextSeqnum ignored return; } @@ -1865,10 +1872,13 @@ public synchronized void onSnapshotChunk(String server, long term, long snapTerm checkState(term == currentTerm, "term:%s currentTerm:%s", term, currentTerm); - // this can happen if you've crashed and restarted within a term 
- // on restart you are in the FOLLOWER role, (because the election timeout hasn't triggered) - // yet you don't know who the current leader is yet - // or, your election timeout expires and you shift into the candidate role + // this can happen if you've crashed and + // restarted within a term on restart you are + // in the FOLLOWER role, (because the election + // timeout hasn't triggered) yet you don't + // know who the current leader is yet or, + // your election timeout expires and you shift + // into the candidate role if ((role == Role.FOLLOWER && leader == null) || (role == Role.CANDIDATE)) { becomeFollowerWithoutUpdatingCurrentTerm(term, server); } @@ -1878,11 +1888,12 @@ public synchronized void onSnapshotChunk(String server, long term, long snapTerm long commitIndex = store.getCommitIndex(); if (commitIndex >= snapIndex) { - // if our commitIndex is greater than the snapshot index - // that means that the leader should know that we've committed - // those entries, and so they know where our nextIndex is - // as a result this is a late packet, and we shouldn't have - // to send a response to them + // if our commitIndex is greater than the + // snapshot index that means that the leader + // should know that we've committed those entries, + // and so they know where our nextIndex is as + // a result this is a late packet, and we + // shouldn't have to send a response to them LOGGER.warn("{}: SnapshotChunk from {}: late chunk: commitIndex:{} snapIndex:{}", self, server, commitIndex, snapIndex); return; } @@ -1892,10 +1903,12 @@ public synchronized void onSnapshotChunk(String server, long term, long snapTerm if (incomingSnapshot != null) { int compared = compareLogicalTimestamps(snapTerm, snapIndex, incomingSnapshot.getTerm(), incomingSnapshot.getIndex()); if (compared < 0) { // packet from an older snapshot - // we can safely ignore this packet and not respond to the sender - // because they've already sent us a packet from a newer snapshot - // as a 
result, they don't actually need a hint from us to tell - // them which packet to send next (they're not sending packets from the older snapshot) + // we can safely ignore this packet and not respond + // to the sender because they've already sent us a + // packet from a newer snapshot as a result, they + // don't actually need a hint from us to tell them + // which packet to send next (they're not sending + // packets from the older snapshot) LOGGER.warn("{}: SnapshotChunk from {}: late snapshot: snapTerm:{} snapIndex:{} incoming:[snapTerm:{} snapIndex:{}]", self, server, snapTerm, snapIndex, incomingSnapshot.getTerm(), incomingSnapshot.getIndex()); return; } else if (compared > 0) { // newer snapshot - remove the current one @@ -1922,28 +1935,21 @@ public synchronized void onSnapshotChunk(String server, long term, long snapTerm // I simply shift the election timeout on all packets scheduleNextElectionTimeout(); - int expectedSeqnum = incomingSnapshot.getNextExpectedSeqnum(); + int expectedSeqnum = incomingSnapshot.getNextSeqnum(); if (seqnum != expectedSeqnum) { - LOGGER.warn("{}: SnapshotChunk from {}: seqnum mismatch: seqnum:{} expectedSeqnum:{}", self, server, seqnum, expectedSeqnum); + LOGGER.warn("{}: SnapshotChunk from {}: seqnum mismatch: seqnum:[act:{} exp:{}]", self, server, seqnum, expectedSeqnum); sendSnapshotChunkReply(server, currentTerm, snapTerm, snapIndex, expectedSeqnum); return; } checkState(seqnum == expectedSeqnum, "seqnum:%s expectedSeqnum:%s", seqnum, expectedSeqnum); - try { - incomingSnapshot.readSnapshotChunk(seqnum, chunkInputStream); - } catch (IOException e) { - LOGGER.warn("{}: SnapshotChunk from {}: error reading snapshot:{}", self, server, e.getMessage()); - closeIncomingSnapshotQuietly(); - sendSnapshotChunkReply(server, currentTerm, snapTerm, snapIndex, 0); // try again from the beginning - return; - } + incomingSnapshot.readSnapshotChunk(seqnum, chunkInputStream); // the snapshot isn't complete, so we're done if (chunkInputStream 
!= null) { - LOGGER.trace("{}: SnapshotChunk from {}: need more: snapTerm:{} snapIndex:{} nextSeqnum:{}", self, server, snapTerm, snapIndex, incomingSnapshot.getNextExpectedSeqnum()); + LOGGER.trace("{}: SnapshotChunk from {}: need more: snapTerm:{} snapIndex:{} nextSeqnum:{}", self, server, snapTerm, snapIndex, incomingSnapshot.getNextSeqnum()); return; } @@ -1952,21 +1958,24 @@ public synchronized void onSnapshotChunk(String server, long term, long snapTerm checkState(chunkInputStream == null); - // start by truncating everything after - // the commitIndex. this way, even if applying - // the snapshot fails we'll still have - // the committed entries sitting around + // start by truncating everything after the + // commitIndex. this way, even if applying the + // snapshot fails we'll still have the + // committed entries sitting around log.truncate(commitIndex + 1); - // --- BEGIN UNSAFE AREA + // TODO (AG): consider failure mitigation strategies - // FIXME (AG): if we crash during any point here we have a problem - // the next time we start up the commitIndex << the snapshotindex and - // the system will abort during initialization + // --- BEGIN UNSAFE AREA - // FIXME (AG): could this be _mitigated_ through the use of a snapshotStore.removeSnapshot(...)? 
+ // this entire block is unsafe due to my + // strict consistency checking at startup + // if storing the snapshot fails then we + // may end up with the snapshot store + // having a snapshot but commitIndex < snapIndex, + // causing startup to abort - // store the snapshot + // store snapshot incomingSnapshot.closeSnapshotWriter(); snapshotsStore.storeSnapshot(incomingSnapshot.getSnapshotWriter()); incomingSnapshot = null; @@ -1974,12 +1983,15 @@ public synchronized void onSnapshotChunk(String server, long term, long snapTerm // update our commit index store.setCommitIndex(snapIndex); - // have the listener apply it + // this is also dangerous what happens if the + // snapshot is broken and the listener can't apply it? listener.applyCommitted(snapshotsStore.getLatestSnapshot()); // --- END UNSAFE AREA + } catch (IOException e) { + throwRaftError(e); } catch (StorageException e) { - handleStorageException(e); + throwRaftError(e); } } @@ -1993,11 +2005,11 @@ private void closeIncomingSnapshotQuietly() { } } - private void sendSnapshotChunkReply(String server, long term, long snapshotTerm, long snapshotIndex, int nextExpectedSeqnum) { + private void sendSnapshotChunkReply(String server, long term, long snapshotTerm, long snapshotIndex, int nextSeqnum) { try { - sender.snapshotChunkReply(server, term, snapshotTerm, snapshotIndex, nextExpectedSeqnum); + sender.snapshotChunkReply(server, term, snapshotTerm, snapshotIndex, nextSeqnum); } catch (RPCException e) { - LOGGER.warn("{}: fail send snapshot chunk reply snapshotTerm:{} snapshotIndex:{} to {} cause:{}", self, snapshotTerm, snapshotIndex, server, e.getMessage()); + LOGGER.warn("{}: fail send snapshot chunk reply snapshot:[term:{} index:{}] to {} cause:{}", self, snapshotTerm, snapshotIndex, server, e.getMessage()); } } @@ -2007,8 +2019,8 @@ private void sendSnapshotChunkReply(String server, long term, long snapshotTerm, // @Override - public synchronized void onSnapshotChunkReply(String server, long term, long 
snapshotTerm, long snapshotIndex, int nextExpectedSeqnum) { - LOGGER.trace("{}: SnapshotChunkReply from {}: term:{} snapshotTerm:{} snapshotIndex:{} nextExpectedSeqnum:{}", self, server, term, snapshotTerm, snapshotIndex, nextExpectedSeqnum); + public synchronized void onSnapshotChunkReply(String server, long term, long snapTerm, long snapIndex, int nextSeqnum) { + LOGGER.trace("{}: SnapshotChunkReply from {}: term:{} snapTerm:{} snapIndex:{} nextSeqnum:{}", self, server, term, snapTerm, snapIndex, nextSeqnum); if (!running) { logNotRunning(); @@ -2017,9 +2029,9 @@ public synchronized void onSnapshotChunkReply(String server, long term, long sna try { checkArgument(term >= 0); - checkArgument(snapshotTerm >= 0); - checkArgument(snapshotIndex >= 0); - checkArgument(nextExpectedSeqnum >= 0); + checkArgument(snapTerm >= 0); + checkArgument(snapIndex >= 0); + checkArgument(nextSeqnum >= 0); long currentTerm = store.getCurrentTerm(); @@ -2047,49 +2059,41 @@ public synchronized void onSnapshotChunkReply(String server, long term, long sna ServerDatum serverDatum = serverData.get(server); serverDatum = checkNotNull(serverDatum); - OutgoingSnapshot outgoingSnapshot = serverDatum.outgoingSnapshot; - if (outgoingSnapshot == null) { - LOGGER.trace("{}: SnapshotChunkReply from {}: ignore: late reply", self, server); - return; - } - - checkState(outgoingSnapshot != null); + OutgoingSnapshot snapshot = serverDatum.outgoingSnapshot; - int comparison = compareLogicalTimestamps(snapshotTerm, snapshotIndex, outgoingSnapshot.getTerm(), outgoingSnapshot.getIndex()); - - checkState(comparison <= 0, "snapshotTerm:%s snapshotIndex:%s outgoingSnapshotTerm:%s outgoingSnapshotIndex:%s", - snapshotTerm, snapshotIndex, outgoingSnapshot.getTerm(), outgoingSnapshot.getIndex()); - - if (comparison < 0) { - LOGGER.warn("{}: SnapshotChunkReply from {}: ignore: old reply snapshotTerm:{} snapshotIndex:{}", snapshotTerm, snapshotIndex); + if (snapshot == null) { + LOGGER.trace("{}: SnapshotChunkReply 
from {}: late reply", self, server); return; } - checkState(nextExpectedSeqnum <= outgoingSnapshot.getSeqnum()); + checkState(snapshot != null); - if (nextExpectedSeqnum < outgoingSnapshot.getSeqnum()) { - LOGGER.warn("{}: SnapshotChunkReply from {}: ignore out-of order reply seqnum:{} nextExpectedSeqnNum:{}", - nextExpectedSeqnum, outgoingSnapshot.getSeqnum()); + int order = compareLogicalTimestamps(snapTerm, snapIndex, snapshot.getTerm(), snapshot.getIndex()); + checkState(order <= 0, "snapTerm:%s snapIndex:%s outgoing:[snapTerm:%s snapIndex:%s]", snapTerm, snapIndex, snapshot.getTerm(), snapshot.getIndex()); + + if (order < 0 || nextSeqnum < snapshot.getSeqnum()) { + LOGGER.warn("{}: SnapshotChunkReply from {}: late reply: nextSeqnum:{} snapshotSeqnum:{}", self, server, nextSeqnum, snapshot.getSeqnum()); return; } InputStream chunkInputStream = null; - if (nextExpectedSeqnum == outgoingSnapshot.getSeqnum()) { - chunkInputStream = outgoingSnapshot.getChunkInputStream(); - } else if (nextExpectedSeqnum == outgoingSnapshot.getSeqnum() + 1) { - chunkInputStream = outgoingSnapshot.getNextChunkInputStream(); + if (nextSeqnum == snapshot.getSeqnum() + 1) { + chunkInputStream = snapshot.nextChunk(); + } else { + checkArgument(nextSeqnum == snapshot.getSeqnum(), "nextSeqnum:%s snapshotSeqnum:%s", nextSeqnum, snapshot.getSeqnum()); + chunkInputStream = snapshot.getChunk(); } if (chunkInputStream == null) { serverDatum.outgoingSnapshot = null; } - sendSnapshotChunk(server, currentTerm, snapshotTerm, snapshotIndex, nextExpectedSeqnum, chunkInputStream); + sendSnapshotChunk(server, currentTerm, snapTerm, snapIndex, nextSeqnum, chunkInputStream); } catch (IOException e) { - // FIXME (AG): what do I do here?
+ throwRaftError(e); } catch (StorageException e) { - handleStorageException(e); + throwRaftError(e); } } @@ -2215,7 +2219,7 @@ public synchronized void snapshotWritten(SnapshotWriter snapshotWriter) { checkIndicesAndTerms(); } catch (StorageException e) { - handleStorageException(e); + throwRaftError(e); } } @@ -2285,8 +2289,8 @@ public synchronized void snapshotWritten(SnapshotWriter snapshotWriter) { // return whatever we have (this may be null!) return committed; } catch (StorageException e) { - handleStorageException(e); - throw new RaftError("handleStorageException did not throw", e); // should not ever get here + throwRaftError(e); + throw new RaftError("throwRaftError did not throw", e); // should not ever get here } } @@ -2373,7 +2377,7 @@ public synchronized ListenableFuture submitCommand(Command command) throws throw e; } catch (StorageException e) { commandFuture.setException(e); - handleStorageException(e); + throwRaftError(e); } catch (Exception e) { commandFuture.setException(e); } diff --git a/libraft-core/src/test/java/io/libraft/algorithm/StoringSender.java b/libraft-core/src/test/java/io/libraft/algorithm/StoringSender.java index 82d798b..dc72cac 100644 --- a/libraft-core/src/test/java/io/libraft/algorithm/StoringSender.java +++ b/libraft-core/src/test/java/io/libraft/algorithm/StoringSender.java @@ -218,7 +218,7 @@ public void snapshotChunk(String server, long term, long snapshotTerm, long snap } @Override - public void snapshotChunkReply(String server, long term, long snapshotTerm, long snapshotIndex, int nextExpectedSeqnum) throws RPCException { + public void snapshotChunkReply(String server, long term, long snapshotTerm, long snapshotIndex, int nextSeqnum) throws RPCException { // FIXME (AG): noop } }