From dad75459c940e208fe0ef36595963b853264b660 Mon Sep 17 00:00:00 2001 From: Sirius Date: Thu, 28 Mar 2024 17:17:00 +0800 Subject: [PATCH] ZOOKEEPER-4643: Committed txns may be improperly truncated if follower crashes right after updating currentEpoch but before persisting txns to disk --- .../zookeeper/server/quorum/Learner.java | 55 ++++++++++++------- .../zookeeper/server/quorum/Zab1_0Test.java | 25 ++------- 2 files changed, 41 insertions(+), 39 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index e3bd13d1165..2d8e9a5beff 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -556,7 +556,6 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { readPacket(qp); Deque packetsCommitted = new ArrayDeque<>(); Deque packetsNotCommitted = new ArrayDeque<>(); - Deque requestsToAck = new ArrayDeque<>(); synchronized (zk) { if (qp.getType() == Leader.DIFF) { @@ -756,26 +755,51 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { sock.setSoTimeout(self.tickTime * self.syncLimit); self.setSyncMode(QuorumPeer.SyncMode.NONE); zk.startupWithoutServing(); - if (zk instanceof FollowerZooKeeperServer) { + if (zk instanceof FollowerZooKeeperServer && !packetsCommitted.isEmpty()) { long startTime = Time.currentElapsedTime(); FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; - for (PacketInFlight p : packetsNotCommitted) { - final Request request = fzk.appendRequest(p.hdr, p.rec, p.digest); - requestsToAck.add(request); + + /* + * @see https://github.com/apache/zookeeper/pull/1848 + * Log and process the pending txns in "packetsNotCommitted" + * according to "packetsCommitted", which have been committed + * by the leader. For these committed proposals, there is no + * need to reply ack. + * + * @see https://issues.apache.org/jira/browse/ZOOKEEPER-4394 + * Keep the outstanding proposals in "packetsNotCommitted" to + * avoid NullPointerException when receiving COMMIT packet(s) + * right after replying NEWLEADER ack. + */ + while (!packetsCommitted.isEmpty()) { + long zxid = packetsCommitted.removeFirst(); + pif = packetsNotCommitted.peekFirst(); + if (pif == null) { + LOG.warn("Committing 0x{}, but got no proposal", Long.toHexString(zxid)); + continue; + } else if (pif.hdr.getZxid() != zxid) { + LOG.warn("Committing 0x{}, but next proposal is 0x{}", + Long.toHexString(zxid), Long.toHexString(pif.hdr.getZxid())); + continue; + } + packetsNotCommitted.removeFirst(); + fzk.appendRequest(pif.hdr, pif.rec, pif.digest); + fzk.commit(zxid); } - // persist the txns to disk + // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4646 + // Make sure to persist the txns to disk before replying NEWLEADER ack. fzk.getZKDatabase().commit(); - LOG.info("{} txns have been persisted and it took {}ms", - packetsNotCommitted.size(), Time.currentElapsedTime() - startTime); - packetsNotCommitted.clear(); + LOG.info("It took {}ms to log and commit pending txns in Leader's committed history. " + + "{} txns left in packetsNotCommitted", + Time.currentElapsedTime() - startTime, packetsNotCommitted.size()); } - // set the current epoch after all the tnxs are persisted + // ZOOKEEPER-4643 / ZOOKEEPER-4785: set the current epoch after txns are persisted self.setCurrentEpoch(newEpoch); LOG.info("Set the current epoch to {}", newEpoch); - // send NEWLEADER ack after all the tnxs are persisted + // send NEWLEADER ack after the txns are persisted writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); LOG.info("Sent NEWLEADER ack to leader with zxid {}", Long.toHexString(newLeaderZxid)); break; @@ -796,15 +820,6 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { // We need to log the stuff that came in between the snapshot and the uptodate if (zk instanceof FollowerZooKeeperServer) { - // reply ACK of PROPOSAL after ACK of NEWLEADER to avoid leader shutdown due to timeout - // on waiting for a quorum of followers - for (final Request request : requestsToAck) { - final QuorumPacket ackPacket = new QuorumPacket(Leader.ACK, request.getHdr().getZxid(), null, null); - writePacket(ackPacket, false); - } - writePacket(null, true); - requestsToAck.clear(); - FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; for (PacketInFlight p : packetsNotCommitted) { fzk.logRequest(p.hdr, p.rec, p.digest); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java index 76a678f501c..139845a3071 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -741,8 +741,8 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) readPacketSkippingPing(ia, qp); assertEquals(Leader.ACKEPOCH, qp.getType()); - assertEquals(0, qp.getZxid()); - assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt()); + assertEquals(ZxidUtils.makeZxid(0, 0), qp.getZxid()); + assertEquals(0, ByteBuffer.wrap(qp.getData()).getInt()); assertEquals(1, f.self.getAcceptedEpoch()); assertEquals(0, f.self.getCurrentEpoch()); @@ -765,11 +765,6 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) qp.setZxid(0); oa.writeRecord(qp, null); - // Read the uptodate ack - readPacketSkippingPing(ia, qp); - assertEquals(Leader.ACK, qp.getType()); - assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); - // Get the ack of the new leader readPacketSkippingPing(ia, qp); assertEquals(Leader.ACK, qp.getType()); @@ -777,24 +772,16 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) assertEquals(1, f.self.getAcceptedEpoch()); assertEquals(1, f.self.getCurrentEpoch()); - //Wait for the transactions to be written out. The thread that writes them out - // does not send anything back when it is done. - long start = System.currentTimeMillis(); - while (createSessionZxid != f.fzk.getLastProcessedZxid() - && (System.currentTimeMillis() - start) < 50) { - Thread.sleep(1); - } + // Read the uptodate ack + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACK, qp.getType()); + assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid()); // Make sure the data was recorded in the filesystem ok ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir)); - start = System.currentTimeMillis(); zkDb2.loadDataBase(); - while (zkDb2.getSessionWithTimeOuts().isEmpty() && (System.currentTimeMillis() - start) < 50) { - Thread.sleep(1); - zkDb2.loadDataBase(); - } LOG.info("zkdb2 sessions:{}", zkDb2.getSessions()); LOG.info("zkdb2 with timeouts:{}", zkDb2.getSessionWithTimeOuts()); assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L));