Skip to content

Commit

Permalink
ZOOKEEPER-4785: Txn loss due to race condition in Learner.syncWithLea…
Browse files Browse the repository at this point in the history
…der() during DIFF sync (#2111) (#2132)

Author: Li Wang <liwang@apple.com>

Co-authored-by: liwang <liwang@apple.com>
  • Loading branch information
li4wang and liwang committed Feb 13, 2024
1 parent 6428bab commit 34f2929
Show file tree
Hide file tree
Showing 5 changed files with 394 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -702,14 +702,14 @@ private void sendNotifications() {
qv.toString().getBytes(UTF_8));

LOG.debug(
"Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.round), {} (recipient),"
+ " {} (myid), 0x{} (n.peerEpoch) ",
"Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.peerEpoch), 0x{} (n.round), {} (recipient),"
+ " {} (myid) ",
proposedLeader,
Long.toHexString(proposedZxid),
Long.toHexString(proposedEpoch),
Long.toHexString(logicalclock.get()),
sid,
self.getMyId(),
Long.toHexString(proposedEpoch));
self.getMyId());

sendqueue.offer(notmsg);
}
Expand All @@ -722,12 +722,13 @@ private void sendNotifications() {
*/
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug(
"id: {}, proposed id: {}, zxid: 0x{}, proposed zxid: 0x{}",
"id: {}, proposed id: {}, zxid: 0x{}, proposed zxid: 0x{}, epoch: 0x{}, proposed epoch: 0x{}",
newId,
curId,
Long.toHexString(newZxid),
Long.toHexString(curZxid));

Long.toHexString(curZxid),
Long.toHexString(newEpoch),
Long.toHexString(curEpoch));
if (self.getQuorumVerifier().getWeight(newId) == 0) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,23 @@ protected void setupRequestProcessors() {
LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<Request>();

public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) {
Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
request.setTxnDigest(digest);
if ((request.zxid & 0xffffffffL) != 0) {
pendingTxns.add(request);
}
final Request request = buildRequestToProcess(hdr, txn, digest);
syncProcessor.processRequest(request);
}

/**
* Build a request for the txn and append it to the transaction log
* @param hdr the txn header
* @param txn the txn
* @param digest the digest of txn
* @return a request moving through a chain of RequestProcessors
*/
public Request appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException {
final Request request = buildRequestToProcess(hdr, txn, digest);
getZKDatabase().append(request);
return request;
}

/**
* When a COMMIT message is received, eventually this method is called,
* which matches up the zxid from the COMMIT with (hopefully) the head of
Expand Down Expand Up @@ -181,4 +190,19 @@ protected void unregisterMetrics() {

}

/**
* Build a request for the txn
* @param hdr the txn header
* @param txn the txn
* @param digest the digest of txn
* @return a request moving through a chain of RequestProcessors
*/
private Request buildRequestToProcess(final TxnHeader hdr, final Record txn, final TxnDigest digest) {
final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
request.setTxnDigest(digest);
if ((request.zxid & 0xffffffffL) != 0) {
pendingTxns.add(request);
}
return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,8 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
readPacket(qp);
Deque<Long> packetsCommitted = new ArrayDeque<>();
Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
Deque<Request> requestsToAck = new ArrayDeque<>();

synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
Expand Down Expand Up @@ -749,7 +751,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
zk.takeSnapshot(syncSnapshot);
}

self.setCurrentEpoch(newEpoch);

writeToTxnLog = true;
//Anything after this needs to go to the transaction log, not applied directly in memory
isPreZAB1_0 = false;
Expand All @@ -759,14 +761,27 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startupWithoutServing();
if (zk instanceof FollowerZooKeeperServer) {
long startTime = Time.currentElapsedTime();
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
fzk.logRequest(p.hdr, p.rec, p.digest);
final Request request = fzk.appendRequest(p.hdr, p.rec, p.digest);
requestsToAck.add(request);
}

// persist the txns to disk
fzk.getZKDatabase().commit();
LOG.info("{} txns have been persisted and it took {}ms",
packetsNotCommitted.size(), Time.currentElapsedTime() - startTime);
packetsNotCommitted.clear();
}

// set the current epoch after all the tnxs are persisted
self.setCurrentEpoch(newEpoch);
LOG.info("Set the current epoch to {}", newEpoch);

// send NEWLEADER ack after all the tnxs are persisted
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
LOG.info("Sent NEWLEADER ack to leader with zxid {}", Long.toHexString(newLeaderZxid));
break;
}
}
Expand All @@ -785,13 +800,25 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {

// We need to log the stuff that came in between the snapshot and the uptodate
if (zk instanceof FollowerZooKeeperServer) {
// reply ACK of PROPOSAL after ACK of NEWLEADER to avoid leader shutdown due to timeout
// on waiting for a quorum of followers
for (final Request request : requestsToAck) {
final QuorumPacket ackPacket = new QuorumPacket(Leader.ACK, request.getHdr().getZxid(), null, null);
writePacket(ackPacket, false);
}
writePacket(null, true);
requestsToAck.clear();

FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
fzk.logRequest(p.hdr, p.rec, p.digest);
}
LOG.info("{} txns have been logged asynchronously", packetsNotCommitted.size());

for (Long zxid : packetsCommitted) {
fzk.commit(zxid);
}
LOG.info("{} txns have been committed", packetsCommitted.size());
} else if (zk instanceof ObserverZooKeeperServer) {
// Similar to follower, we need to log requests between the snapshot
// and UPTODATE
Expand Down

0 comments on commit 34f2929

Please sign in to comment.