Skip to content
Permalink
Browse files
CURATOR-533
CURATOR-505 introduced circuit breaking behavior via CircuitBreakingConnectionStateListener and ConnectionStateListenerDecorator. Elastic has been using it to success but reports that the implementation can be improved. The existing implementation uses a new CircuitBreaker for each ConnectionStateListener set in a Curator client. It turns out that this is not ideal. Instead, a shared CircuitBreaker should be used per Curator client.

Unfortunately, the best way to do this is to remove the ConnectionStateListenerDecorator semantics and use a different mechanism. This Issue proposes to do this and remove ConnectionStateListenerDecorator. This is a breaking change but given the short amount of time it's been in Curator it's unlikely that it's been widely adopted.

In this commit, ConnectionStateListenerDecorator is removed in favor of ConnectionStateListenerManagerFactory. ConnectionStateManager uses this factory to create the container to hold registered ConnectionStateListeners. A new CircuitBreakerManager now manages the circuit breaking behavior using a shared CircuitBreaker.
  • Loading branch information
randgalt committed Aug 12, 2019
1 parent b5eea4c commit 066ff40e2e99fadffb1a95be72c934369e3c1011
Show file tree
Hide file tree
Showing 17 changed files with 357 additions and 153 deletions.
@@ -36,7 +36,7 @@
import org.apache.curator.framework.schema.SchemaSet;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
import org.apache.curator.framework.state.ConnectionStateListenerDecorator;
import org.apache.curator.framework.state.ConnectionStateListenerManagerFactory;
import org.apache.curator.framework.state.StandardConnectionStateErrorPolicy;
import org.apache.curator.utils.DefaultZookeeperFactory;
import org.apache.curator.utils.ZookeeperFactory;
@@ -153,7 +153,7 @@ public static byte[] getLocalAddress()
private boolean zk34CompatibilityMode = isZK34();
private int waitForShutdownTimeoutMs = 0;
private Executor runSafeService = null;
private ConnectionStateListenerDecorator connectionStateListenerDecorator = ConnectionStateListenerDecorator.standard;
private ConnectionStateListenerManagerFactory connectionStateListenerManagerFactory = ConnectionStateListenerManagerFactory.standard;

/**
* Apply the current values and build a new CuratorFramework
@@ -498,18 +498,16 @@ public Builder runSafeService(Executor runSafeService)
}

/**
* Sets the connection state listener decorator. For example,
* you can set {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}s
* via this mechanism by using {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator#circuitBreaking(org.apache.curator.RetryPolicy)}
* or {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator#circuitBreaking(org.apache.curator.RetryPolicy, java.util.concurrent.ScheduledExecutorService)}
* Sets the connection state listener manager factory. For example,
* you can set {@link org.apache.curator.framework.state.ConnectionStateListenerManagerFactory#circuitBreaking(org.apache.curator.RetryPolicy)}
*
* @param connectionStateListenerDecorator decorator to use
* @param connectionStateListenerManagerFactory manager factory to use
* @return this
* @since 4.2.0
*/
public Builder connectionStateListenerDecorator(ConnectionStateListenerDecorator connectionStateListenerDecorator)
public Builder connectionStateListenerManagerFactory(ConnectionStateListenerManagerFactory connectionStateListenerManagerFactory)
{
this.connectionStateListenerDecorator = Objects.requireNonNull(connectionStateListenerDecorator, "connectionStateListenerFactory cannot be null");
this.connectionStateListenerManagerFactory = Objects.requireNonNull(connectionStateListenerManagerFactory, "connectionStateListenerManagerFactory cannot be null");
return this;
}

@@ -660,9 +658,9 @@ public boolean canBeReadOnly()
return canBeReadOnly;
}

public ConnectionStateListenerDecorator getConnectionStateListenerDecorator()
public ConnectionStateListenerManagerFactory getConnectionStateListenerManagerFactory()
{
return connectionStateListenerDecorator;
return connectionStateListenerManagerFactory;
}

private Builder()
@@ -139,7 +139,7 @@ public void process(WatchedEvent watchedEvent)
namespace = new NamespaceImpl(this, builder.getNamespace());
threadFactory = getThreadFactory(builder);
maxCloseWaitMs = builder.getMaxCloseWaitMs();
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerDecorator());
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerManagerFactory());
compressionProvider = builder.getCompressionProvider();
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
@@ -327,7 +327,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState)
}

@Override
public boolean doNotDecorate()
public boolean doNotProxy()
{
return true;
}
@@ -74,7 +74,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState)
}

@Override
public boolean doNotDecorate()
public boolean doNotProxy()
{
return true;
}
@@ -41,4 +41,9 @@
* @param function function to call for each listener
*/
void forEach(Consumer<V> function);

default boolean isEmpty()
{
return size() == 0;
}
}
@@ -26,7 +26,7 @@
/**
* Non mapping version of a listener container
*/
public class StandardListenerManager<T> implements ListenerManager<T, T>
public class StandardListenerManager<T> implements UnaryListenerManager<T>
{
private final ListenerManager<T, T> container;

@@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.listen;

/**
* A {@link ListenerManager} that doesn't do any mapping
*/
public interface UnaryListenerManager<T> extends ListenerManager<T, T>
{
}
@@ -20,12 +20,12 @@

import org.apache.curator.RetryPolicy;
import org.apache.curator.RetrySleeper;
import org.apache.curator.utils.ThreadUtils;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// must be guarded by sync
class CircuitBreaker
{
private final RetryPolicy retryPolicy;
@@ -35,12 +35,18 @@
private int retryCount = 0;
private long startNanos = 0;

CircuitBreaker(RetryPolicy retryPolicy, ScheduledExecutorService service)
static CircuitBreaker build(RetryPolicy retryPolicy)
{
this.retryPolicy = Objects.requireNonNull(retryPolicy, "retryPolicy cannot be null");
this.service = Objects.requireNonNull(service, "service cannot be null");
return new CircuitBreaker(retryPolicy, ThreadUtils.newSingleThreadScheduledExecutor("CircuitBreakingConnectionStateListener"));
}

static CircuitBreaker build(RetryPolicy retryPolicy, ScheduledExecutorService service)
{
return new CircuitBreaker(retryPolicy, service);
}

// IMPORTANT - all methods below MUST be guarded by synchronization

boolean isOpen()
{
return isOpen;
@@ -96,4 +102,10 @@ boolean close()
startNanos = 0;
return wasOpen;
}

private CircuitBreaker(RetryPolicy retryPolicy, ScheduledExecutorService service)
{
this.retryPolicy = Objects.requireNonNull(retryPolicy, "retryPolicy cannot be null");
this.service = Objects.requireNonNull(service, "service cannot be null");
}
}
@@ -20,22 +20,21 @@

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;

/**
* <p>
* A decorator/proxy for connection state listeners that adds circuit breaking behavior. During network
* A proxy for connection state listeners that adds circuit breaking behavior. During network
* outages ZooKeeper can become very noisy sending connection/disconnection events in rapid succession.
* Curator recipes respond to these messages by resetting state, etc. E.g. LeaderLatch must delete
* its lock node and try to recreate it in order to try to re-obtain leadership, etc.
* </p>
*
* <p>
* This noisy herding can be avoided by using the circuit breaking listener decorator. When it
* This noisy herding can be avoided by using the circuit breaking listener. When it
* receives {@link org.apache.curator.framework.state.ConnectionState#SUSPENDED}, the circuit
* becomes "open" (based on the provided {@link org.apache.curator.RetryPolicy}) and will ignore
* future connection state changes until RetryPolicy timeout has elapsed. Note: however, if the connection
@@ -44,17 +43,34 @@
* </p>
*
* <p>
* When the circuit decorator is closed, all connection state changes are forwarded to the managed
* When the circuit is closed, all connection state changes are forwarded to the managed
* listener. When the first disconnected state is received, the circuit becomes open. The state change
* that caused the circuit to open is sent to the managed listener and the RetryPolicy will be used to
* get a delay amount. While the delay is active, the decorator will store state changes but will not
* get a delay amount. While the delay is active, the circuit breaker will store state changes but will not
* forward them to the managed listener (except, however, the first time the state changes from SUSPENDED to LOST).
* When the delay elapses, if the connection has been restored, the circuit closes and forwards the
* new state to the managed listener. If the connection has not been restored, the RetryPolicy is checked
* again. If the RetryPolicy indicates another retry is allowed the process repeats. If, however, the
* RetryPolicy indicates that retries are exhausted then the circuit closes - if the current state
* is different than the state that caused the circuit to open it is forwarded to the managed listener.
* </p>
*
* <p>
* <strong>NOTE:</strong> You should not use this listener directly. Instead, set {@link org.apache.curator.framework.state.ConnectionStateListenerManagerFactory#circuitBreaking(org.apache.curator.RetryPolicy)}
* in the {@link org.apache.curator.framework.CuratorFrameworkFactory.Builder#connectionStateListenerManagerFactory(ConnectionStateListenerManagerFactory)}.
* </p>
*
* <p>
* E.g.
* <code><pre>
* ConnectionStateListenerManagerFactory factory = ConnectionStateListenerManagerFactory.circuitBreaking(...retry policy for circuit breaking...);
* CuratorFramework client = CuratorFrameworkFactory.builder()
* .connectionStateListenerManagerFactory(factory)
* ... etc ...
* .build();
* // all connection state listeners set for "client" will get circuit breaking behavior
* </pre></code>
* </p>
*/
public class CircuitBreakingConnectionStateListener implements ConnectionStateListener
{
@@ -77,7 +93,7 @@ public class CircuitBreakingConnectionStateListener implements ConnectionStateLi
*/
public CircuitBreakingConnectionStateListener(CuratorFramework client, ConnectionStateListener listener, RetryPolicy retryPolicy)
{
this(client, listener, retryPolicy, ThreadUtils.newSingleThreadScheduledExecutor("CircuitBreakingConnectionStateListener"));
this(client, listener, CircuitBreaker.build(retryPolicy));
}

/**
@@ -88,9 +104,14 @@ public CircuitBreakingConnectionStateListener(CuratorFramework client, Connectio
*/
public CircuitBreakingConnectionStateListener(CuratorFramework client, ConnectionStateListener listener, RetryPolicy retryPolicy, ScheduledExecutorService service)
{
this.client = client;
this(client, listener, CircuitBreaker.build(retryPolicy, service));
}

CircuitBreakingConnectionStateListener(CuratorFramework client, ConnectionStateListener listener, CircuitBreaker circuitBreaker)
{
this.client = Objects.requireNonNull(client, "client cannot be null");
this.listener = Objects.requireNonNull(listener, "listener cannot be null");
circuitBreaker = new CircuitBreaker(retryPolicy, service);
this.circuitBreaker = Objects.requireNonNull(circuitBreaker, "circuitBreaker cannot be null");
reset();
}

@@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.state;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.StandardListenerManager;
import org.apache.curator.framework.listen.UnaryListenerManager;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

class CircuitBreakingManager implements UnaryListenerManager<ConnectionStateListener>
{
private final StandardListenerManager<ConnectionStateListener> mainContainer = StandardListenerManager.standard();
private final StandardListenerManager<ConnectionStateListener> doNotProxyContainer = StandardListenerManager.standard();
private final CircuitBreakingConnectionStateListener masterListener;

CircuitBreakingManager(CuratorFramework client, CircuitBreaker circuitBreaker)
{
ConnectionStateListener masterStateChanged = (__, newState) -> mainContainer.forEach(listener -> listener.stateChanged(client, newState));
masterListener = new CircuitBreakingConnectionStateListener(client, masterStateChanged, circuitBreaker);
}

@Override
public void clear()
{
doNotProxyContainer.clear();
mainContainer.clear();
}

@Override
public int size()
{
return mainContainer.size() + doNotProxyContainer.size();
}

@Override
public void forEach(Consumer<ConnectionStateListener> function)
{
doNotProxyContainer.forEach(function);
function.accept(masterListener);
}

@Override
public void addListener(ConnectionStateListener listener)
{
if ( listener.doNotProxy() )
{
doNotProxyContainer.addListener(listener);
}
else
{
mainContainer.addListener(listener);
}
}

@Override
public void addListener(ConnectionStateListener listener, Executor executor)
{
if ( listener.doNotProxy() )
{
doNotProxyContainer.addListener(listener, executor);
}
else
{
mainContainer.addListener(listener, executor);
}
}

@Override
public void removeListener(ConnectionStateListener listener)
{
mainContainer.removeListener(listener);
doNotProxyContainer.removeListener(listener);
}
}
@@ -31,14 +31,14 @@
void stateChanged(CuratorFramework client, ConnectionState newState);

/**
* Normally, ConnectionStateListeners are decorated via the configured
* {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator}. For certain
* critical cases, however, this is not desired. If your listener returns <code>true</code>
* for doNotDecorate(), it will not be passed through the decorator.
* ConnectionStateListener managers set via {@link org.apache.curator.framework.CuratorFrameworkFactory.Builder#connectionStateListenerManagerFactory(ConnectionStateListenerManagerFactory)}
* are allowed to proxy (etc.) ConnectionStateListeners as needed. If this method returns <code>true</code>
* the ConnectionStateListener manager must <em>not</em> proxy the listener as it's a vital internal
* listener used by Curator.
*
* @return true/false
*/
default boolean doNotDecorate()
default boolean doNotProxy()
{
return false;
}

0 comments on commit 066ff40

Please sign in to comment.