Skip to content

Commit

Permalink
Merge pull request #320 from apache/CURATOR-505-improve-circuit-break…
Browse files Browse the repository at this point in the history
…er-to-shared

[CURATOR-533] - improve circuit breaking behavior
  • Loading branch information
shayshim committed Aug 12, 2019
2 parents 0e4994b + 066ff40 commit c670844
Show file tree
Hide file tree
Showing 17 changed files with 357 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.curator.framework.schema.SchemaSet;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
import org.apache.curator.framework.state.ConnectionStateListenerDecorator;
import org.apache.curator.framework.state.ConnectionStateListenerManagerFactory;
import org.apache.curator.framework.state.StandardConnectionStateErrorPolicy;
import org.apache.curator.utils.DefaultZookeeperFactory;
import org.apache.curator.utils.ZookeeperFactory;
Expand Down Expand Up @@ -153,7 +153,7 @@ public static class Builder
private boolean zk34CompatibilityMode = isZK34();
private int waitForShutdownTimeoutMs = 0;
private Executor runSafeService = null;
private ConnectionStateListenerDecorator connectionStateListenerDecorator = ConnectionStateListenerDecorator.standard;
private ConnectionStateListenerManagerFactory connectionStateListenerManagerFactory = ConnectionStateListenerManagerFactory.standard;

/**
* Apply the current values and build a new CuratorFramework
Expand Down Expand Up @@ -498,18 +498,16 @@ public Builder runSafeService(Executor runSafeService)
}

/**
* Sets the connection state listener decorator. For example,
* you can set {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}s
* via this mechanism by using {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator#circuitBreaking(org.apache.curator.RetryPolicy)}
* or {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator#circuitBreaking(org.apache.curator.RetryPolicy, java.util.concurrent.ScheduledExecutorService)}
* Sets the connection state listener manager factory. For example,
* you can set {@link org.apache.curator.framework.state.ConnectionStateListenerManagerFactory#circuitBreaking(org.apache.curator.RetryPolicy)}
*
* @param connectionStateListenerDecorator decorator to use
* @param connectionStateListenerManagerFactory manager factory to use
* @return this
* @since 4.2.0
*/
public Builder connectionStateListenerDecorator(ConnectionStateListenerDecorator connectionStateListenerDecorator)
public Builder connectionStateListenerManagerFactory(ConnectionStateListenerManagerFactory connectionStateListenerManagerFactory)
{
this.connectionStateListenerDecorator = Objects.requireNonNull(connectionStateListenerDecorator, "connectionStateListenerFactory cannot be null");
this.connectionStateListenerManagerFactory = Objects.requireNonNull(connectionStateListenerManagerFactory, "connectionStateListenerManagerFactory cannot be null");
return this;
}

Expand Down Expand Up @@ -660,9 +658,9 @@ public boolean canBeReadOnly()
return canBeReadOnly;
}

public ConnectionStateListenerDecorator getConnectionStateListenerDecorator()
public ConnectionStateListenerManagerFactory getConnectionStateListenerManagerFactory()
{
return connectionStateListenerDecorator;
return connectionStateListenerManagerFactory;
}

private Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void process(WatchedEvent watchedEvent)
namespace = new NamespaceImpl(this, builder.getNamespace());
threadFactory = getThreadFactory(builder);
maxCloseWaitMs = builder.getMaxCloseWaitMs();
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerDecorator());
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerManagerFactory());
compressionProvider = builder.getCompressionProvider();
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
Expand Down Expand Up @@ -327,7 +327,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState)
}

@Override
public boolean doNotDecorate()
public boolean doNotProxy()
{
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState)
}

@Override
public boolean doNotDecorate()
public boolean doNotProxy()
{
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,9 @@ public interface ListenerManager<K, V> extends Listenable<K>
* @param function function to call for each listener
*/
void forEach(Consumer<V> function);

default boolean isEmpty()
{
return size() == 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
/**
* Non mapping version of a listener container
*/
public class StandardListenerManager<T> implements ListenerManager<T, T>
public class StandardListenerManager<T> implements UnaryListenerManager<T>
{
private final ListenerManager<T, T> container;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.listen;

/**
* A {@link ListenerManager} that doesn't do any mapping
*/
public interface UnaryListenerManager<T> extends ListenerManager<T, T>
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@

import org.apache.curator.RetryPolicy;
import org.apache.curator.RetrySleeper;
import org.apache.curator.utils.ThreadUtils;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// must be guarded by sync
class CircuitBreaker
{
private final RetryPolicy retryPolicy;
Expand All @@ -35,12 +35,18 @@ class CircuitBreaker
private int retryCount = 0;
private long startNanos = 0;

CircuitBreaker(RetryPolicy retryPolicy, ScheduledExecutorService service)
static CircuitBreaker build(RetryPolicy retryPolicy)
{
this.retryPolicy = Objects.requireNonNull(retryPolicy, "retryPolicy cannot be null");
this.service = Objects.requireNonNull(service, "service cannot be null");
return new CircuitBreaker(retryPolicy, ThreadUtils.newSingleThreadScheduledExecutor("CircuitBreakingConnectionStateListener"));
}

static CircuitBreaker build(RetryPolicy retryPolicy, ScheduledExecutorService service)
{
return new CircuitBreaker(retryPolicy, service);
}

// IMPORTANT - all methods below MUST be guarded by synchronization

boolean isOpen()
{
return isOpen;
Expand Down Expand Up @@ -96,4 +102,10 @@ boolean close()
startNanos = 0;
return wasOpen;
}

private CircuitBreaker(RetryPolicy retryPolicy, ScheduledExecutorService service)
{
this.retryPolicy = Objects.requireNonNull(retryPolicy, "retryPolicy cannot be null");
this.service = Objects.requireNonNull(service, "service cannot be null");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,21 @@

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;

/**
* <p>
* A decorator/proxy for connection state listeners that adds circuit breaking behavior. During network
* A proxy for connection state listeners that adds circuit breaking behavior. During network
* outages ZooKeeper can become very noisy sending connection/disconnection events in rapid succession.
* Curator recipes respond to these messages by resetting state, etc. E.g. LeaderLatch must delete
* its lock node and try to recreate it in order to try to re-obtain leadership, etc.
* </p>
*
* <p>
* This noisy herding can be avoided by using the circuit breaking listener decorator. When it
* This noisy herding can be avoided by using the circuit breaking listener. When it
* receives {@link org.apache.curator.framework.state.ConnectionState#SUSPENDED}, the circuit
* becomes "open" (based on the provided {@link org.apache.curator.RetryPolicy}) and will ignore
* future connection state changes until RetryPolicy timeout has elapsed. Note: however, if the connection
Expand All @@ -44,17 +43,34 @@
* </p>
*
* <p>
* When the circuit decorator is closed, all connection state changes are forwarded to the managed
* When the circuit is closed, all connection state changes are forwarded to the managed
* listener. When the first disconnected state is received, the circuit becomes open. The state change
* that caused the circuit to open is sent to the managed listener and the RetryPolicy will be used to
* get a delay amount. While the delay is active, the decorator will store state changes but will not
* get a delay amount. While the delay is active, the circuit breaker will store state changes but will not
* forward them to the managed listener (except, however, the first time the state changes from SUSPENDED to LOST).
* When the delay elapses, if the connection has been restored, the circuit closes and forwards the
* new state to the managed listener. If the connection has not been restored, the RetryPolicy is checked
* again. If the RetryPolicy indicates another retry is allowed the process repeats. If, however, the
* RetryPolicy indicates that retries are exhausted then the circuit closes - if the current state
* is different than the state that caused the circuit to open it is forwarded to the managed listener.
* </p>
*
* <p>
* <strong>NOTE:</strong> You should not use this listener directly. Instead, set {@link org.apache.curator.framework.state.ConnectionStateListenerManagerFactory#circuitBreaking(org.apache.curator.RetryPolicy)}
* in the {@link org.apache.curator.framework.CuratorFrameworkFactory.Builder#connectionStateListenerManagerFactory(ConnectionStateListenerManagerFactory)}.
* </p>
*
* <p>
* E.g.
* <code><pre>
* ConnectionStateListenerManagerFactory factory = ConnectionStateListenerManagerFactory.circuitBreaking(...retry policy for circuit breaking...);
* CuratorFramework client = CuratorFrameworkFactory.builder()
* .connectionStateListenerManagerFactory(factory)
* ... etc ...
* .build();
* // all connection state listeners set for "client" will get circuit breaking behavior
* </pre></code>
* </p>
*/
public class CircuitBreakingConnectionStateListener implements ConnectionStateListener
{
Expand All @@ -77,7 +93,7 @@ public class CircuitBreakingConnectionStateListener implements ConnectionStateLi
*/
public CircuitBreakingConnectionStateListener(CuratorFramework client, ConnectionStateListener listener, RetryPolicy retryPolicy)
{
this(client, listener, retryPolicy, ThreadUtils.newSingleThreadScheduledExecutor("CircuitBreakingConnectionStateListener"));
this(client, listener, CircuitBreaker.build(retryPolicy));
}

/**
Expand All @@ -88,9 +104,14 @@ public CircuitBreakingConnectionStateListener(CuratorFramework client, Connectio
*/
public CircuitBreakingConnectionStateListener(CuratorFramework client, ConnectionStateListener listener, RetryPolicy retryPolicy, ScheduledExecutorService service)
{
this.client = client;
this(client, listener, CircuitBreaker.build(retryPolicy, service));
}

CircuitBreakingConnectionStateListener(CuratorFramework client, ConnectionStateListener listener, CircuitBreaker circuitBreaker)
{
this.client = Objects.requireNonNull(client, "client cannot be null");
this.listener = Objects.requireNonNull(listener, "listener cannot be null");
circuitBreaker = new CircuitBreaker(retryPolicy, service);
this.circuitBreaker = Objects.requireNonNull(circuitBreaker, "circuitBreaker cannot be null");
reset();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.state;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.StandardListenerManager;
import org.apache.curator.framework.listen.UnaryListenerManager;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

class CircuitBreakingManager implements UnaryListenerManager<ConnectionStateListener>
{
private final StandardListenerManager<ConnectionStateListener> mainContainer = StandardListenerManager.standard();
private final StandardListenerManager<ConnectionStateListener> doNotProxyContainer = StandardListenerManager.standard();
private final CircuitBreakingConnectionStateListener masterListener;

CircuitBreakingManager(CuratorFramework client, CircuitBreaker circuitBreaker)
{
ConnectionStateListener masterStateChanged = (__, newState) -> mainContainer.forEach(listener -> listener.stateChanged(client, newState));
masterListener = new CircuitBreakingConnectionStateListener(client, masterStateChanged, circuitBreaker);
}

@Override
public void clear()
{
doNotProxyContainer.clear();
mainContainer.clear();
}

@Override
public int size()
{
return mainContainer.size() + doNotProxyContainer.size();
}

@Override
public void forEach(Consumer<ConnectionStateListener> function)
{
doNotProxyContainer.forEach(function);
function.accept(masterListener);
}

@Override
public void addListener(ConnectionStateListener listener)
{
if ( listener.doNotProxy() )
{
doNotProxyContainer.addListener(listener);
}
else
{
mainContainer.addListener(listener);
}
}

@Override
public void addListener(ConnectionStateListener listener, Executor executor)
{
if ( listener.doNotProxy() )
{
doNotProxyContainer.addListener(listener, executor);
}
else
{
mainContainer.addListener(listener, executor);
}
}

@Override
public void removeListener(ConnectionStateListener listener)
{
mainContainer.removeListener(listener);
doNotProxyContainer.removeListener(listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ public interface ConnectionStateListener
void stateChanged(CuratorFramework client, ConnectionState newState);

/**
* Normally, ConnectionStateListeners are decorated via the configured
* {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator}. For certain
* critical cases, however, this is not desired. If your listener returns <code>true</code>
* for doNotDecorate(), it will not be passed through the decorator.
* ConnectionStateListener managers set via {@link org.apache.curator.framework.CuratorFrameworkFactory.Builder#connectionStateListenerManagerFactory(ConnectionStateListenerManagerFactory)}
* are allowed to proxy (etc.) ConnectionStateListeners as needed. If this method returns <code>true</code>
* the ConnectionStateListener manager must <em>not</em> proxy the listener as it's a vital internal
* listener used by Curator.
*
* @return true/false
*/
default boolean doNotDecorate()
default boolean doNotProxy()
{
return false;
}
Expand Down

0 comments on commit c670844

Please sign in to comment.