diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java index 7044ddf884..d7bf6d8210 100644 --- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java +++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java @@ -18,6 +18,7 @@ */ package org.apache.curator; +import com.google.common.annotations.VisibleForTesting; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.drivers.EventTrace; import org.apache.curator.drivers.OperationTrace; @@ -56,6 +57,9 @@ class ConnectionState implements Watcher, Closeable private final AtomicLong instanceIndex = new AtomicLong(); private volatile long connectionStartMs = 0; + @VisibleForTesting + volatile boolean debugWaitOnExpiredEvent = false; + ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference tracer, boolean canBeReadOnly) { this.ensembleProvider = ensembleProvider; @@ -149,7 +153,9 @@ public void process(WatchedEvent event) log.debug("ConnectState watcher: " + event); } - if ( event.getType() == Watcher.Event.EventType.None ) + final boolean eventTypeNone = event.getType() == Watcher.Event.EventType.None; + + if ( eventTypeNone ) { boolean wasConnected = isConnected.get(); boolean newIsConnected = checkState(event.getState(), wasConnected); @@ -160,13 +166,32 @@ public void process(WatchedEvent event) } } - for ( Watcher parentWatcher : parentWatchers ) + // only wait during tests + if (debugWaitOnExpiredEvent && event.getState() == Event.KeeperState.Expired) { + waitOnExpiredEvent(); + } + for ( Watcher parentWatcher : parentWatchers ) + { OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId()); parentWatcher.process(event); trace.commit(); } + + if (eventTypeNone) handleState(event.getState()); + } + + // only for testing + private void waitOnExpiredEvent() + { + log.debug("Waiting on Expired event for testing"); + try + { + Thread.sleep(1000); + } + catch(InterruptedException e) {} + log.debug("Continue processing"); } EnsembleProvider getEnsembleProvider() @@ -240,11 +265,11 @@ private synchronized void reset() throws Exception private boolean checkState(Event.KeeperState state, boolean wasConnected) { boolean isConnected = wasConnected; - boolean checkNewConnectionString = true; switch ( state ) { default: case Disconnected: + case Expired: { isConnected = false; break; @@ -264,14 +289,6 @@ private boolean checkState(Event.KeeperState state, boolean wasConnected) break; } - case Expired: - { - isConnected = false; - checkNewConnectionString = false; - handleExpiredSession(); - break; - } - case SaslAuthenticated: { // NOP @@ -283,12 +300,19 @@ private boolean checkState(Event.KeeperState state, boolean wasConnected) new EventTrace(state.toString(), tracer.get(), getSessionId()).commit(); } - if ( checkNewConnectionString && zooKeeper.hasNewConnectionString() ) + return isConnected; + } + + private void handleState(Event.KeeperState state) + { + if (state == Event.KeeperState.Expired) + { + handleExpiredSession(); + } + else if (zooKeeper.hasNewConnectionString()) { handleNewConnectionString(); } - - return isConnected; } private void handleNewConnectionString() diff --git a/curator-framework/src/test/java/org/apache/curator/ConnectionStateAccessor.java b/curator-framework/src/test/java/org/apache/curator/ConnectionStateAccessor.java new file mode 100644 index 0000000000..0efa9891c0 --- /dev/null +++ b/curator-framework/src/test/java/org/apache/curator/ConnectionStateAccessor.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator; + +import org.apache.curator.framework.CuratorFramework; + +import java.lang.reflect.Field; + +public final class ConnectionStateAccessor +{ + public static void setDebugWaitOnExpiredForClient(CuratorFramework client) + { + CuratorZookeeperClient zookeeperClient = client.getZookeeperClient(); + ConnectionState state = (ConnectionState)getInternalState(zookeeperClient, "state"); + state.debugWaitOnExpiredEvent = true; + } + + private static Object getInternalState(Object target, String field) { + Class c = target.getClass(); + try { + Field f = c.getDeclaredField(field); + f.setAccessible(true); + return f.get(target); + } catch (Exception e) { + throw new RuntimeException("Unable to get internal state on a private field.", e); + } + } +} diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java index a6dc7abf7d..3597f95098 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java @@ -19,8 +19,13 @@ package org.apache.curator.framework.imps; +import org.apache.curator.ConnectionStateAccessor; +import org.apache.curator.CuratorZookeeperClient; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.api.CuratorListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryOneTime; @@ -28,6 +33,8 @@ import org.apache.curator.test.TestingServer; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.Watcher; +import org.mockito.internal.util.reflection.Whitebox; import org.testng.Assert; import org.testng.annotations.Test; import java.util.Timer; @@ -256,4 +263,79 @@ public void testBlockUntilConnectedTightLoop() throws InterruptedException } } } + + /** + * Test that we got disconnected before calling blockUntilConnected and we reconnect we receive a session expired event. + */ + @Test + public void testBlockUntilConnectedSessionExpired() throws Exception + { + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.builder(). + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). + build(); + + final CountDownLatch lostLatch = new CountDownLatch(1); + client.getConnectionStateListenable().addListener(new ConnectionStateListener() + { + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if ( newState == ConnectionState.LOST ) + { + lostLatch.countDown(); + } + } + }); + + final CountDownLatch expiredLatch = new CountDownLatch(1); + client.getCuratorListenable().addListener(new CuratorListener() { + @Override + public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception { + if (event.getType() == CuratorEventType.WATCHED && event.getWatchedEvent().getState() == Watcher.Event.KeeperState.Expired) + { + expiredLatch.countDown(); + } + } + }); + + ConnectionStateAccessor.setDebugWaitOnExpiredForClient(client); + + try + { + client.start(); + + //Block until we're connected + Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Failed to connect"); + + final long sessionTimeoutMs = client.getZookeeperClient().getConnectionTimeoutMs(); + + //Kill the server + CloseableUtils.closeQuietly(server); + + //Wait until we hit the lost state + Assert.assertTrue(timing.awaitLatch(lostLatch), "Failed to reach LOST state"); + + Thread.sleep(sessionTimeoutMs); + + server = new TestingServer(server.getPort(), server.getTempDirectory()); + + //Wait until we get expired event + Assert.assertTrue(timing.awaitLatch(expiredLatch), "Failed to get Expired event"); + + final boolean blockUntilConnected5Seconds = client.blockUntilConnected(5, TimeUnit.SECONDS); + Assert.assertTrue(client.getZookeeperClient().isConnected(), "ConnectionState.isConnected returned false"); + Assert.assertTrue(blockUntilConnected5Seconds, "BlockUntilConnected returned false"); + } + catch ( Exception e ) + { + Assert.fail("Unexpected exception " + e); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } }