I'd always thought that if the client is disconnected from the server long enough, an Expired event would be generated. Testing, however, shows this not to be the case. I believe it's related to ZOOKEEPER-1159. The practical effect is that if a client lost its connection to the cluster for longer than the session expiration, it would _never_ be able to reconnect; the connection would be permanently lost. Many users were seeing this as endless log messages indicating "Connection timed out for connection...". As a workaround, in 1.3.0+ when the Curator state changes to LOST, a flag is set so that the next time Curator needs to get the ZooKeeper instance, the current instance is closed and a new ZooKeeper instance is allocated (as if the session had expired).
commit 332bac2acb3d5d117adc486763cfc417e0ae6493 1 parent b3eeb7b
@Randgalt authored
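For illustration, here is a minimal sketch (not part of the commit; the connection string and retry policy are placeholders) of how an application observes this from the outside: a ConnectionStateListener is notified of LOST, and with 1.3.0+ the next Curator operation closes the stale handle and allocates a fresh ZooKeeper instance instead of timing out forever.

    // Illustrative only -- connection string and retry policy are placeholders.
    import com.netflix.curator.framework.CuratorFramework;
    import com.netflix.curator.framework.CuratorFrameworkFactory;
    import com.netflix.curator.framework.state.ConnectionState;
    import com.netflix.curator.framework.state.ConnectionStateListener;
    import com.netflix.curator.retry.RetryOneTime;

    public class LostStateExample
    {
        public static void main(String[] args) throws Exception
        {
            CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryOneTime(1000));
            client.getConnectionStateListenable().addListener
            (
                new ConnectionStateListener()
                {
                    @Override
                    public void stateChanged(CuratorFramework client, ConnectionState newState)
                    {
                        if ( newState == ConnectionState.LOST )
                        {
                            // Before 1.3.0 the client could log "Connection timed out for connection..."
                            // forever. With this change, Curator flags the session and the next
                            // operation gets a brand new ZooKeeper instance.
                            System.out.println("session lost - a new connection will be made on next use");
                        }
                    }
                }
            );
            client.start();
            try
            {
                client.checkExists().forPath("/");   // succeeds again once the ensemble is reachable
            }
            finally
            {
                client.close();
            }
        }
    }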
11 CHANGES.txt
@@ -1,5 +1,14 @@
-1.2.7 - xxxxxxxxxxxxxxx
+1.3.0 - xxxxxxxxxxxxxxx
=======================
+* MAJOR CHANGE (thus a version bump): I'd always thought that if the client is disconnected from the server
+long enough then an Expired event would be generated. Testing, however, shows this not to be the case. I believe
+it's related to ZOOKEEPER-1159. The behavior associated with this is that if the clients lost connection to the
+cluster for longer than the session expiration they would _never_ be able to reconnect. The connection would
+be permanently lost. Many users were seeing this as endless log messages indicating "Connection timed out
+for connection...". As a workaround, in 1.3.0+ when the Curator state changes to LOST, a flag will be set
+so that the next time Curator needs to get the ZooKeeper instance, the current instance will be closed and a new
+ZooKeeper instance will be allocated (as if the session had expired).
+
* Added checks for illegal namespaces.
* Issue 232: NodeCache wasn't handling server connection issues well. It would repeatedly execute checkExists()
18 curator-client/src/main/java/com/netflix/curator/ConnectionState.java
@@ -42,6 +42,7 @@
private final Logger log = LoggerFactory.getLogger(getClass());
private final HandleHolder zooKeeper;
private final AtomicBoolean isConnected = new AtomicBoolean(false);
+ private final AtomicBoolean lost = new AtomicBoolean(false);
private final EnsembleProvider ensembleProvider;
private final int connectionTimeoutMs;
private final AtomicReference<TracerDriver> tracer;
@@ -71,6 +72,12 @@ ZooKeeper getZooKeeper() throws Exception
throw new SessionFailRetryLoop.SessionFailedException();
}
+ if ( lost.compareAndSet(true, false) )
+ {
+ log.debug("resetting after loss");
+ reset();
+ }
+
Exception exception = backgroundExceptions.poll();
if ( exception != null )
{
@@ -147,8 +154,17 @@ void removeParentWatcher(Watcher watcher)
parentWatchers.remove(watcher);
}
- private void reset() throws Exception
+ void markLost()
{
+ log.debug("lost");
+
+ lost.set(true);
+ }
+
+ void reset() throws Exception
+ {
+ log.debug("reset");
+
isConnected.set(false);
connectionStartMs = System.currentTimeMillis();
zooKeeper.closeAndReset();
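The essence of the change above is a deferred reset: markLost() only flips an AtomicBoolean, and the actual closeAndReset() happens lazily on the next getZooKeeper() call. A stripped-down sketch of that pattern (the class and field names below are illustrative, not Curator internals beyond what the diff shows):

    import java.util.concurrent.atomic.AtomicBoolean;

    // Illustrative sketch of the flag-then-lazy-reset pattern.
    class LazyResetHandle
    {
        private final AtomicBoolean lost = new AtomicBoolean(false);
        private Object handle = new Object();   // stands in for the real ZooKeeper handle

        void markLost()
        {
            lost.set(true);   // cheap; safe to call from the connection event thread
        }

        Object getHandle() throws Exception
        {
            // compareAndSet guarantees that if several threads race here after a loss,
            // only one of them performs the expensive reset.
            if ( lost.compareAndSet(true, false) )
            {
                reset();
            }
            return handle;
        }

        private void reset() throws Exception
        {
            handle = new Object();   // the real code calls zooKeeper.closeAndReset()
        }
    }

Because the reset is deferred until the handle is actually needed, nothing reconnects while the application is idle, and listeners that merely observe the LOST state never pay for the reconnect themselves.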
9 curator-client/src/main/java/com/netflix/curator/CuratorZookeeperClient.java
@@ -183,6 +183,15 @@ public void start() throws Exception
}
/**
+ * Mark the connection as lost. The next time {@link #getZooKeeper()} is called,
+ * a new connection will be created.
+ */
+ public void markLost()
+ {
+ state.markLost();
+ }
+
+ /**
* Close the client
*/
public void close()
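For code that drives CuratorZookeeperClient directly (users of CuratorFramework don't need this; the next hunk shows ConnectionStateManager calling markLost() automatically on LOST), a hedged sketch of forcing a fresh connection; the helper class below is made up for illustration:

    import org.apache.zookeeper.ZooKeeper;

    import com.netflix.curator.CuratorZookeeperClient;

    // Hypothetical helper, not part of Curator.
    final class ConnectionRecycler
    {
        static ZooKeeper recycle(CuratorZookeeperClient zkClient) throws Exception
        {
            zkClient.markLost();              // flag the current session as unusable
            return zkClient.getZooKeeper();   // the old instance is closed, a new one is allocated
        }
    }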
5 ...r-framework/src/main/java/com/netflix/curator/framework/state/ConnectionStateManager.java
@@ -141,6 +141,11 @@ public void addStateChange(ConnectionState newState)
return;
}
+ if ( newState == ConnectionState.LOST )
+ {
+ client.getZookeeperClient().markLost();
+ }
+
ConnectionState previousState = currentState.getAndSet(newState);
if ( previousState == newState )
{
77 curator-framework/src/test/java/com/netflix/curator/framework/imps/TestFrameworkEdges.java
@@ -18,6 +18,7 @@
package com.netflix.curator.framework.imps;
import com.google.common.collect.Queues;
+import com.google.common.io.Closeables;
import com.netflix.curator.RetryPolicy;
import com.netflix.curator.RetrySleeper;
import com.netflix.curator.framework.CuratorFramework;
@@ -50,6 +51,54 @@
public class TestFrameworkEdges extends BaseClassForTests
{
@Test
+ public void testReconnectAfterLoss() throws Exception
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+
+ final CountDownLatch lostLatch = new CountDownLatch(1);
+ ConnectionStateListener listener = new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if ( newState == ConnectionState.LOST )
+ {
+ lostLatch.countDown();
+ }
+ }
+ };
+ client.getConnectionStateListenable().addListener(listener);
+
+ client.checkExists().forPath("/");
+
+ server.close();
+
+ Assert.assertTrue(timing.awaitLatch(lostLatch));
+
+ try
+ {
+ client.checkExists().forPath("/");
+ Assert.fail();
+ }
+ catch ( KeeperException.ConnectionLossException e )
+ {
+ // correct
+ }
+
+ server = new TestingServer(server.getPort());
+ client.checkExists().forPath("/");
+ }
+ finally
+ {
+ Closeables.closeQuietly(client);
+ }
+ }
+
+ @Test
public void testGetAclNoStat() throws Exception
{
@@ -68,7 +117,7 @@ public void testGetAclNoStat() throws Exception
}
finally
{
- client.close();
+ Closeables.closeQuietly(client);
}
}
@@ -98,7 +147,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex
}
finally
{
- client.close();
+ Closeables.closeQuietly(client);
}
}
@@ -117,7 +166,7 @@ public void testMissedResponseOnESCreate() throws Exception
}
finally
{
- client.close();
+ Closeables.closeQuietly(client);
}
}
@@ -149,7 +198,7 @@ public void process(WatchedEvent event)
}
finally
{
- client.close();
+ Closeables.closeQuietly(client);
}
}
@@ -188,7 +237,7 @@ else if ( event.getType() == CuratorEventType.CREATE )
}
finally
{
- client.close();
+ Closeables.closeQuietly(client);
}
}
@@ -226,7 +275,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState)
}
finally
{
- client.close();
+ Closeables.closeQuietly(client);
}
}
@@ -251,7 +300,7 @@ public void testFailure() throws Exception
}
finally
{
- client.close();
+ Closeables.closeQuietly(client);
}
}
@@ -312,7 +361,7 @@ public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleep
}
finally
{
- client.close();
+ Closeables.closeQuietly(client);
}
}
@@ -339,9 +388,15 @@ public void testNotStarted() throws Exception
public void testStopped() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- client.start();
- client.getData();
- client.close();
+ try
+ {
+ client.start();
+ client.getData();
+ }
+ finally
+ {
+ Closeables.closeQuietly(client);
+ }
try
{
19 curator-recipes/src/test/java/com/netflix/curator/framework/recipes/cache/TestNodeCache.java
@@ -179,20 +179,19 @@ public void testKilledSession() throws Exception
final CountDownLatch latch = new CountDownLatch(1);
cache.getListenable().addListener
- (
- new NodeCacheListener()
+ (
+ new NodeCacheListener()
+ {
+ @Override
+ public void nodeChanged() throws Exception
{
- @Override
- public void nodeChanged() throws Exception
- {
- latch.countDown();
- }
+ latch.countDown();
}
- );
+ }
+ );
KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
- Thread.sleep(timing.multiple(2).session());
- Assert.assertTrue(timing.awaitLatch(latch));
+ Assert.assertTrue(timing.multiple(4).awaitLatch(latch));
}
finally
{
2  gradle.properties
@@ -14,4 +14,4 @@
# limitations under the License.
#
-version=1.2.7-SNAPSHOT
+version=1.3.0-SNAPSHOT
