Skip to content
Permalink
Browse files
CURATOR-561 Reset connection after repeat expiry
If there is a problem posting the Expired KeeperState
during a session expiration, then the ZooKeeper event
thread will die without ever posting the Expired event.
This would then cause curator to keep trying to expire
the connection but it does nothing because the connection
is dead and no events will ever be posted.

This can be prevented by forcibly resetting the connection
if it's detected that the previous expiry had no effect
  • Loading branch information
scott-kirk committed Nov 3, 2021
1 parent 1038aa6 commit e2d32bd00ec05a18bf149a47865fd807e00a60e1
Showing 2 changed files with 54 additions and 1 deletion.
@@ -77,6 +77,8 @@ public class ConnectionStateManager implements Closeable

private volatile long startOfSuspendedEpoch = 0;

private volatile long lastExpiredInstanceIndex = -1;

private enum State
{
LATENT,
@@ -318,7 +320,13 @@ private void checkSessionExpiration()
log.warn(String.format("Session timeout has elapsed while SUSPENDED. Injecting a session expiration. Elapsed ms: %d. Adjusted session timeout ms: %d", elapsedMs, useSessionTimeoutMs));
try
{
client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
if (lastExpiredInstanceIndex == client.getZookeeperClient().getInstanceIndex()) {
// last expiration didn't work for this instance, so event thread is dead and a reset is needed. CURATOR-561
client.getZookeeperClient().reset();
} else {
lastExpiredInstanceIndex = client.getZookeeperClient().getInstanceIndex();
client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
}
}
catch ( Exception e )
{
@@ -18,18 +18,24 @@
*/
package org.apache.curator.framework.state;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.google.common.collect.Queues;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.test.compatibility.Timing2;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@@ -83,4 +89,43 @@ public void stateChanged(CuratorFramework client, ConnectionState newState)
CloseableUtils.closeQuietly(client);
}
}

@Test
public void testConnectionStateRecoversFromUnexpectedExpiredConnection() throws Exception {
Timing2 timing = new Timing2();
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.connectionTimeoutMs(1_000)
.sessionTimeoutMs(250) // try to aggressively expire the connection
.retryPolicy(new RetryOneTime(1))
.connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy())
.build();
final BlockingQueue<ConnectionState> queue = Queues.newLinkedBlockingQueue();
ConnectionStateListener listener = (client1, state) -> queue.add(state);
client.getConnectionStateListenable().addListener(listener);
client.start();
try {
ConnectionState polled = queue.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS);
assertEquals(polled, ConnectionState.CONNECTED);
client.getZookeeperClient().getZooKeeper().getTestable().queueEvent(new WatchedEvent(
Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null));
polled = queue.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS);
assertEquals(polled, ConnectionState.SUSPENDED);
assertThrows(RuntimeException.class, () -> client.getZookeeperClient()
.getZooKeeper().getTestable().queueEvent(new WatchedEvent(
Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null) {
@Override
public String getPath() {
// exception will cause ZooKeeper to update current state but fail to notify watchers
throw new RuntimeException("Path doesn't exist!");
}
}));
polled = queue.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS);
assertEquals(polled, ConnectionState.LOST);
polled = queue.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS);
assertEquals(polled, ConnectionState.RECONNECTED);
} finally {
CloseableUtils.closeQuietly(client);
}
}
}

0 comments on commit e2d32bd

Please sign in to comment.