From 066ff40e2e99fadffb1a95be72c934369e3c1011 Mon Sep 17 00:00:00 2001 From: randgalt Date: Sun, 28 Jul 2019 01:01:13 -0500 Subject: [PATCH] CURATOR-533 CURATOR-505 introduced circuit breaking behavior via CircuitBreakingConnectionStateListener and ConnectionStateListenerDecorator. Elastic has been using it to success but reports that the implementation can be improved. The existing implementation uses a new CircuitBreaker for each ConnectionStateListener set in a Curator client. It turns out that this is not ideal. Instead, a shared CircuitBreaker should be used per Curator client. Unfortunately, the best way to do this is to remove the ConnectionStateListenerDecorator semantics and use a different mechanism. This Issue proposes to do this and remove ConnectionStateListenerDecorator. This is a breaking change but given the short amount of time it's been in Curator it's unlikely that it's been widely adopted. In this commit, ConnectionStateListenerDecorator is removed in favor of ConnectionStateListenerManagerFactory. ConnectionStateManager uses this factory to create the container to hold registered ConnectionStateListeners. A new CircuitBreakerManager now manages the circuit breaking behavior using a shared CircuitBreaker. --- .../framework/CuratorFrameworkFactory.java | 20 ++-- .../framework/imps/CuratorFrameworkImpl.java | 4 +- .../framework/imps/EnsembleTracker.java | 2 +- .../framework/listen/ListenerManager.java | 5 + .../listen/StandardListenerManager.java | 2 +- .../listen/UnaryListenerManager.java | 26 +++++ .../framework/state/CircuitBreaker.java | 20 +++- ...ircuitBreakingConnectionStateListener.java | 37 +++++-- .../state/CircuitBreakingManager.java | 91 +++++++++++++++ .../state/ConnectionStateListener.java | 10 +- .../ConnectionStateListenerDecorator.java | 81 -------------- ...ConnectionStateListenerManagerFactory.java | 69 ++++++++++++ .../state/ConnectionStateManager.java | 14 +-- .../framework/state/TestCircuitBreaker.java | 6 +- .../framework/recipes/leader/LeaderLatch.java | 3 +- .../recipes/leader/TestLeaderLatch.java | 104 ++++++++++++++---- src/site/confluence/utilities.confluence | 16 +-- 17 files changed, 357 insertions(+), 153 deletions(-) create mode 100644 curator-framework/src/main/java/org/apache/curator/framework/listen/UnaryListenerManager.java create mode 100644 curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingManager.java delete mode 100644 curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java create mode 100644 curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerManagerFactory.java diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java index a5c08ff8da..887a2aa96b 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java @@ -36,7 +36,7 @@ import org.apache.curator.framework.schema.SchemaSet; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateErrorPolicy; -import org.apache.curator.framework.state.ConnectionStateListenerDecorator; +import org.apache.curator.framework.state.ConnectionStateListenerManagerFactory; import org.apache.curator.framework.state.StandardConnectionStateErrorPolicy; import org.apache.curator.utils.DefaultZookeeperFactory; import org.apache.curator.utils.ZookeeperFactory; @@ -153,7 +153,7 @@ public static class Builder private boolean zk34CompatibilityMode = isZK34(); private int waitForShutdownTimeoutMs = 0; private Executor runSafeService = null; - private ConnectionStateListenerDecorator connectionStateListenerDecorator = ConnectionStateListenerDecorator.standard; + private ConnectionStateListenerManagerFactory connectionStateListenerManagerFactory = ConnectionStateListenerManagerFactory.standard; /** * Apply the current values and build a new CuratorFramework @@ -498,18 +498,16 @@ public Builder runSafeService(Executor runSafeService) } /** - * Sets the connection state listener decorator. For example, - * you can set {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}s - * via this mechanism by using {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator#circuitBreaking(org.apache.curator.RetryPolicy)} - * or {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator#circuitBreaking(org.apache.curator.RetryPolicy, java.util.concurrent.ScheduledExecutorService)} + * Sets the connection state listener manager factory. For example, + * you can set {@link org.apache.curator.framework.state.ConnectionStateListenerManagerFactory#circuitBreaking(org.apache.curator.RetryPolicy)} * - * @param connectionStateListenerDecorator decorator to use + * @param connectionStateListenerManagerFactory manager factory to use * @return this * @since 4.2.0 */ - public Builder connectionStateListenerDecorator(ConnectionStateListenerDecorator connectionStateListenerDecorator) + public Builder connectionStateListenerManagerFactory(ConnectionStateListenerManagerFactory connectionStateListenerManagerFactory) { - this.connectionStateListenerDecorator = Objects.requireNonNull(connectionStateListenerDecorator, "connectionStateListenerFactory cannot be null"); + this.connectionStateListenerManagerFactory = Objects.requireNonNull(connectionStateListenerManagerFactory, "connectionStateListenerManagerFactory cannot be null"); return this; } @@ -660,9 +658,9 @@ public boolean canBeReadOnly() return canBeReadOnly; } - public ConnectionStateListenerDecorator getConnectionStateListenerDecorator() + public ConnectionStateListenerManagerFactory getConnectionStateListenerManagerFactory() { - return connectionStateListenerDecorator; + return connectionStateListenerManagerFactory; } private Builder() diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index d9c34240fe..e003bf03c2 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -139,7 +139,7 @@ public void process(WatchedEvent watchedEvent) namespace = new NamespaceImpl(this, builder.getNamespace()); threadFactory = getThreadFactory(builder); maxCloseWaitMs = builder.getMaxCloseWaitMs(); - connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerDecorator()); + connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerManagerFactory()); compressionProvider = builder.getCompressionProvider(); aclProvider = builder.getAclProvider(); state = new AtomicReference(CuratorFrameworkState.LATENT); @@ -327,7 +327,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) } @Override - public boolean doNotDecorate() + public boolean doNotProxy() { return true; } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java index 8ca63f6f5d..b2c55f6937 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java @@ -74,7 +74,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) } @Override - public boolean doNotDecorate() + public boolean doNotProxy() { return true; } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java index cab04264e4..85bc8f9093 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java @@ -41,4 +41,9 @@ public interface ListenerManager extends Listenable * @param function function to call for each listener */ void forEach(Consumer function); + + default boolean isEmpty() + { + return size() == 0; + } } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java index e07fe47ba1..8213967336 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java @@ -26,7 +26,7 @@ /** * Non mapping version of a listener container */ -public class StandardListenerManager implements ListenerManager +public class StandardListenerManager implements UnaryListenerManager { private final ListenerManager container; diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/UnaryListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/UnaryListenerManager.java new file mode 100644 index 0000000000..54497f4a5e --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/UnaryListenerManager.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.listen; + +/** + * A {@link ListenerManager} that doesn't do any mapping + */ +public interface UnaryListenerManager extends ListenerManager +{ +} diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java index 03e44f8121..c207128d42 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java @@ -20,12 +20,12 @@ import org.apache.curator.RetryPolicy; import org.apache.curator.RetrySleeper; +import org.apache.curator.utils.ThreadUtils; import java.time.Duration; import java.util.Objects; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -// must be guarded by sync class CircuitBreaker { private final RetryPolicy retryPolicy; @@ -35,12 +35,18 @@ class CircuitBreaker private int retryCount = 0; private long startNanos = 0; - CircuitBreaker(RetryPolicy retryPolicy, ScheduledExecutorService service) + static CircuitBreaker build(RetryPolicy retryPolicy) { - this.retryPolicy = Objects.requireNonNull(retryPolicy, "retryPolicy cannot be null"); - this.service = Objects.requireNonNull(service, "service cannot be null"); + return new CircuitBreaker(retryPolicy, ThreadUtils.newSingleThreadScheduledExecutor("CircuitBreakingConnectionStateListener")); + } + + static CircuitBreaker build(RetryPolicy retryPolicy, ScheduledExecutorService service) + { + return new CircuitBreaker(retryPolicy, service); } + // IMPORTANT - all methods below MUST be guarded by synchronization + boolean isOpen() { return isOpen; @@ -96,4 +102,10 @@ boolean close() startNanos = 0; return wasOpen; } + + private CircuitBreaker(RetryPolicy retryPolicy, ScheduledExecutorService service) + { + this.retryPolicy = Objects.requireNonNull(retryPolicy, "retryPolicy cannot be null"); + this.service = Objects.requireNonNull(service, "service cannot be null"); + } } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java index 24eba01818..beec66fd25 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java @@ -20,7 +20,6 @@ import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.utils.ThreadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Objects; @@ -28,14 +27,14 @@ /** *

- * A decorator/proxy for connection state listeners that adds circuit breaking behavior. During network + * A proxy for connection state listeners that adds circuit breaking behavior. During network * outages ZooKeeper can become very noisy sending connection/disconnection events in rapid succession. * Curator recipes respond to these messages by resetting state, etc. E.g. LeaderLatch must delete * its lock node and try to recreate it in order to try to re-obtain leadership, etc. *

* *

- * This noisy herding can be avoided by using the circuit breaking listener decorator. When it + * This noisy herding can be avoided by using the circuit breaking listener. When it * receives {@link org.apache.curator.framework.state.ConnectionState#SUSPENDED}, the circuit * becomes "open" (based on the provided {@link org.apache.curator.RetryPolicy}) and will ignore * future connection state changes until RetryPolicy timeout has elapsed. Note: however, if the connection @@ -44,10 +43,10 @@ *

* *

- * When the circuit decorator is closed, all connection state changes are forwarded to the managed + * When the circuit is closed, all connection state changes are forwarded to the managed * listener. When the first disconnected state is received, the circuit becomes open. The state change * that caused the circuit to open is sent to the managed listener and the RetryPolicy will be used to - * get a delay amount. While the delay is active, the decorator will store state changes but will not + * get a delay amount. While the delay is active, the circuit breaker will store state changes but will not * forward them to the managed listener (except, however, the first time the state changes from SUSPENDED to LOST). * When the delay elapses, if the connection has been restored, the circuit closes and forwards the * new state to the managed listener. If the connection has not been restored, the RetryPolicy is checked @@ -55,6 +54,23 @@ * RetryPolicy indicates that retries are exhausted then the circuit closes - if the current state * is different than the state that caused the circuit to open it is forwarded to the managed listener. *

+ * + *

+ * NOTE: You should not use this listener directly. Instead, set {@link org.apache.curator.framework.state.ConnectionStateListenerManagerFactory#circuitBreaking(org.apache.curator.RetryPolicy)} + * in the {@link org.apache.curator.framework.CuratorFrameworkFactory.Builder#connectionStateListenerManagerFactory(ConnectionStateListenerManagerFactory)}. + *

+ * + *

+ * E.g. + *

+ * ConnectionStateListenerManagerFactory factory = ConnectionStateListenerManagerFactory.circuitBreaking(...retry policy for circuit breaking...);
+ * CuratorFramework client = CuratorFrameworkFactory.builder()
+ *     .connectionStateListenerManagerFactory(factory)
+ *     ... etc ...
+ *     .build();
+ * // all connection state listeners set for "client" will get circuit breaking behavior
+ * 
+ *

*/ public class CircuitBreakingConnectionStateListener implements ConnectionStateListener { @@ -77,7 +93,7 @@ public class CircuitBreakingConnectionStateListener implements ConnectionStateLi */ public CircuitBreakingConnectionStateListener(CuratorFramework client, ConnectionStateListener listener, RetryPolicy retryPolicy) { - this(client, listener, retryPolicy, ThreadUtils.newSingleThreadScheduledExecutor("CircuitBreakingConnectionStateListener")); + this(client, listener, CircuitBreaker.build(retryPolicy)); } /** @@ -88,9 +104,14 @@ public CircuitBreakingConnectionStateListener(CuratorFramework client, Connectio */ public CircuitBreakingConnectionStateListener(CuratorFramework client, ConnectionStateListener listener, RetryPolicy retryPolicy, ScheduledExecutorService service) { - this.client = client; + this(client, listener, CircuitBreaker.build(retryPolicy, service)); + } + + CircuitBreakingConnectionStateListener(CuratorFramework client, ConnectionStateListener listener, CircuitBreaker circuitBreaker) + { + this.client = Objects.requireNonNull(client, "client cannot be null"); this.listener = Objects.requireNonNull(listener, "listener cannot be null"); - circuitBreaker = new CircuitBreaker(retryPolicy, service); + this.circuitBreaker = Objects.requireNonNull(circuitBreaker, "circuitBreaker cannot be null"); reset(); } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingManager.java new file mode 100644 index 0000000000..0d9f7ebf8e --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingManager.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.state; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.listen.StandardListenerManager; +import org.apache.curator.framework.listen.UnaryListenerManager; +import java.util.concurrent.Executor; +import java.util.function.Consumer; + +class CircuitBreakingManager implements UnaryListenerManager +{ + private final StandardListenerManager mainContainer = StandardListenerManager.standard(); + private final StandardListenerManager doNotProxyContainer = StandardListenerManager.standard(); + private final CircuitBreakingConnectionStateListener masterListener; + + CircuitBreakingManager(CuratorFramework client, CircuitBreaker circuitBreaker) + { + ConnectionStateListener masterStateChanged = (__, newState) -> mainContainer.forEach(listener -> listener.stateChanged(client, newState)); + masterListener = new CircuitBreakingConnectionStateListener(client, masterStateChanged, circuitBreaker); + } + + @Override + public void clear() + { + doNotProxyContainer.clear(); + mainContainer.clear(); + } + + @Override + public int size() + { + return mainContainer.size() + doNotProxyContainer.size(); + } + + @Override + public void forEach(Consumer function) + { + doNotProxyContainer.forEach(function); + function.accept(masterListener); + } + + @Override + public void addListener(ConnectionStateListener listener) + { + if ( listener.doNotProxy() ) + { + doNotProxyContainer.addListener(listener); + } + else + { + mainContainer.addListener(listener); + } + } + + @Override + public void addListener(ConnectionStateListener listener, Executor executor) + { + if ( listener.doNotProxy() ) + { + doNotProxyContainer.addListener(listener, executor); + } + else + { + mainContainer.addListener(listener, executor); + } + } + + @Override + public void removeListener(ConnectionStateListener listener) + { + mainContainer.removeListener(listener); + doNotProxyContainer.removeListener(listener); + } +} diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListener.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListener.java index 71635d0622..d626e3f5bf 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListener.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListener.java @@ -31,14 +31,14 @@ public interface ConnectionStateListener void stateChanged(CuratorFramework client, ConnectionState newState); /** - * Normally, ConnectionStateListeners are decorated via the configured - * {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator}. For certain - * critical cases, however, this is not desired. If your listener returns true - * for doNotDecorate(), it will not be passed through the decorator. + * ConnectionStateListener managers set via {@link org.apache.curator.framework.CuratorFrameworkFactory.Builder#connectionStateListenerManagerFactory(ConnectionStateListenerManagerFactory)} + * are allowed to proxy (etc.) ConnectionStateListeners as needed. If this method returns true + * the ConnectionStateListener manager must not proxy the listener as it's a vital internal + * listener used by Curator. * * @return true/false */ - default boolean doNotDecorate() + default boolean doNotProxy() { return false; } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java deleted file mode 100644 index b95c4b3b3e..0000000000 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.framework.state; - -import org.apache.curator.RetryPolicy; -import org.apache.curator.framework.CuratorFramework; -import java.util.concurrent.ScheduledExecutorService; - -/** - *

- * Allows for the enhancement of the {@link org.apache.curator.framework.state.ConnectionStateListener} instances - * used with Curator. Client code that sets a ConnectionStateListener should always wrap it using the configured - * ConnectionStateListenerDecorator. All Curator recipes do this. - *

- * - *

- * E.g. - * - *

- * CuratorFramework client ...
- * ConnectionStateListener listener = ...
- * ConnectionStateListener decoratedListener = client.decorateConnectionStateListener(listener);
- *
- * ...
- *
- * client.getConnectionStateListenable().addListener(decoratedListener);
- *
- * // later, to remove...
- * client.getConnectionStateListenable().removeListener(decoratedListener);
- * 
- *

- */ -@FunctionalInterface -public interface ConnectionStateListenerDecorator -{ - ConnectionStateListener decorateListener(CuratorFramework client, ConnectionStateListener actual); - - /** - * Pass through - does no decoration - */ - ConnectionStateListenerDecorator standard = (__, actual) -> actual; - - /** - * Decorates the listener with circuit breaking behavior using {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener} - * - * @param retryPolicy the circuit breaking policy to use - * @return new decorator - */ - static ConnectionStateListenerDecorator circuitBreaking(RetryPolicy retryPolicy) - { - return (client, actual) -> new CircuitBreakingConnectionStateListener(client, actual, retryPolicy); - } - - /** - * Decorates the listener with circuit breaking behavior using {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener} - * - * @param retryPolicy the circuit breaking policy to use - * @param service the scheduler to use - * @return new decorator - */ - static ConnectionStateListenerDecorator circuitBreaking(RetryPolicy retryPolicy, ScheduledExecutorService service) - { - return (client, actual) -> new CircuitBreakingConnectionStateListener(client, actual, retryPolicy, service); - } -} diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerManagerFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerManagerFactory.java new file mode 100644 index 0000000000..8c6497a784 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerManagerFactory.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.state; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.listen.StandardListenerManager; +import org.apache.curator.framework.listen.UnaryListenerManager; +import java.util.concurrent.ScheduledExecutorService; + +@FunctionalInterface +public interface ConnectionStateListenerManagerFactory +{ + /** + * Create a new listener manager + * + * @param client curator client + * @return manager + */ + UnaryListenerManager newManager(CuratorFramework client); + + /** + * Pass through + */ + ConnectionStateListenerManagerFactory standard = (__) -> StandardListenerManager.standard(); + + /** + * Listeners set in this manager receive circuit breaking behavior using {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener} + * as a master listener that proxies to any listener registered by client code (unless the listener returns true + * for {@link ConnectionStateListener#doNotProxy()}). + * + * @param retryPolicy the circuit breaking policy to use + * @return new listener manager factory + */ + static ConnectionStateListenerManagerFactory circuitBreaking(RetryPolicy retryPolicy) + { + return client -> new CircuitBreakingManager(client, CircuitBreaker.build(retryPolicy)); + } + + /** + * Listeners set in this manager receive circuit breaking behavior using {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener} + * as a master listener that proxies to any listener registered by client code (unless the listener returns true + * for {@link ConnectionStateListener#doNotProxy()}). + * + * @param retryPolicy the circuit breaking policy to use + * @param service the scheduler to use + * @return new listener manager factory + */ + static ConnectionStateListenerManagerFactory circuitBreaking(RetryPolicy retryPolicy, ScheduledExecutorService service) + { + return client -> new CircuitBreakingManager(client, CircuitBreaker.build(retryPolicy, service)); + } +} diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index 55e17c85ec..46325d20ed 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -22,7 +22,7 @@ import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.listen.Listenable; -import org.apache.curator.framework.listen.StandardListenerManager; +import org.apache.curator.framework.listen.UnaryListenerManager; import org.apache.curator.utils.Compatibility; import org.apache.curator.utils.ThreadUtils; import org.slf4j.Logger; @@ -71,7 +71,7 @@ public class ConnectionStateManager implements Closeable private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false); private final ExecutorService service; private final AtomicReference state = new AtomicReference(State.LATENT); - private final StandardListenerManager listeners; + private final UnaryListenerManager listeners; // guarded by sync private ConnectionState currentConnectionState; @@ -93,7 +93,7 @@ private enum State */ public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent) { - this(client, threadFactory, sessionTimeoutMs, sessionExpirationPercent, ConnectionStateListenerDecorator.standard); + this(client, threadFactory, sessionTimeoutMs, sessionExpirationPercent, ConnectionStateListenerManagerFactory.standard); } /** @@ -101,9 +101,9 @@ public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFacto * @param threadFactory thread factory to use or null for a default * @param sessionTimeoutMs the ZK session timeout in milliseconds * @param sessionExpirationPercent percentage of negotiated session timeout to use when simulating a session timeout. 0 means don't simulate at all - * @param connectionStateListenerDecorator the decorator to use + * @param managerFactory manager factory to use */ - public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent, ConnectionStateListenerDecorator connectionStateListenerDecorator) + public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent, ConnectionStateListenerManagerFactory managerFactory) { this.client = client; this.sessionTimeoutMs = sessionTimeoutMs; @@ -113,7 +113,7 @@ public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFacto threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager"); } service = Executors.newSingleThreadExecutor(threadFactory); - listeners = StandardListenerManager.mappingStandard(listener -> listener.doNotDecorate() ? listener : connectionStateListenerDecorator.decorateListener(client, listener)); + listeners = managerFactory.newManager(client); } /** @@ -272,7 +272,7 @@ private void processEvents() final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS); if ( newState != null ) { - if ( listeners.size() == 0 ) + if ( listeners.isEmpty() ) { log.warn("There are no ConnectionStateListeners registered."); } diff --git a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java index f4096cbccb..bee917ec94 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java @@ -58,7 +58,7 @@ public void testBasic() final int retryQty = 1; final Duration delay = Duration.ofSeconds(10); - CircuitBreaker circuitBreaker = new CircuitBreaker(new RetryNTimes(retryQty, (int)delay.toMillis()), service); + CircuitBreaker circuitBreaker = CircuitBreaker.build(new RetryNTimes(retryQty, (int)delay.toMillis()), service); AtomicInteger counter = new AtomicInteger(0); Assert.assertTrue(circuitBreaker.tryToOpen(counter::incrementAndGet)); @@ -79,7 +79,7 @@ public void testBasic() @Test public void testVariousOpenRetryFails() { - CircuitBreaker circuitBreaker = new CircuitBreaker(new RetryForever(1), service); + CircuitBreaker circuitBreaker = CircuitBreaker.build(new RetryForever(1), service); Assert.assertFalse(circuitBreaker.tryToRetry(() -> {})); Assert.assertTrue(circuitBreaker.tryToOpen(() -> {})); Assert.assertFalse(circuitBreaker.tryToOpen(() -> {})); @@ -91,7 +91,7 @@ public void testVariousOpenRetryFails() public void testWithRetryUntilElapsed() { RetryPolicy retryPolicy = new RetryUntilElapsed(10000, 10000); - CircuitBreaker circuitBreaker = new CircuitBreaker(retryPolicy, service); + CircuitBreaker circuitBreaker = CircuitBreaker.build(retryPolicy, service); Assert.assertTrue(circuitBreaker.tryToOpen(() -> {})); Assert.assertEquals(lastDelay[0], Duration.ofMillis(10000)); } diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index 7107efac02..a0b2187374 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -622,7 +622,8 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null)); } - private void handleStateChange(ConnectionState newState) + @VisibleForTesting + protected void handleStateChange(ConnectionState newState) { switch ( newState ) { diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 9717302713..3d9e9b7a20 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -28,7 +28,7 @@ import org.apache.curator.framework.imps.TestCleanState; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.curator.framework.state.ConnectionStateListenerDecorator; +import org.apache.curator.framework.state.ConnectionStateListenerManagerFactory; import org.apache.curator.framework.state.SessionConnectionStateErrorPolicy; import org.apache.curator.framework.state.StandardConnectionStateErrorPolicy; import org.apache.curator.retry.RetryForever; @@ -43,6 +43,7 @@ import org.testng.annotations.Test; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; @@ -50,53 +51,114 @@ import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; public class TestLeaderLatch extends BaseClassForTests { private static final String PATH_NAME = "/one/two/me"; private static final int MAX_LOOPS = 5; + private static class Holder + { + final BlockingQueue stateChanges = new LinkedBlockingQueue<>(); + final CountDownLatch isLockedLatch = new CountDownLatch(1); + volatile LeaderLatch latch; + } + @Test public void testWithCircuitBreaker() throws Exception { + final int threadQty = 5; + + ExecutorService executorService = Executors.newFixedThreadPool(threadQty); + List holders = Collections.emptyList(); Timing2 timing = new Timing2(); - ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(new RetryForever(timing.multiple(2).milliseconds())); - try ( CuratorFramework client = CuratorFrameworkFactory.builder() + ConnectionStateListenerManagerFactory managerFactory = ConnectionStateListenerManagerFactory.circuitBreaking(new RetryForever(timing.multiple(2).milliseconds())); + CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(server.getConnectString()) .retryPolicy(new RetryOneTime(1)) - .connectionStateListenerDecorator(decorator) + .connectionStateListenerManagerFactory(managerFactory) .connectionTimeoutMs(timing.connection()) .sessionTimeoutMs(timing.session()) - .build() ) - { + .build(); + try { client.start(); - AtomicInteger resetCount = new AtomicInteger(0); - try ( LeaderLatch latch = new LeaderLatch(client, "/foo/bar") + client.create().forPath("/hey"); + + Semaphore lostSemaphore = new Semaphore(0); + ConnectionStateListener unProxiedListener = new ConnectionStateListener() { @Override - void reset() throws Exception + public void stateChanged(CuratorFramework client, ConnectionState newState) { - resetCount.incrementAndGet(); - super.reset(); + if ( newState == ConnectionState.LOST ) + { + lostSemaphore.release(); + } } - } ) - { - latch.start(); - Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); - for ( int i = 0; i < 5; ++i ) + @Override + public boolean doNotProxy() { - server.stop(); - server.restart(); - timing.sleepABit(); + return true; } - Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); - Assert.assertEquals(resetCount.get(), 2); + }; + client.getConnectionStateListenable().addListener(unProxiedListener); + + holders = IntStream.range(0, threadQty) + .mapToObj(index -> { + Holder holder = new Holder(); + holder.latch = new LeaderLatch(client, "/foo/bar/" + index) + { + @Override + protected void handleStateChange(ConnectionState newState) + { + holder.stateChanges.offer(newState); + super.handleStateChange(newState); + } + }; + return holder; + }) + .collect(Collectors.toList()); + + holders.forEach(holder -> { + executorService.submit(() -> { + holder.latch.start(); + Assert.assertTrue(holder.latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); + holder.isLockedLatch.countDown(); + return null; + }); + timing.awaitLatch(holder.isLockedLatch); + }); + + for ( int i = 0; i < 4; ++i ) // note: 4 is just a random number of loops to simulate disconnections + { + server.stop(); + Assert.assertTrue(timing.acquireSemaphore(lostSemaphore)); + server.restart(); + timing.sleepABit(); } + + for ( Holder holder : holders ) + { + Assert.assertTrue(holder.latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); + Assert.assertEquals(timing.takeFromQueue(holder.stateChanges), ConnectionState.SUSPENDED); + Assert.assertEquals(timing.takeFromQueue(holder.stateChanges), ConnectionState.LOST); + Assert.assertEquals(timing.takeFromQueue(holder.stateChanges), ConnectionState.RECONNECTED); + } + } + finally + { + holders.forEach(holder -> CloseableUtils.closeQuietly(holder.latch)); + CloseableUtils.closeQuietly(client); + executorService.shutdownNow(); } } diff --git a/src/site/confluence/utilities.confluence b/src/site/confluence/utilities.confluence index 720d8d94ab..2bd7ac1252 100644 --- a/src/site/confluence/utilities.confluence +++ b/src/site/confluence/utilities.confluence @@ -19,13 +19,13 @@ h2. Circuit Breaking ConnectionStateListener During network outages ZooKeeper can become very noisy sending connection/disconnection events in rapid succession. Curator recipes respond to these messages by resetting state, etc. E.g. LeaderLatch must delete its lock node and try to recreate it in order to try to re\-obtain leadership, etc. -This noisy herding can be avoided by using the circuit breaking listener decorator. When it receives ConnectionState.SUSPENDED, the circuit becomes "open" +This noisy herding can be avoided by using the circuit breaking listener. When it receives ConnectionState.SUSPENDED, the circuit becomes "open" (based on the provided RetryPolicy) and will ignore future connection state changes until RetryPolicy timeout has elapsed. Note: however, if the connection goes from ConnectionState.SUSPENDED to ConnectionState.LOST the first LOST state is sent. -When the circuit decorator is closed, all connection state changes are forwarded to the managed listener. When the first disconnected state is received, the +When the circuit is closed, all connection state changes are forwarded to the managed listener. When the first disconnected state is received, the circuit becomes open. The state change that caused the circuit to open is sent to the managed listener and the RetryPolicy will be used to get a delay amount. -While the delay is active, the decorator will store state changes but will not forward them to the managed listener (except, however, the first time the state +While the delay is active, the circuit breaker will store state changes but will not forward them to the managed listener (except, however, the first time the state changes from SUSPENDED to LOST). When the delay elapses, if the connection has been restored, the circuit closes and forwards the new state to the managed listener. If the connection has not been restored, the RetryPolicy is checked again. If the RetryPolicy indicates another retry is allowed the process repeats. If, however, the RetryPolicy indicates that retries are exhausted then the circuit closes \- if the current state is different than the state that caused the circuit to open it is @@ -34,12 +34,12 @@ forwarded to the managed listener. You can enable the Circuit Breaking ConnectionStateListener during creation of your CuratorFramework instance. E.g. {code} -ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(...); +ConnectionStateListenerManagerFactory factory = ConnectionStateListenerManagerFactory.circuitBreaking(...retry policy for circuit breaking...); CuratorFramework client = CuratorFrameworkFactory.builder() - ... - .connectionStateListenerDecorator(decorator) - ... - .build(); + .connectionStateListenerManagerFactory(factory) + ... etc ... + .build(); +// all connection state listeners set for "client" will get circuit breaking behavior {code} h2. Locker