Skip to content

Commit

Permalink
ZOOKEEPER-3144: Fix potential ephemeral nodes inconsistent due to glo…
Browse files Browse the repository at this point in the history
…bal session inconsistent with fuzzy snapshot

There is a race condition between update the lastProcessedZxid and the actual session change in DataTree, which could cause global session inconsistent, which then could cause ephemeral inconsistent.

For more details, please check the description in JIRA ZOOKEEPER-3144.

Author: Fangmin Lyu <allenlyu@fb.com>

Reviewers: Michael Han <hanm@apache.org>

Closes #621 from lvfangmin/ZOOKEEPER-3144
  • Loading branch information
Fangmin Lyu authored and hanm committed Sep 14, 2018
1 parent 7163008 commit b587910
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 14 deletions.
28 changes: 15 additions & 13 deletions src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -268,21 +268,21 @@ public void setZKDatabase(ZKDatabase zkDb) {
*/
public void loadData() throws IOException, InterruptedException {
/*
* When a new leader starts executing Leader#lead, it
* When a new leader starts executing Leader#lead, it
* invokes this method. The database, however, has been
* initialized before running leader election so that
* the server could pick its zxid for its initial vote.
* It does it by invoking QuorumPeer#getLastLoggedZxid.
* Consequently, we don't need to initialize it once more
* and avoid the penalty of loading it a second time. Not
* and avoid the penalty of loading it a second time. Not
* reloading it is particularly important for applications
* that host a large database.
*
*
* The following if block checks whether the database has
* been initialized or not. Note that this method is
* invoked by at least one other method:
* invoked by at least one other method:
* ZooKeeperServer#startdata.
*
*
* See ZOOKEEPER-1642 for more detail.
*/
if(zkDb.isInitialized()){
Expand All @@ -291,7 +291,7 @@ public void loadData() throws IOException, InterruptedException {
else {
setZxid(zkDb.loadDataBase());
}

// Clean up dead sessions
List<Long> deadSessions = new LinkedList<Long>();
for (Long session : zkDb.getSessions()) {
Expand Down Expand Up @@ -364,7 +364,7 @@ public long getZxid() {
public SessionTracker getSessionTracker() {
return sessionTracker;
}

long getNextZxid() {
return hzxid.incrementAndGet();
}
Expand Down Expand Up @@ -1181,7 +1181,7 @@ private Record processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn) throws IO
String authorizationID = saslServer.getAuthorizationID();
LOG.info("adding SASL authorization for authorizationID: " + authorizationID);
cnxn.addAuthInfo(new Id("sasl",authorizationID));
if (System.getProperty("zookeeper.superUser") != null &&
if (System.getProperty("zookeeper.superUser") != null &&
authorizationID.equals(System.getProperty("zookeeper.superUser"))) {
cnxn.addAuthInfo(new Id("super", ""));
}
Expand Down Expand Up @@ -1224,11 +1224,7 @@ private ProcessTxnResult processTxn(Request request, TxnHeader hdr,
ProcessTxnResult rc;
int opCode = request != null ? request.type : hdr.getType();
long sessionId = request != null ? request.sessionId : hdr.getClientId();
if (hdr != null) {
rc = getZKDatabase().processTxn(hdr, txn);
} else {
rc = new ProcessTxnResult();
}

if (opCode == OpCode.createSession) {
if (hdr != null && txn instanceof CreateSessionTxn) {
CreateSessionTxn cst = (CreateSessionTxn) txn;
Expand All @@ -1241,6 +1237,12 @@ private ProcessTxnResult processTxn(Request request, TxnHeader hdr,
} else if (opCode == OpCode.closeSession) {
sessionTracker.removeSession(sessionId);
}

if (hdr != null) {
rc = getZKDatabase().processTxn(hdr, txn);
} else {
rc = new ProcessTxnResult();
}
return rc;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.security.sasl.SaslException;

import org.apache.jute.OutputArchive;
Expand All @@ -42,6 +43,7 @@
import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.test.ClientBase;

import org.junit.Assert;
Expand All @@ -60,14 +62,15 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {

MainThread[] mt = null;
ZooKeeper[] zk = null;
int[] clientPorts = null;
int leaderId;
int followerA;

@Before
public void setup() throws Exception {
LOG.info("Start up a 3 server quorum");
final int ENSEMBLE_SERVERS = 3;
final int clientPorts[] = new int[ENSEMBLE_SERVERS];
clientPorts = new int[ENSEMBLE_SERVERS];
StringBuilder sb = new StringBuilder();
String server;

Expand Down Expand Up @@ -259,6 +262,55 @@ private void compareStat(String path, int sid, int compareWithSid) throws Except
Assert.assertEquals(stat1, stat2);
}

@Test
public void testGlobalSessionConsistency() throws Exception {
LOG.info("Hook to catch the commitSession event on followerA");
CustomizedQPMain followerAMain = (CustomizedQPMain) mt[followerA].main;
final ZooKeeperServer zkServer = followerAMain.quorumPeer.getActiveServer();

// only take snapshot for the next global session we're going to create
final AtomicBoolean shouldTakeSnapshot = new AtomicBoolean(true);
followerAMain.setCommitSessionListener(new CommitSessionListener() {
@Override
public void process(long sessionId) {
LOG.info("Take snapshot");
if (shouldTakeSnapshot.getAndSet(false)) {
zkServer.takeSnapshot(true);
}
}
});

LOG.info("Create a global session");
ZooKeeper globalClient = new ZooKeeper(
"127.0.0.1:" + clientPorts[followerA],
ClientBase.CONNECTION_TIMEOUT, this);
QuorumPeerMainTest.waitForOne(globalClient, States.CONNECTED);

LOG.info("Restart followerA to load the data from disk");
mt[followerA].shutdown();
QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING);

mt[followerA].start();
QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);

LOG.info("Make sure the global sessions are consistent with leader");

Map<Long, Integer> globalSessionsOnLeader =
mt[leaderId].main.quorumPeer.getZkDb().getSessionWithTimeOuts();
if (mt[followerA].main.quorumPeer == null) {
LOG.info("quorumPeer is null");
}
if (mt[followerA].main.quorumPeer.getZkDb() == null) {
LOG.info("zkDb is null");
}
Map<Long, Integer> globalSessionsOnFollowerA =
mt[followerA].main.quorumPeer.getZkDb().getSessionWithTimeOuts();
LOG.info("sessions are {}, {}", globalSessionsOnLeader.keySet(),
globalSessionsOnFollowerA.keySet());
Assert.assertTrue(globalSessionsOnFollowerA.keySet().containsAll(
globalSessionsOnLeader.keySet()));
}

private void createEmptyNode(ZooKeeper zk, String path) throws Exception {
zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
Expand Down Expand Up @@ -310,7 +362,17 @@ static interface NodeSerializeListener {
public void nodeSerialized(String path);
}

static interface CommitSessionListener {
public void process(long sessionId);
}

static class CustomizedQPMain extends TestQPMain {
CommitSessionListener commitSessionListener;

public void setCommitSessionListener(CommitSessionListener listener) {
this.commitSessionListener = listener;
}

@Override
protected QuorumPeer getQuorumPeer() throws SaslException {
return new QuorumPeer() {
Expand All @@ -323,6 +385,31 @@ public DataTree createDataTree() {
}
});
}

@Override
protected Follower makeFollower(FileTxnSnapLog logFactory)
throws IOException {
return new Follower(this, new FollowerZooKeeperServer(
logFactory, this, this.getZkDb()) {
@Override
public void createSessionTracker() {
sessionTracker = new LearnerSessionTracker(
this, getZKDatabase().getSessionWithTimeOuts(),
this.tickTime, self.getId(),
self.areLocalSessionsEnabled(),
getZooKeeperServerListener()) {

public synchronized boolean commitSession(
long sessionId, int sessionTimeout) {
if (commitSessionListener != null) {
commitSessionListener.process(sessionId);
}
return super.commitSession(sessionId, sessionTimeout);
}
};
}
});
}
};
}
}
Expand Down

0 comments on commit b587910

Please sign in to comment.