Skip to content

Commit

Permalink
CURATOR-562 - Remove ConnectionHandlingPolicy - flatten out behavior …
Browse files Browse the repository at this point in the history
…to match old StandardConnectionHandlingPolicy - closes #348
  • Loading branch information
tisonkun authored and randgalt committed Mar 14, 2020
1 parent 959c1ca commit 009bfc4
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 369 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.curator;

import org.apache.curator.connection.ConnectionHandlingPolicy;
import org.apache.curator.drivers.EventTrace;
import org.apache.curator.drivers.OperationTrace;
import org.apache.curator.drivers.TracerDriver;
Expand All @@ -27,7 +26,6 @@
import org.apache.curator.utils.DebugUtils;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
Expand All @@ -36,7 +34,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -52,22 +49,16 @@ class ConnectionState implements Watcher, Closeable
private final AtomicBoolean isConnected = new AtomicBoolean(false);
private final AtomicInteger lastNegotiatedSessionTimeoutMs = new AtomicInteger(0);
private final EnsembleProvider ensembleProvider;
private final int sessionTimeoutMs;
private final int connectionTimeoutMs;
private final AtomicReference<TracerDriver> tracer;
private final ConnectionHandlingPolicy connectionHandlingPolicy;
private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<Exception>();
private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
private final AtomicLong instanceIndex = new AtomicLong();
private volatile long connectionStartMs = 0;

ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
{
this.ensembleProvider = ensembleProvider;
this.sessionTimeoutMs = sessionTimeoutMs;
this.connectionTimeoutMs = connectionTimeoutMs;
this.tracer = tracer;
this.connectionHandlingPolicy = connectionHandlingPolicy;
if ( parentWatcher != null )
{
parentWatchers.offer(parentWatcher);
Expand All @@ -93,7 +84,7 @@ ZooKeeper getZooKeeper() throws Exception
boolean localIsConnected = isConnected.get();
if ( !localIsConnected )
{
checkTimeouts();
checkNewConnectionString();
}

return handleHolder.getZooKeeper();
Expand Down Expand Up @@ -204,64 +195,13 @@ synchronized void reset() throws Exception
handleHolder.getZooKeeper(); // initiate connection
}

private synchronized void checkTimeouts() throws Exception
private synchronized void checkNewConnectionString()
{
final AtomicReference<String> newConnectionString = new AtomicReference<>();
Callable<String> hasNewConnectionString = new Callable<String>()
{
@Override
public String call()
{
newConnectionString.set(handleHolder.getNewConnectionString());
return newConnectionString.get();
}
};
int lastNegotiatedSessionTimeoutMs = getLastNegotiatedSessionTimeoutMs();
int useSessionTimeoutMs = (lastNegotiatedSessionTimeoutMs > 0) ? lastNegotiatedSessionTimeoutMs : sessionTimeoutMs;
ConnectionHandlingPolicy.CheckTimeoutsResult result = connectionHandlingPolicy.checkTimeouts(hasNewConnectionString, connectionStartMs, useSessionTimeoutMs, connectionTimeoutMs);
switch ( result )
{
default:
case NOP:
{
break;
}

case NEW_CONNECTION_STRING:
{
handleNewConnectionString(newConnectionString.get());
break;
}

case RESET_CONNECTION:
{
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
{
long elapsed = System.currentTimeMillis() - connectionStartMs;
int maxTimeout = Math.max(useSessionTimeoutMs, connectionTimeoutMs);
log.warn(String.format("Connection attempt unsuccessful after %d (greater than max timeout of %d). Resetting connection and trying again with a new connection.", elapsed, maxTimeout));
}
reset();
break;
}
final String newConnectionString = handleHolder.getNewConnectionString();

case CONNECTION_TIMEOUT:
{
KeeperException.ConnectionLossException connectionLossException = new CuratorConnectionLossException();
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
{
long elapsed = System.currentTimeMillis() - connectionStartMs;
log.error(String.format("Connection timed out for connection string (%s) and timeout (%d) / elapsed (%d)", handleHolder.getConnectionString(), connectionTimeoutMs, elapsed), connectionLossException);
}
new EventTrace("connections-timed-out", tracer.get(), getSessionId()).commit();
throw connectionLossException;
}

case SESSION_TIMEOUT:
{
handleExpiredSession();
break;
}
if (newConnectionString != null)
{
handleNewConnectionString(newConnectionString);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
package org.apache.curator;

import com.google.common.base.Preconditions;
import org.apache.curator.connection.ConnectionHandlingPolicy;
import org.apache.curator.connection.StandardConnectionHandlingPolicy;
import org.apache.curator.drivers.OperationTrace;
import org.apache.curator.drivers.TracerDriver;
import org.apache.curator.ensemble.EnsembleProvider;
Expand Down Expand Up @@ -55,7 +53,6 @@ public class CuratorZookeeperClient implements Closeable
private final int waitForShutdownTimeoutMs;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver());
private final ConnectionHandlingPolicy connectionHandlingPolicy;

/**
*
Expand All @@ -67,7 +64,7 @@ public class CuratorZookeeperClient implements Closeable
*/
public CuratorZookeeperClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
{
this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, new StandardConnectionHandlingPolicy());
this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false);
}

/**
Expand All @@ -79,7 +76,7 @@ public CuratorZookeeperClient(String connectString, int sessionTimeoutMs, int co
*/
public CuratorZookeeperClient(EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
{
this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, new StandardConnectionHandlingPolicy());
this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false);
}

/**
Expand All @@ -96,27 +93,9 @@ public CuratorZookeeperClient(EnsembleProvider ensembleProvider, int sessionTime
*/
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly)
{
this(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, canBeReadOnly, new StandardConnectionHandlingPolicy());
this(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, 0, watcher, retryPolicy, canBeReadOnly);
}

/**
* @param zookeeperFactory factory for creating {@link ZooKeeper} instances
* @param ensembleProvider the ensemble provider
* @param sessionTimeoutMs session timeout
* @param connectionTimeoutMs connection timeout
* @param watcher default watcher or null
* @param retryPolicy the retry policy to use
* @param canBeReadOnly if true, allow ZooKeeper client to enter
* read only mode in case of a network partition. See
* {@link ZooKeeper#ZooKeeper(String, int, Watcher, long, byte[], boolean)}
* for details
* @param connectionHandlingPolicy connection handling policy - use one of the pre-defined policies or write your own
* @since 3.0.0
*/
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy) {
this(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, 0,
watcher, retryPolicy, canBeReadOnly, connectionHandlingPolicy);
}
/**
* @param zookeeperFactory factory for creating {@link ZooKeeper} instances
* @param ensembleProvider the ensemble provider
Expand All @@ -129,14 +108,12 @@ public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvide
* read only mode in case of a network partition. See
* {@link ZooKeeper#ZooKeeper(String, int, Watcher, long, byte[], boolean)}
* for details
* @param connectionHandlingPolicy connection handling policy - use one of the pre-defined policies or write your own
* @since 4.0.2
*/
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs, Watcher watcher,
RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
RetryPolicy retryPolicy, boolean canBeReadOnly)
{
this.connectionHandlingPolicy = connectionHandlingPolicy;
if ( sessionTimeoutMs < connectionTimeoutMs )
{
log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
Expand All @@ -147,7 +124,7 @@ public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvide

this.connectionTimeoutMs = connectionTimeoutMs;
this.waitForShutdownTimeoutMs = waitForShutdownTimeoutMs;
state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);
state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, watcher, tracer, canBeReadOnly);
setRetryPolicy(retryPolicy);
}

Expand Down Expand Up @@ -376,16 +353,6 @@ public long getInstanceIndex()
return state.getInstanceIndex();
}

/**
* Return the configured connection handling policy
*
* @return ConnectionHandlingPolicy
*/
public ConnectionHandlingPolicy getConnectionHandlingPolicy()
{
return connectionHandlingPolicy;
}

/**
* Return the most recent value of {@link ZooKeeper#getSessionTimeout()} or 0
*
Expand Down
30 changes: 29 additions & 1 deletion curator-client/src/main/java/org/apache/curator/RetryLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.curator;

import org.apache.curator.connection.ThreadLocalRetryLoop;
import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.KeeperException;
import java.util.concurrent.Callable;

Expand Down Expand Up @@ -78,7 +80,33 @@ public static RetrySleeper getDefaultRetrySleeper()
*/
public static <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception
{
return client.getConnectionHandlingPolicy().callWithRetry(client, proc);
client.internalBlockUntilConnectedOrTimedOut();

T result = null;
ThreadLocalRetryLoop threadLocalRetryLoop = new ThreadLocalRetryLoop();
RetryLoop retryLoop = threadLocalRetryLoop.getRetryLoop(client::newRetryLoop);
try
{
while ( retryLoop.shouldContinue() )
{
try
{
result = proc.call();
retryLoop.markComplete();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
retryLoop.takeException(e);
}
}
}
finally
{
threadLocalRetryLoop.release();
}

return result;
}

/**
Expand Down

This file was deleted.

Loading

0 comments on commit 009bfc4

Please sign in to comment.