Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CURATOR-367] Delay reconnect on session expired #197

Merged
merged 5 commits into from Feb 28, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -149,7 +149,9 @@ public void process(WatchedEvent event)
log.debug("ConnectState watcher: " + event);
}

if ( event.getType() == Watcher.Event.EventType.None )
final boolean eventTypeNone = event.getType() == Watcher.Event.EventType.None;

if ( eventTypeNone )
{
boolean wasConnected = isConnected.get();
boolean newIsConnected = checkState(event.getState(), wasConnected);
Expand All @@ -160,13 +162,33 @@ public void process(WatchedEvent event)
}
}

// only wait during tests
assert waitOnExpiredEvent(event.getState());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this meant to work? Are you assuming that assertions will only be enabled in testing? If you need some logic to only be executed during testing shouldn't there be some sort of internal flag indicating this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, assuming assertions are enabled in testing. I've added the delay here wait for the reconnect to happen, but since now the reset is called later in the method it might no longer be necessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that it's reasonable to assume that assertions will only be turned on during testing. If you look at something like LeaderSelector, it has specific code in there to support unit testing (the debugLeadershipLatch variable).

If you can cause the problem to occur without this code though, then it should be removed. I had a quick play with it and I couldn't seem to reproduce it without this code though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed using a debug flag instead of assert. I added an accessor class to be able to set the flag in the package private ConnectionState class.

I cannot cause the problem to occur without this code.


for ( Watcher parentWatcher : parentWatchers )
{

OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
parentWatcher.process(event);
trace.commit();
}

if (eventTypeNone) handleState(event.getState());
}

// only for testing
private boolean waitOnExpiredEvent(Event.KeeperState currentState)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of the boolean return? The method only ever returns true? Is it purely so you can call it from the assert?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - only so that it can be called form assert.

{
if (currentState == Event.KeeperState.Expired)
{
log.debug("Waiting on Expired event for testing");
try
{
Thread.sleep(1000);
}
catch(InterruptedException e) {}
log.debug("Continue processing");
}
return true;
}

EnsembleProvider getEnsembleProvider()
Expand Down Expand Up @@ -240,11 +262,11 @@ private synchronized void reset() throws Exception
private boolean checkState(Event.KeeperState state, boolean wasConnected)
{
boolean isConnected = wasConnected;
boolean checkNewConnectionString = true;
switch ( state )
{
default:
case Disconnected:
case Expired:
{
isConnected = false;
break;
Expand All @@ -264,14 +286,6 @@ private boolean checkState(Event.KeeperState state, boolean wasConnected)
break;
}

case Expired:
{
isConnected = false;
checkNewConnectionString = false;
handleExpiredSession();
break;
}

case SaslAuthenticated:
{
// NOP
Expand All @@ -283,12 +297,19 @@ private boolean checkState(Event.KeeperState state, boolean wasConnected)
new EventTrace(state.toString(), tracer.get(), getSessionId()).commit();
}

if ( checkNewConnectionString && zooKeeper.hasNewConnectionString() )
return isConnected;
}

private void handleState(Event.KeeperState state)
{
if (state == Event.KeeperState.Expired)
{
handleExpiredSession();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've lost functionality here right? If there's a new connection string and an expired session we don't get the new connection string.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It gets handled in the handleState() method, which is called from process()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's handleState() that I'm concerned about. When state == Event.KeeperState.Expired the zooKeeper.hasNewConnectionString() isn't called.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't that true of the existing implementation though? If the state is expired then the checkNewConnectionString flag gets set to false so the handleNewConnectionString() method won't get called.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh - I think you're right. The old version set checkNewConnectionString=false

}
else if (zooKeeper.hasNewConnectionString())
{
handleNewConnectionString();
}

return isConnected;
}

private void handleNewConnectionString()
Expand Down
Expand Up @@ -21,13 +21,17 @@

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.Watcher;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Timer;
Expand Down Expand Up @@ -256,4 +260,75 @@ public void testBlockUntilConnectedTightLoop() throws InterruptedException
}
}
}

/**
* Test that we got disconnected before calling blockUntilConnected and we reconnect we receive a session expired event.
*/
@Test
public void testBlockUntilConnectedSessionExpired() throws Exception
{
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.builder().
connectString(server.getConnectString()).
retryPolicy(new RetryOneTime(1)).
build();

final CountDownLatch lostLatch = new CountDownLatch(1);
client.getConnectionStateListenable().addListener(new ConnectionStateListener()
{

@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
if ( newState == ConnectionState.LOST )
{
lostLatch.countDown();
}
}
});

final CountDownLatch expiredLatch = new CountDownLatch(1);
client.getCuratorListenable().addListener(new CuratorListener() {
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
if (event.getType() == CuratorEventType.WATCHED && event.getWatchedEvent().getState() == Watcher.Event.KeeperState.Expired)
{
expiredLatch.countDown();
}
}
});

try
{
client.start();

//Block until we're connected
Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Failed to connect");

final long sessionTimeoutMs = client.getZookeeperClient().getConnectionTimeoutMs();

//Kill the server
CloseableUtils.closeQuietly(server);

//Wait until we hit the lost state
Assert.assertTrue(timing.awaitLatch(lostLatch), "Failed to reach LOST state");

Thread.sleep(sessionTimeoutMs);

server = new TestingServer(server.getPort(), server.getTempDirectory());

//Wait until we get expired event
Assert.assertTrue(timing.awaitLatch(expiredLatch), "Failed to get Expired event");

Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
}
catch ( Exception e )
{
Assert.fail("Unexpected exception " + e);
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
}