Permalink
Browse files

Fail fast when Zookeeper connections expire

  • Loading branch information...
1 parent 6f8ad92 commit db7a8908d758be67c045bd5b7b6191062c70bdc8 @matthieumorel matthieumorel committed Jul 20, 2012
@@ -118,11 +118,11 @@ public void handleDataDeleted(String dataPath) throws Exception {
@Override
public void handleStateChanged(KeeperState state) throws Exception {
this.state = state;
- if (!state.equals(KeeperState.SyncConnected)) {
- logger.warn("Session not connected for cluster [{}]: [{}]. Trying to reconnect", clusterName, state.name());
- zkClient.close();
- zkClient.connect(connectionTimeout, null);
- handleNewSession();
+ if (state.equals(KeeperState.Expired)) {
+ logger.error(
+ "Zookeeper session expired, possibly due to a network partition for cluster [{}]. This node is considered as dead by Zookeeper. Proceeding to stop this node.",
+ clusterName);
+ System.exit(1);
}
}
@@ -186,7 +186,12 @@ public boolean equals(Object obj) {
@Override
public void handleStateChanged(KeeperState state) throws Exception {
- // TODO we should reconnect only if we hold the zookeeper connection (i.e. this is the local cluster)
+ if (state.equals(KeeperState.Expired)) {
+ logger.error(
+ "Zookeeper session expired, possibly due to a network partition for cluster [{}]. This node is considered as dead by Zookeeper. Proceeding to stop this node.",
+ clusterRef.get().toString());
+ System.exit(1);
+ }
}
@Override
@@ -78,10 +78,11 @@ private void doProcess() {
@Override
public void handleStateChanged(KeeperState state) throws Exception {
this.state = state;
- if (!state.equals(KeeperState.SyncConnected)) {
- logger.warn("Session not connected for cluster [{}]: [{}]. Trying to reconnect", clusterName, state.name());
- zkClient.connect(connectionTimeout, null);
- handleNewSession();
+ if (state.equals(KeeperState.Expired)) {
+ logger.error(
+ "Zookeeper session expired, possibly due to a network partition for cluster [{}]. This node is considered as dead by Zookeeper. Proceeding to stop this node.",
+ clusterName);
+ System.exit(1);
}
}
@@ -114,6 +114,11 @@ private void doProcess() {
@Override
public void handleStateChanged(KeeperState state) throws Exception {
this.state = state;
+ if (state.equals(KeeperState.Expired)) {
+ logger.error("Zookeeper session expired, possibly due to a network partition. This node is considered as dead by Zookeeper. Proceeding to stop this node.");
+ System.exit(1);
+ }
+
}
@Override
@@ -13,7 +13,12 @@
import com.google.common.base.Splitter;
import com.google.common.collect.Sets;
-public class AssignmentsFromZKTest extends ZKBaseTest {
+/**
+ * Kept separate from AssignmentsFromZKTest2 so that a JVM exit triggered by a Zookeeper session expiration while
+ * running one of the two classes does not affect the test in the other class.
+ *
+ */
+public class AssignmentsFromZKTest1 extends ZKBaseTest {
@Test
public void testAssignmentFor1Cluster() throws Exception {
@@ -22,15 +27,7 @@ public void testAssignmentFor1Cluster() throws Exception {
testAssignment(taskSetup, topologyNames);
}
- @Test
- public void testAssignmentFor2Clusters() throws Exception {
- Thread.sleep(2000);
- TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
- final String topologyNames = "cluster2, cluster3";
- testAssignment(taskSetup, topologyNames);
- }
-
- private void testAssignment(TaskSetup taskSetup, final String topologyNames) throws InterruptedException {
+ public static void testAssignment(TaskSetup taskSetup, final String topologyNames) throws InterruptedException {
final Set<String> names = Sets.newHashSet(Splitter.onPattern("\\s*,\\s*").split(topologyNames));
taskSetup.clean("s4");
for (String topologyName : names) {
@@ -0,0 +1,23 @@
+package org.apache.s4.comm.topology;
+
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.junit.Test;
+
+/**
+ * Kept separate from AssignmentsFromZKTest1 so that a JVM exit triggered by a Zookeeper session expiration while
+ * running one of the two classes does not affect the test in the other class.
+ *
+ */
+public class AssignmentsFromZKTest2 extends ZkBasedTest {
+
+ @Test
+ public void testAssignmentFor2Clusters() throws Exception {
+ Thread.sleep(2000);
+ TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
+ final String topologyNames = "cluster2, cluster3";
+ AssignmentsFromZKTest1.testAssignment(taskSetup, topologyNames);
+ }
+
+}

0 comments on commit db7a890

Please sign in to comment.