forked from dustin/java-memcached-client
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
SPY-172: Wakeup the selector if idle.
Motivation ---------- Waking the selector up from time to time and providing implementations a chance to run certain checks helps to improve robustness in certain situations. Modifications ------------- The wakeup time is configurable through a system property, but is low impact even if set to a smaller value. If the added queue is empty (which means the selector has been woken up but no op has been added) a custom method is called where implementations can run custom code like idle polls. Result ------ Better handling in idle situations. Change-Id: I43ea722b8a4fc28be4f997674ea85f73f2c66a50 Reviewed-on: http://review.couchbase.org/37725 Reviewed-by: Michael Nitschinger <michael.nitschinger@couchbase.com> Tested-by: Michael Nitschinger <michael.nitschinger@couchbase.com>
- Loading branch information
Showing
2 changed files
with
112 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -92,6 +92,11 @@ public class MemcachedConnection extends SpyThread { | |
*/ | ||
private static final int EXCESSIVE_EMPTY = 0x1000000; | ||
|
||
/** | ||
* The default wakeup delay if not overriden by a system property. | ||
*/ | ||
private static final int DEFAULT_WAKEUP_DELAY = 1000; | ||
|
||
/** | ||
* If an operation gets cloned more than this ceiling, cancel it for | ||
* safety reasons. | ||
|
@@ -230,6 +235,11 @@ public class MemcachedConnection extends SpyThread { | |
*/ | ||
protected final MetricType metricType; | ||
|
||
/** | ||
* The selector wakeup delay, defaults to 1000ms. | ||
*/ | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
daschl
Author
|
||
private final int wakeupDelay; | ||
|
||
/** | ||
* Construct a {@link MemcachedConnection}. | ||
* | ||
|
@@ -266,6 +276,9 @@ public MemcachedConnection(final int bufSize, final ConnectionFactory f, | |
verifyAliveOnConnect = false; | ||
} | ||
|
||
wakeupDelay = Integer.parseInt( System.getProperty("net.spy.wakeupDelay", | ||
Integer.toString(DEFAULT_WAKEUP_DELAY))); | ||
|
||
List<MemcachedNode> connections = createConnections(a); | ||
locator = f.createLocator(connections); | ||
|
||
|
@@ -396,7 +409,7 @@ public void handleIO() throws IOException { | |
handleInputQueue(); | ||
getLogger().debug("Done dealing with queue."); | ||
|
||
long delay = 0; | ||
long delay = 1000; | ||
if (!reconnectQueue.isEmpty()) { | ||
long now = System.currentTimeMillis(); | ||
long then = reconnectQueue.firstKey(); | ||
|
@@ -405,9 +418,12 @@ public void handleIO() throws IOException { | |
getLogger().debug("Selecting with delay of %sms", delay); | ||
assert selectorsMakeSense() : "Selectors don't make sense."; | ||
int selected = selector.select(delay); | ||
//Set<SelectionKey> selectedKeys = selector.selectedKeys(); | ||
|
||
if (selector.selectedKeys().isEmpty() && !shutDown) { | ||
if (shutDown) { | ||
return; | ||
} else if (selected == 0 && addedQueue.isEmpty()) { | ||
handleWokenUpSelector(); | ||
} else if (selector.selectedKeys().isEmpty()) { | ||
handleEmptySelects(); | ||
} else { | ||
getLogger().debug("Selected %d, selected %d keys", selected, | ||
|
@@ -425,6 +441,23 @@ public void handleIO() throws IOException { | |
handleOperationalTasks(); | ||
} | ||
|
||
/** | ||
* Helper method which gets called if the selector is woken up because of the | ||
* timeout setting, if has been interrupted or if happens during regular | ||
* write operation phases. | ||
* | ||
* <p>This method can be overriden by child implementations to handle custom | ||
* behavior on a manually woken selector, like sending pings through the | ||
* channels to make sure they are alive.</p> | ||
* | ||
* <p>Note that there is no guarantee that this method is at all or in the | ||
* regular interval called, so all overriding implementations need to take | ||
* that into account. Also, it needs to take into account that it may be | ||
* called very often under heavy workloads, so it should not perform extensive | ||
* tasks in the same thread.</p> | ||
*/ | ||
protected void handleWokenUpSelector() { } | ||
|
||
/** | ||
* Helper method for {@link #handleIO()} to encapsulate everything that | ||
* needs to be checked on a regular basis that has nothing to do directly | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
/** | ||
* Copyright (C) 2006-2009 Dustin Sallings | ||
* Copyright (C) 2009-2014 Couchbase, Inc. | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to deal | ||
* in the Software without restriction, including without limitation the rights | ||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
* copies of the Software, and to permit persons to whom the Software is | ||
* furnished to do so, subject to the following conditions: | ||
* | ||
* The above copyright notice and this permission notice shall be included in | ||
* all copies or substantial portions of the Software. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING | ||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALING | ||
* IN THE SOFTWARE. | ||
*/ | ||
package net.spy.memcached; | ||
|
||
import net.spy.memcached.protocol.binary.BinaryOperationFactory; | ||
import org.junit.Test; | ||
|
||
import java.io.IOException; | ||
import java.net.InetSocketAddress; | ||
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static org.junit.Assert.assertTrue; | ||
|
||
/** | ||
* Verifies the functionality of the {@link MemcachedConnection} that the | ||
* selector gets woken up automatically if idle. | ||
*/ | ||
public class WokenUpOnIdleTest { | ||
|
||
@Test | ||
public void shouldWakeUpOnIdle() throws Exception { | ||
CountDownLatch latch = new CountDownLatch(3); | ||
MemcachedConnection connection = new InstrumentedConnection( | ||
latch, | ||
1024, | ||
new BinaryConnectionFactory(), | ||
Arrays.asList(new InetSocketAddress(11210)), | ||
Collections.<ConnectionObserver>emptyList(), | ||
FailureMode.Redistribute, | ||
new BinaryOperationFactory() | ||
); | ||
|
||
assertTrue(latch.await(5, TimeUnit.SECONDS)); | ||
} | ||
|
||
static class InstrumentedConnection extends MemcachedConnection { | ||
final CountDownLatch latch; | ||
InstrumentedConnection(CountDownLatch latch, int bufSize, ConnectionFactory f, | ||
List<InetSocketAddress> a, Collection<ConnectionObserver> obs, | ||
FailureMode fm, OperationFactory opfactory) throws IOException { | ||
super(bufSize, f, a, obs, fm, opfactory); | ||
this.latch = latch; | ||
} | ||
|
||
@Override | ||
protected void handleWokenUpSelector() { | ||
latch.countDown(); | ||
} | ||
} | ||
|
||
} |
@daschl I know this code is a little old, but I can't find code using wakeupDelay.