From 0675e09a5d6122bff6b6543d80076b03d05d5f85 Mon Sep 17 00:00:00 2001 From: Brian Lininger Date: Thu, 16 Nov 2017 09:58:13 -0800 Subject: [PATCH 1/6] ZOOKEEPER-2849: Added BackoffStrategy interface with ExponentialBackoffStrategy as a concrete implementation. Enhanced QuorumCnxManager to utilize the provided BackoffStrategy to determine retry of port binding as opposed to a fixed interval/count. --- .../server/quorum/BackoffStrategy.java | 19 ++ .../quorum/ExponentialBackoffStrategy.java | 192 ++++++++++++++++++ .../server/quorum/QuorumCnxManager.java | 27 ++- .../zookeeper/server/quorum/QuorumPeer.java | 3 +- .../ExponentialBackoffStrategyTest.java | 163 +++++++++++++++ .../quorum/FLEBackwardElectionRoundTest.java | 4 +- .../server/quorum/FLELostMessageTest.java | 2 +- .../apache/zookeeper/test/CnxManagerTest.java | 13 +- .../zookeeper/test/FLEPredicateTest.java | 3 +- 9 files changed, 406 insertions(+), 20 deletions(-) create mode 100644 src/java/main/org/apache/zookeeper/server/quorum/BackoffStrategy.java create mode 100644 src/java/main/org/apache/zookeeper/server/quorum/ExponentialBackoffStrategy.java create mode 100644 src/java/test/org/apache/zookeeper/server/quorum/ExponentialBackoffStrategyTest.java diff --git a/src/java/main/org/apache/zookeeper/server/quorum/BackoffStrategy.java b/src/java/main/org/apache/zookeeper/server/quorum/BackoffStrategy.java new file mode 100644 index 00000000000..886f56327d4 --- /dev/null +++ b/src/java/main/org/apache/zookeeper/server/quorum/BackoffStrategy.java @@ -0,0 +1,19 @@ +package org.apache.zookeeper.server.quorum; + +public interface BackoffStrategy { + + long STOP = -1L; + + /** + * Get the number of milliseconds to wait before retrying the operation, + * or {@code BackoffStrategy.STOP} if no more retries should be made. + * @return the number of milliseconds to wait before retrying the operation. + * @throws IllegalStateException + */ + long nextWaitMillis() throws IllegalStateException; + + /** + * Reset to it's initial state. + */ + void reset(); +} diff --git a/src/java/main/org/apache/zookeeper/server/quorum/ExponentialBackoffStrategy.java b/src/java/main/org/apache/zookeeper/server/quorum/ExponentialBackoffStrategy.java new file mode 100644 index 00000000000..3aa307faba1 --- /dev/null +++ b/src/java/main/org/apache/zookeeper/server/quorum/ExponentialBackoffStrategy.java @@ -0,0 +1,192 @@ +package org.apache.zookeeper.server.quorum; + +/** + * A {@link BackoffStrategy} that increases the wait time between each + * interval up to the configured maximum wait time. + */ +public class ExponentialBackoffStrategy implements BackoffStrategy { + + // Sensible default values to use if not set by the user + private static final long DEFAULT_INITIAL_BACKOFF_MILLIS = 500L; // 0.5s + private static final long DEFAULT_MAX_BACKOFF_MILLIS = 30_000L; // 30s + private static final long DEFAULT_MAX_ELAPSED_MILLIS = 5 * 60_000L; // 10m + private static final double DEFAULT_BACKOFF_MULTIPLE = 1.5; + + // internal values per instance + private final long initialBackoffMillis; + private final long maxBackoffMillis; + private final long maxElapsedMillis; + private final double backoffMultiple; + + // internal state + private long nextWait; + private long totalElapsed; + private final boolean limitBackoffMillis; + private final boolean checkElapsedTime; + + /** + * Construct a new instance. + * @param builder the Builder to use for configuring this BackoffStrategy + */ + private ExponentialBackoffStrategy(Builder builder) { + this.initialBackoffMillis = builder.initialBackoffMillis; + this.maxBackoffMillis = builder.maxBackoffMillis; + this.maxElapsedMillis = builder.maxElapsedMillis; + this.backoffMultiple = builder.backoffMultiple; + + if(maxBackoffMillis == -1) { + limitBackoffMillis = false; + } else { + limitBackoffMillis = true; + } + + if(maxElapsedMillis == -1) { + checkElapsedTime = false; + } else { + checkElapsedTime = true; + } + + reset(); + } + + + @Override + public long nextWaitMillis() throws IllegalStateException { + // check if we have exceeded the allowed maximum elapsed time + if(checkElapsedTime && totalElapsed > maxElapsedMillis) { + return BackoffStrategy.STOP; + } + + long waitMillis = nextWait; + + // calculate the next wait milliseconds + nextWait = Math.round(nextWait * backoffMultiple); + + // don't exceed the allowed maximum wait milliseconds + // if a maximum was configured + if(limitBackoffMillis && nextWait > maxBackoffMillis) { + nextWait = maxBackoffMillis; + } + + // track total elapsed time, even if we don't wait we have to assume + // that some amount of time passed outside of the wait or we'll never + // hit the elapsed time limit + totalElapsed += waitMillis != 0 ? waitMillis : 1L; + return waitMillis; + } + + @Override + public void reset() { + nextWait = this.initialBackoffMillis; + totalElapsed = 0; + } + + /** + * + * @return a new {@link Builder} instance. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for instances of {@link ExponentialBackoffStrategy}. + */ + public static final class Builder { + private long initialBackoffMillis = DEFAULT_INITIAL_BACKOFF_MILLIS; + private long maxBackoffMillis = DEFAULT_MAX_BACKOFF_MILLIS; + private long maxElapsedMillis = DEFAULT_MAX_ELAPSED_MILLIS; + private double backoffMultiple = DEFAULT_BACKOFF_MULTIPLE; + + /** + * Set the initial number of milliseconds to wait. Valid values are + * 0 to Long.MAX_VALUE, 0 resulting in no wait time in the initial + * interval. + * @param milliseconds the initial number of milliseconds to wait + * @return this Builder + */ + public Builder setInitialBackoff(long milliseconds) { + this.initialBackoffMillis = milliseconds; + return this; + } + + /** + * Set the maximum number of wait milliseconds that can be generated. + * Valid values are -1 to Long.MAX_VALUE, 0 resulting in 0 + * milliseconds in each wait interval (no wait) & -1 resulting in no + * limit to the number of milliseconds in each wait interval. + * @param milliseconds the maximum number of wait milliseconds + * @return this Builder + */ + public Builder setMaxBackoff(long milliseconds) { + this.maxBackoffMillis = milliseconds; + return this; + } + + /** + * Set the total number of milliseconds that can be provided to wait. + * Valid values are -1 to Long.MAX_VALUE. 0 would result in + * {@link BackoffStrategy#STOP} returned on the first call to + * {@link ExponentialBackoffStrategy#nextWaitMillis()}. -1 would + * result in no maximum being applied to the BackoffStrategy. + * @param milliseconds the maximum elapsed milliseconds this BackoffStrategy + * can provide + * @return this Builder + */ + public Builder setMaxElapsed(long milliseconds) { + this.maxElapsedMillis = milliseconds; + return this; + } + + /** + * Set the multiple applied to the previous backoff milliseconds + * value to get to the next backoff milliseconds value. Valid values + * must be greater than 0.0 and less than Double.MAX_VALUE. + * @param multiple the backoff multiple + * @return this Builder + */ + public Builder setBackoffMultiplier(double multiple) { + this.backoffMultiple = multiple; + return this; + } + + /** + * Construct a new {@link ExponentialBackoffStrategy} instance using this + * Builder's configuration. + * @return a new ExponentialBackoffStrategy instance + */ + public ExponentialBackoffStrategy build() { + // Valid Range: 0 to Long.MAX_VALUE, 0 meaning no initial wait + validateInclusiveBetween(0L, Long.MAX_VALUE, initialBackoffMillis); + // Valid Range: 0 to Long.MAX_VALUE, 0 meaning no limit + validateInclusiveBetween(-1L, Long.MAX_VALUE, maxBackoffMillis); + // Valid Range: 0 to Long.MAX_VALUE, 0 meaning no limit + validateInclusiveBetween(-1L, Long.MAX_VALUE, maxElapsedMillis); + // Valid Range: 1.0001 to Double.MAX_VALUE + validateExclusiveBetween(0.0, Double.MAX_VALUE, backoffMultiple); + + return new ExponentialBackoffStrategy(this); + } + + private void validateInclusiveBetween(long min, long max, long value) { + if(value < min) { + throw new IllegalArgumentException(); + } + + if(value > max) { + throw new IllegalArgumentException(); + } + } + + private void validateExclusiveBetween(double min, double max, double + value) { + if(value <= min) { + throw new IllegalArgumentException(); + } + + if(value >= max) { + throw new IllegalArgumentException(); + } + } + } +} diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java index c8f73a3b31e..01f4726af75 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java @@ -217,12 +217,12 @@ static public InitialMessage parse(Long protocolVersion, DataInputStream din) } } - public QuorumCnxManager(QuorumPeer self) { + public QuorumCnxManager(QuorumPeer self, BackoffStrategy backoffStrategy) { this.recvQueue = new ArrayBlockingQueue(RECV_CAPACITY); this.queueSendMap = new ConcurrentHashMap>(); this.senderWorkerMap = new ConcurrentHashMap(); this.lastMessageSent = new ConcurrentHashMap(); - + String cnxToValue = System.getProperty("zookeeper.cnxTimeout"); if(cnxToValue != null){ this.cnxTO = Integer.parseInt(cnxToValue); @@ -231,7 +231,7 @@ public QuorumCnxManager(QuorumPeer self) { this.self = self; // Starts listener thread that waits for connection requests - listener = new Listener(); + listener = new Listener(backoffStrategy); listener.setName("QuorumPeerListener"); } @@ -607,12 +607,18 @@ public QuorumPeer getQuorumPeer() { */ public class Listener extends ZooKeeperThread { + /* + * Retry BackoffStrategy + */ + private final BackoffStrategy backoffStrategy; + volatile ServerSocket ss = null; - public Listener() { + public Listener(BackoffStrategy backoffStrategy) { // During startup of thread, thread name will be overridden to // specific election address super("ListenerThread"); + this.backoffStrategy = backoffStrategy; } /** @@ -620,10 +626,9 @@ public Listener() { */ @Override public void run() { - int numRetries = 0; InetSocketAddress addr; Socket client = null; - while((!shutdown) && (numRetries < 3)){ + while(!shutdown){ try { ss = new ServerSocket(); ss.setReuseAddress(true); @@ -639,6 +644,7 @@ public void run() { LOG.info("My election bind port: " + addr.toString()); setName(addr.toString()); ss.bind(addr); + backoffStrategy.reset(); while (!shutdown) { try { client = ss.accept(); @@ -646,7 +652,6 @@ public void run() { LOG.info("Received connection request " + client.getRemoteSocketAddress()); receiveConnection(client); - numRetries = 0; } catch (SocketTimeoutException e) { LOG.warn("The socket is listening for the election accepted " + "and it timed out unexpectedly, but will retry." @@ -658,10 +663,14 @@ public void run() { break; } LOG.error("Exception while listening", e); - numRetries++; try { ss.close(); - Thread.sleep(1000); + long waitTime = backoffStrategy.nextWaitMillis(); + if(waitTime == BackoffStrategy.STOP) { + break; + } else if (waitTime > 0) { + Thread.sleep(waitTime); + } } catch (IOException ie) { LOG.error("Error closing server socket", ie); } catch (InterruptedException ie) { diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java index 01e594723f1..3721c57e41b 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -946,7 +946,8 @@ protected Election createElectionAlgorithm(int electionAlgorithm){ le = new AuthFastLeaderElection(this, true); break; case 3: - qcm = new QuorumCnxManager(this); + qcm = new QuorumCnxManager(this, + ExponentialBackoffStrategy.builder().build()); QuorumCnxManager.Listener listener = qcm.listener; if(listener != null){ listener.start(); diff --git a/src/java/test/org/apache/zookeeper/server/quorum/ExponentialBackoffStrategyTest.java b/src/java/test/org/apache/zookeeper/server/quorum/ExponentialBackoffStrategyTest.java new file mode 100644 index 00000000000..fe601e7c71d --- /dev/null +++ b/src/java/test/org/apache/zookeeper/server/quorum/ExponentialBackoffStrategyTest.java @@ -0,0 +1,163 @@ +package org.apache.zookeeper.server.quorum; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +/** + * Unit tests for {@link ExponentialBackoffStrategy}. + */ +public class ExponentialBackoffStrategyTest { + + // Input validation tests + @Test(expected = IllegalArgumentException.class) + public void initialBackoffRangeMin() { + ExponentialBackoffStrategy.builder() + .setInitialBackoff(-1L) + .build(); + } + + @Test(expected = IllegalArgumentException.class) + public void initialBackoffRangeMax() { + ExponentialBackoffStrategy.builder() + .setInitialBackoff(Long.MAX_VALUE + 1L) + .build(); + } + + @Test(expected = IllegalArgumentException.class) + public void maxBackoffRangeMin() { + ExponentialBackoffStrategy.builder() + .setMaxBackoff(-2L) + .build(); + } + + @Test(expected = IllegalArgumentException.class) + public void maxBackoffRangeMax() { + ExponentialBackoffStrategy.builder() + .setMaxBackoff(Long.MAX_VALUE + 1L) + .build(); + } + + @Test(expected = IllegalArgumentException.class) + public void maxElapsedRangeMin() { + ExponentialBackoffStrategy.builder() + .setMaxElapsed(-2L) + .build(); + } + + @Test(expected = IllegalArgumentException.class) + public void maxElapsedRangeMax() { + ExponentialBackoffStrategy.builder() + .setMaxElapsed(Long.MIN_VALUE) + .build(); + } + + @Test(expected = IllegalArgumentException.class) + public void multipleRangeMin() { + ExponentialBackoffStrategy.builder() + .setBackoffMultiplier(-0.00000000001) + .build(); + } + + @Test(expected = IllegalArgumentException.class) + public void multipleRangeMax() { + ExponentialBackoffStrategy.builder() + .setBackoffMultiplier(Double.MAX_VALUE + 0.1) + .build(); + } + + // Test the generated intervals are what we expect + @Test + public void generateWaitIntervals() { + final ExponentialBackoffStrategy strategy = ExponentialBackoffStrategy + .builder() + .setInitialBackoff(10L) + .setMaxBackoff(50L) + .setMaxElapsed(80L) + .setBackoffMultiplier(1.5) + .build(); + + assertEquals(10L, strategy.nextWaitMillis()); + assertEquals(15L, strategy.nextWaitMillis()); + assertEquals(23L, strategy.nextWaitMillis()); + assertEquals(35L, strategy.nextWaitMillis()); + assertEquals(BackoffStrategy.STOP, strategy.nextWaitMillis()); + } + + + @Test + public void exponentialDecreasingBackoff() { + final ExponentialBackoffStrategy strategy = + ExponentialBackoffStrategy.builder() + .setInitialBackoff(100L) + .setMaxBackoff(50L) + .setBackoffMultiplier(0.6) + .setMaxElapsed(1_000L) + .build(); + + + assertEquals(100L, strategy.nextWaitMillis()); + assertEquals(50L, strategy.nextWaitMillis()); + assertEquals(30L, strategy.nextWaitMillis()); + assertEquals(18L, strategy.nextWaitMillis()); + assertEquals(11L, strategy.nextWaitMillis()); + assertEquals(7L, strategy.nextWaitMillis()); + assertEquals(4L, strategy.nextWaitMillis()); + assertEquals(2L, strategy.nextWaitMillis()); + assertEquals(1L, strategy.nextWaitMillis()); + // total elapsed so far is 223, so check that we get 777 more + // intervals of 1ms then STOP + for (int i = 0; i <= 777; i++) { + assertEquals(1L, strategy.nextWaitMillis()); + } + assertEquals(BackoffStrategy.STOP, strategy.nextWaitMillis()); + } + + + @Test + public void generateWaitIntervalsNoWaits() { + final ExponentialBackoffStrategy strategy = ExponentialBackoffStrategy + .builder() + .setInitialBackoff(0L) + .setMaxBackoff(0L) + .setMaxElapsed(100L) + .setBackoffMultiplier(10.0) + .build(); + + for (int i = 0; i <= 100; i++) { + assertEquals(0L, strategy.nextWaitMillis()); + } + assertEquals(BackoffStrategy.STOP, strategy.nextWaitMillis()); + } + + @Test + public void generateWaitIntervalsNoLimits() { + final ExponentialBackoffStrategy strategy = ExponentialBackoffStrategy + .builder() + .setInitialBackoff(1L) + .setMaxBackoff(-1L) + .setMaxElapsed(-1L) + .setBackoffMultiplier(2.5) + .build(); + + assertEquals(1L, strategy.nextWaitMillis()); + assertEquals(3L, strategy.nextWaitMillis()); + assertEquals(8L, strategy.nextWaitMillis()); + assertEquals(20L, strategy.nextWaitMillis()); + assertEquals(50L, strategy.nextWaitMillis()); + assertEquals(125L, strategy.nextWaitMillis()); + assertEquals(313L, strategy.nextWaitMillis()); + assertEquals(783L, strategy.nextWaitMillis()); + + long previousWait = 783L; + for (int i = 0; i < 10_000; i++) { + long currentWait = strategy.nextWaitMillis(); + assertNotEquals(BackoffStrategy.STOP, currentWait); + assertTrue("Expected " + previousWait + " to be greater than " + + currentWait, previousWait <= currentWait); + previousWait = currentWait; + } + } +} diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java index ddb7f9104cf..09b0839bbd1 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java @@ -114,7 +114,7 @@ public void testBackwardElectionRound() throws Exception { * Start mock server 1 */ QuorumPeer mockPeer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2); - cnxManagers[0] = new QuorumCnxManager(mockPeer); + cnxManagers[0] = new QuorumCnxManager(mockPeer, ExponentialBackoffStrategy.builder().build()); cnxManagers[0].listener.start(); cnxManagers[0].toSend(0l, initialMsg); @@ -123,7 +123,7 @@ public void testBackwardElectionRound() throws Exception { * Start mock server 2 */ mockPeer = new QuorumPeer(peers, tmpdir[2], tmpdir[2], port[2], 3, 2, 1000, 2, 2); - cnxManagers[1] = new QuorumCnxManager(mockPeer); + cnxManagers[1] = new QuorumCnxManager(mockPeer, ExponentialBackoffStrategy.builder().build()); cnxManagers[1].listener.start(); cnxManagers[1].toSend(0l, initialMsg); diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java index cc44243fdb0..902fc35c239 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java @@ -95,7 +95,7 @@ public void testLostMessage() throws Exception { void mockServer() throws InterruptedException, IOException { QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2); - cnxManager = new QuorumCnxManager(peer); + cnxManager = new QuorumCnxManager(peer, ExponentialBackoffStrategy.builder().build()); cnxManager.listener.start(); cnxManager.toSend(1l, FLETestUtils.createMsg(ServerState.LOOKING.ordinal(), 0, 0, 0)); diff --git a/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java b/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java index b9d92124b5f..6e9e66b353f 100644 --- a/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java +++ b/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java @@ -35,6 +35,7 @@ import java.net.Socket; import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.server.quorum.ExponentialBackoffStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.zookeeper.PortAssignment; @@ -110,7 +111,7 @@ class CnxManagerThread extends Thread { public void run(){ try { QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[0], peerTmpdir[0], peerClientPort[0], 3, 0, 1000, 2, 2); - QuorumCnxManager cnxManager = new QuorumCnxManager(peer); + QuorumCnxManager cnxManager = new QuorumCnxManager(peer, ExponentialBackoffStrategy.builder().build()); QuorumCnxManager.Listener listener = cnxManager.listener; if(listener != null){ listener.start(); @@ -154,7 +155,7 @@ public void testCnxManager() throws Exception { thread.start(); QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2); - QuorumCnxManager cnxManager = new QuorumCnxManager(peer); + QuorumCnxManager cnxManager = new QuorumCnxManager(peer, ExponentialBackoffStrategy.builder().build()); QuorumCnxManager.Listener listener = cnxManager.listener; if(listener != null){ listener.start(); @@ -201,7 +202,7 @@ public void testCnxManagerTimeout() throws Exception { peerTmpdir[2] = ClientBase.createTmpDir(); QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2); - QuorumCnxManager cnxManager = new QuorumCnxManager(peer); + QuorumCnxManager cnxManager = new QuorumCnxManager(peer, ExponentialBackoffStrategy.builder().build()); QuorumCnxManager.Listener listener = cnxManager.listener; if(listener != null){ listener.start(); @@ -229,7 +230,7 @@ public void testCnxManagerTimeout() throws Exception { @Test public void testCnxManagerSpinLock() throws Exception { QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2); - QuorumCnxManager cnxManager = new QuorumCnxManager(peer); + QuorumCnxManager cnxManager = new QuorumCnxManager(peer, ExponentialBackoffStrategy.builder().build()); QuorumCnxManager.Listener listener = cnxManager.listener; if(listener != null){ listener.start(); @@ -294,7 +295,7 @@ public void testCnxManagerNPE() throws Exception { peers.get(2L).type = LearnerType.OBSERVER; QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2); - QuorumCnxManager cnxManager = new QuorumCnxManager(peer); + QuorumCnxManager cnxManager = new QuorumCnxManager(peer, ExponentialBackoffStrategy.builder().build()); QuorumCnxManager.Listener listener = cnxManager.listener; if (listener != null) { listener.start(); @@ -341,7 +342,7 @@ public void testCnxManagerNPE() throws Exception { @Test public void testSocketTimeout() throws Exception { QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 2000, 2, 2); - QuorumCnxManager cnxManager = new QuorumCnxManager(peer); + QuorumCnxManager cnxManager = new QuorumCnxManager(peer, ExponentialBackoffStrategy.builder().build()); QuorumCnxManager.Listener listener = cnxManager.listener; if(listener != null){ listener.start(); diff --git a/src/java/test/org/apache/zookeeper/test/FLEPredicateTest.java b/src/java/test/org/apache/zookeeper/test/FLEPredicateTest.java index a4244d89846..25c8dc09895 100644 --- a/src/java/test/org/apache/zookeeper/test/FLEPredicateTest.java +++ b/src/java/test/org/apache/zookeeper/test/FLEPredicateTest.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.util.HashMap; +import org.apache.zookeeper.server.quorum.ExponentialBackoffStrategy; import org.apache.zookeeper.server.quorum.FastLeaderElection; import org.apache.zookeeper.server.quorum.QuorumCnxManager; import org.apache.zookeeper.server.quorum.QuorumPeer; @@ -41,7 +42,7 @@ public class FLEPredicateTest extends ZKTestCase { class MockFLE extends FastLeaderElection { MockFLE(QuorumPeer peer){ - super(peer, new QuorumCnxManager(peer)); + super(peer, new QuorumCnxManager(peer, ExponentialBackoffStrategy.builder().build())); } boolean predicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch){ From 6fa8a3fcf5a5db0c7aebe8698c2b04cc33394ef2 Mon Sep 17 00:00:00 2001 From: Brian Lininger Date: Thu, 16 Nov 2017 13:55:21 -0800 Subject: [PATCH 2/6] ZOOKEEPER-2849 Added missing Apache license header --- .../server/quorum/BackoffStrategy.java | 22 +++++++++++++++++++ .../quorum/ExponentialBackoffStrategy.java | 17 ++++++++++++++ .../ExponentialBackoffStrategyTest.java | 17 ++++++++++++++ 3 files changed, 56 insertions(+) diff --git a/src/java/main/org/apache/zookeeper/server/quorum/BackoffStrategy.java b/src/java/main/org/apache/zookeeper/server/quorum/BackoffStrategy.java index 886f56327d4..733438732e2 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/BackoffStrategy.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/BackoffStrategy.java @@ -1,5 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zookeeper.server.quorum; +/** + * Interface that defines a strategy for backing off arbitrary retry attempts. Each successive call to + * {@link BackoffStrategy#nextWaitMillis()} returns the amount of time to wait or {@link BackoffStrategy#STOP} if no + * further waiting is supported. + */ public interface BackoffStrategy { long STOP = -1L; diff --git a/src/java/main/org/apache/zookeeper/server/quorum/ExponentialBackoffStrategy.java b/src/java/main/org/apache/zookeeper/server/quorum/ExponentialBackoffStrategy.java index 3aa307faba1..896b5a977fd 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/ExponentialBackoffStrategy.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/ExponentialBackoffStrategy.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zookeeper.server.quorum; /** diff --git a/src/java/test/org/apache/zookeeper/server/quorum/ExponentialBackoffStrategyTest.java b/src/java/test/org/apache/zookeeper/server/quorum/ExponentialBackoffStrategyTest.java index fe601e7c71d..e862385b154 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/ExponentialBackoffStrategyTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/ExponentialBackoffStrategyTest.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zookeeper.server.quorum; import static org.junit.Assert.assertEquals; From 8e9755db8d85cca5852d7454b4aaff1975b1e574 Mon Sep 17 00:00:00 2001 From: Brian Lininger Date: Thu, 16 Nov 2017 15:09:36 -0800 Subject: [PATCH 3/6] ZOOKEEPER-2849 Unit tests extend ZKTestCase --- .../server/quorum/ExponentialBackoffStrategyTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/java/test/org/apache/zookeeper/server/quorum/ExponentialBackoffStrategyTest.java b/src/java/test/org/apache/zookeeper/server/quorum/ExponentialBackoffStrategyTest.java index e862385b154..7410001844c 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/ExponentialBackoffStrategyTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/ExponentialBackoffStrategyTest.java @@ -21,12 +21,13 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import org.apache.zookeeper.ZKTestCase; import org.junit.Test; /** * Unit tests for {@link ExponentialBackoffStrategy}. */ -public class ExponentialBackoffStrategyTest { +public class ExponentialBackoffStrategyTest extends ZKTestCase { // Input validation tests @Test(expected = IllegalArgumentException.class) From 5fb15d235e8048509a945a8e5049e8e248900058 Mon Sep 17 00:00:00 2001 From: Brian Lininger Date: Fri, 17 Nov 2017 11:11:36 -0800 Subject: [PATCH 4/6] ZOOKEEPER-2849 Added support for FixedIntervalBackoffStrategy to maintain the current retry behavior. BackoffStrategy is now configurable via system properties, user can select the type of BackoffStrategy as well as configure it as desired. --- .../zookeeper/server/quorum/QuorumPeer.java | 86 ++++++++++++++++++- 1 file changed, 84 insertions(+), 2 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java index 3721c57e41b..713580da90b 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -946,8 +946,7 @@ protected Election createElectionAlgorithm(int electionAlgorithm){ le = new AuthFastLeaderElection(this, true); break; case 3: - qcm = new QuorumCnxManager(this, - ExponentialBackoffStrategy.builder().build()); + qcm = new QuorumCnxManager(this, buildBackoffStrategy()); QuorumCnxManager.Listener listener = qcm.listener; if(listener != null){ listener.start(); @@ -964,6 +963,89 @@ protected Election createElectionAlgorithm(int electionAlgorithm){ return le; } + + /** + * Support configuring the fixed interval backoff strategy via a system property. + */ + public static final String FIXED_INTERVAL_BACKOFF_INTERVALS = "zookeeper.quorum.fixedIntervalBackoff.intervalMillis"; + + /** + * Support configuring the fixed interval backoff strategy via a system property. + */ + public static final String FIXED_INTERVAL_BACKOFF_INTERVAL_MILLIS = "zookeeper.quorum.fixedIntervalBackoff.intervals"; + + /** + * Support setting the exponential backoff strategy via a system property. + */ + public static final String EXPONENTIAL_BACKOFF = "zookeeper.quorum.exponentialBackoff"; + + /** + * Support configuring the fixed interval backoff strategy via a system property. + */ + public static final String EXPONENTIAL_BACKOFF_INITIAL = "zookeeper.quorum.exponentialBackoff.initialBackoff"; + + /** + * Support configuring the fixed interval backoff strategy via a system property. + */ + public static final String EXPONENTIAL_BACKOFF_MULTIPLIER = "zookeeper.quorum.exponentialBackoff.multiplier"; + + /** + * Support configuring the fixed interval backoff strategy via a system property. + */ + public static final String EXPONENTIAL_BACKOFF_MAX_ELAPSED = "zookeeper.quorum.exponentialBackoff.maxElapsedMillis"; + + /** + * Support configuring the fixed interval backoff strategy via a system property. + */ + public static final String EXPONENTIAL_BACKOFF_MAX_BACKOFF = "zookeeper.quorum.exponentialBackoff.maxBackoffMillis"; + + /** + * Construct the {@link BackoffStrategy} to use. + * @return the constructed BackoffStrategy + */ + protected BackoffStrategy buildBackoffStrategy() { + + // if the user specifies the exponential back, configure and create it + if(Boolean.getBoolean(EXPONENTIAL_BACKOFF)) { + final ExponentialBackoffStrategy.Builder builder = ExponentialBackoffStrategy.builder(); + + if(Long.getLong(EXPONENTIAL_BACKOFF_INITIAL) != null) { + builder.setInitialBackoff(Long.getLong(EXPONENTIAL_BACKOFF_INITIAL)); + } + + if(Long.getLong(EXPONENTIAL_BACKOFF_MAX_ELAPSED) != null) { + builder.setMaxElapsed(Long.getLong(EXPONENTIAL_BACKOFF_MAX_ELAPSED)); + } + + if(Long.getLong(EXPONENTIAL_BACKOFF_MAX_BACKOFF) != null) { + builder.setMaxBackoff(Long.getLong(EXPONENTIAL_BACKOFF_MAX_BACKOFF)); + } + + final String multiplierProp = System.getProperty(EXPONENTIAL_BACKOFF_MULTIPLIER); + if(multiplierProp != null) { + try { + builder.setBackoffMultiplier(Double.valueOf(multiplierProp)); + } catch(NumberFormatException nfe) { + LOG.warn("{} is not a valid floating point value, ignoring it.", multiplierProp); + } + } + + return builder.build(); + } + + // default to FixedIntervalBackoffStrategy to maintain current behavior + final FixedIntervalBackoffStrategy.Builder builder = FixedIntervalBackoffStrategy.builder(); + if(Long.getLong(FIXED_INTERVAL_BACKOFF_INTERVAL_MILLIS) != null) { + builder.setIntervalMillis(Long.getLong(FIXED_INTERVAL_BACKOFF_INTERVAL_MILLIS)); + } + + if(Long.getLong(FIXED_INTERVAL_BACKOFF_INTERVALS) != null) { + builder.setIntervals(Long.getLong(FIXED_INTERVAL_BACKOFF_INTERVALS)); + } + + return builder.build(); + } + @SuppressWarnings("deprecation") protected Election makeLEStrategy(){ LOG.debug("Initializing leader election protocol..."); From 63721091ba05de4e78d473d4f1ae9cac5552c167 Mon Sep 17 00:00:00 2001 From: Brian Lininger Date: Fri, 17 Nov 2017 15:17:19 -0800 Subject: [PATCH 5/6] ZOOKEEPER-2849 Add FixedIntervalBackoffStrategy and unit test that got missed. --- .../quorum/FixedIntervalBackoffStrategy.java | 130 ++++++++++++++++++ .../FixedIntervalBackoffStrategyTest.java | 89 ++++++++++++ 2 files changed, 219 insertions(+) create mode 100644 src/java/main/org/apache/zookeeper/server/quorum/FixedIntervalBackoffStrategy.java create mode 100644 src/java/test/org/apache/zookeeper/server/quorum/FixedIntervalBackoffStrategyTest.java diff --git a/src/java/main/org/apache/zookeeper/server/quorum/FixedIntervalBackoffStrategy.java b/src/java/main/org/apache/zookeeper/server/quorum/FixedIntervalBackoffStrategy.java new file mode 100644 index 00000000000..db3a54536a1 --- /dev/null +++ b/src/java/main/org/apache/zookeeper/server/quorum/FixedIntervalBackoffStrategy.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server.quorum; + +/** + * {@link BackoffStrategy} that supports configurable fixed intervals, with either an unlimited or fixed number of + * intervals. + */ +public class FixedIntervalBackoffStrategy implements BackoffStrategy { + + // Defaults are the same as the current implementation + private static final long DEFAULT_INTERVAL_MILLIS = 1000L; // 1.0s + private static final long DEFAULT_INTERVAL_COUNT = 3L; // 3 + + private final long intervalMillis; + private final long maxIntervals; + + // internal state + private boolean unlimitedIntervals = false; + private long currentInterval; + + /** + * Construct a new instance. + * @param builder the Builder to use for configuring this BackoffStrategy + */ + private FixedIntervalBackoffStrategy(Builder builder) { + this.intervalMillis = builder.intervalMillis; + this.maxIntervals = builder.maxIntervals; + + if(maxIntervals == -1L) { + unlimitedIntervals = true; + } + + reset(); + } + + @Override + public long nextWaitMillis() throws IllegalStateException { + currentInterval++; + if(!unlimitedIntervals && currentInterval > maxIntervals) { + return BackoffStrategy.STOP; + } + + return intervalMillis; + } + + @Override + public void reset() { + currentInterval = 0; + } + + /** + * + * @return a new {@link Builder} instance. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for instances of {@link FixedIntervalBackoffStrategy}. + */ + public static final class Builder { + private long intervalMillis = DEFAULT_INTERVAL_MILLIS; + private long maxIntervals = DEFAULT_INTERVAL_COUNT; + + /** + * Set the number of milliseconds to wait between each interval. Valid values are 0 to Long.MAX_VALUE, 0 + * resulting in no wait time between intervals. + * + * @param milliseconds the number of milliseconds to wait + * @return this Builder + */ + public Builder setIntervalMillis(long milliseconds) { + this.intervalMillis = milliseconds; + return this; + } + + /** + * Set the number of intervals to wait. Valid values are -1 to Long.MAX_VALUE, -1 would + * result in no maximum being applied to the BackoffStrategy. + * + * @param intervals the number of intervals to check + * @return this Builder + */ + public Builder setIntervals(long intervals) { + this.maxIntervals = intervals; + return this; + } + + /** + * Construct a new {@link FixedIntervalBackoffStrategy} instance using this + * Builder's configuration. + * @return a new FixedIntervalBackoffStrategy instance + */ + public FixedIntervalBackoffStrategy build() { + // Valid Range: 0 to Long.MAX_VALUE, 0 meaning no initial wait + validateInclusiveBetween(0L, Long.MAX_VALUE, intervalMillis); + // Valid Range: 0 to Long.MAX_VALUE, 0 meaning no limit + validateInclusiveBetween(-1L, Long.MAX_VALUE, maxIntervals); + + return new FixedIntervalBackoffStrategy(this); + } + + private void validateInclusiveBetween(long min, long max, long value) { + if(value < min) { + throw new IllegalArgumentException(); + } + + if(value > max) { + throw new IllegalArgumentException(); + } + } + } +} diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FixedIntervalBackoffStrategyTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FixedIntervalBackoffStrategyTest.java new file mode 100644 index 00000000000..742ca9c497b --- /dev/null +++ b/src/java/test/org/apache/zookeeper/server/quorum/FixedIntervalBackoffStrategyTest.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server.quorum; + +import org.apache.zookeeper.ZKTestCase; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +/** + * Unit tests for {@link FixedIntervalBackoffStrategy}. + */ +public class FixedIntervalBackoffStrategyTest extends ZKTestCase { + + // Input validation tests + @Test(expected = IllegalArgumentException.class) + public void intervalMillisRangeMin() { + FixedIntervalBackoffStrategy.builder() + .setIntervalMillis(-1L) + .build(); + } + + @Test(expected = IllegalArgumentException.class) + public void intervalMillisRangeMax() { + FixedIntervalBackoffStrategy.builder() + .setIntervalMillis(Long.MAX_VALUE + 1L) + .build(); + } + + @Test(expected = IllegalArgumentException.class) + public void intervalsRangeMin() { + FixedIntervalBackoffStrategy.builder() + .setIntervals(-2L) + .build(); + } + + @Test(expected = IllegalArgumentException.class) + public void maxBackoffRangeMax() { + FixedIntervalBackoffStrategy.builder() + .setIntervals(Long.MAX_VALUE + 1L) + .build(); + } + + // Test the generated intervals are what we expect + @Test + public void generateWaitIntervals() { + final FixedIntervalBackoffStrategy strategy = FixedIntervalBackoffStrategy + .builder() + .setIntervalMillis(5000L) + .setIntervals(3) + .build(); + + assertEquals(5000L, strategy.nextWaitMillis()); + assertEquals(5000L, strategy.nextWaitMillis()); + assertEquals(5000L, strategy.nextWaitMillis()); + assertEquals(BackoffStrategy.STOP, strategy.nextWaitMillis()); + } + + @Test + public void generateWaitIntervalsNoLimits() { + final FixedIntervalBackoffStrategy strategy = FixedIntervalBackoffStrategy + .builder() + .setIntervalMillis(2000L) + .setIntervals(-1L) + .build(); + + for (int i = 0; i < 10_000; i++) { + long currentWait = strategy.nextWaitMillis(); + assertEquals(2000L, strategy.nextWaitMillis()); + } + } +} From 2fb5ced3dccccd039c7cf4926c84ae928813c01c Mon Sep 17 00:00:00 2001 From: Brian Lininger Date: Tue, 21 Nov 2017 09:56:21 -0800 Subject: [PATCH 6/6] ZOOKEEPER-2849 Add support for external implementations of BackoffStrategy. --- .../zookeeper/server/quorum/QuorumPeer.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java index 713580da90b..3580527f13d 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -963,6 +963,10 @@ protected Election createElectionAlgorithm(int electionAlgorithm){ return le; } + /** + * Support setting the exponential backoff strategy via a system property. + */ + public static final String BACKOFF_STRATEGY = "zookeeper.quorum.backoffStrategy"; /** * Support configuring the fixed interval backoff strategy via a system property. @@ -1005,6 +1009,20 @@ protected Election createElectionAlgorithm(int electionAlgorithm){ */ protected BackoffStrategy buildBackoffStrategy() { + // check for external BackoffStrategy + final String externalBackoffStrategy = System.getProperty(BACKOFF_STRATEGY); + if(externalBackoffStrategy != null) { + try { + return (BackoffStrategy) Class.forName(externalBackoffStrategy).newInstance(); + } catch(ClassNotFoundException cnfe) { + LOG.warn("Unable to load BackoffStrategy {}, default to Zookeeper internal BackoffStrategy", + externalBackoffStrategy); + } catch(InstantiationException | IllegalAccessException ex) { + LOG.warn("Unable to construct BackoffStrategy {}, default to Zookeeper internal BackoffStrategy", + externalBackoffStrategy); + } + } + // if the user specifies the exponential back, configure and create it if(Boolean.getBoolean(EXPONENTIAL_BACKOFF)) { final ExponentialBackoffStrategy.Builder builder = ExponentialBackoffStrategy.builder();