Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pveentjer committed May 6, 2016
1 parent 3af7565 commit 0ad3376
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 67 deletions.
Expand Up @@ -35,9 +35,6 @@
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Connection;
import com.hazelcast.spi.properties.HazelcastProperty;
import com.hazelcast.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.util.concurrent.IdleStrategy;
import com.hazelcast.util.concurrent.NoOpIdleStrategy;

import java.io.IOException;
import java.util.Collection;
Expand All @@ -52,18 +49,12 @@
import static com.hazelcast.client.spi.properties.ClientProperty.MAX_CONCURRENT_INVOCATIONS;
import static com.hazelcast.instance.OutOfMemoryErrorDispatcher.onOutOfMemory;
import static com.hazelcast.spi.exception.TargetDisconnectedException.newTargetDisconnectedExceptionCausedByHeartBeat;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static com.hazelcast.spi.impl.operationservice.impl.AsyncResponseHandler.getIdleStrategy;


abstract class ClientInvocationServiceSupport implements ClientInvocationService {
public static final HazelcastProperty FAST
= new HazelcastProperty("hazelcast.client.responsequeue.fast", false);

private static final long IDLE_MAX_SPINS = 20;
private static final long IDLE_MAX_YIELDS = 50;
private static final long IDLE_MIN_PARK_NS = NANOSECONDS.toNanos(1);
private static final long IDLE_MAX_PARK_NS = MICROSECONDS.toNanos(100);
public static final HazelcastProperty IDLE_STRATEGY
= new HazelcastProperty("hazelcast.client.responsequeue.idlestrategy", "block");

private static final int WAIT_TIME_FOR_PACKETS_TO_BE_CONSUMED_THRESHOLD = 5000;
protected final HazelcastClientInstanceImpl client;
Expand Down Expand Up @@ -274,14 +265,11 @@ public ClientMessage getClientMessage() {
private class ResponseThread extends Thread {
private final BlockingQueue<ClientPacket> responseQueue;

public ResponseThread(ThreadGroup threadGroup, String name, ClassLoader classLoader) {
ResponseThread(ThreadGroup threadGroup, String name, ClassLoader classLoader) {
super(threadGroup, name);
setContextClassLoader(classLoader);

IdleStrategy idleStrategy = client.getProperties().getBoolean(FAST)
? new BackoffIdleStrategy(IDLE_MAX_SPINS, IDLE_MAX_YIELDS, IDLE_MIN_PARK_NS, IDLE_MAX_PARK_NS)
: new NoOpIdleStrategy();
this.responseQueue = new MPSCQueue<ClientPacket>(this, idleStrategy);
this.responseQueue = new MPSCQueue<ClientPacket>(this, getIdleStrategy(client.getProperties(), IDLE_STRATEGY));
}

@Override
Expand Down
Expand Up @@ -59,22 +59,21 @@ public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueu
* Creates a new {@link MPSCQueue} with the provided {@link IdleStrategy} and consumer thread.
*
* @param consumerThread the Thread that consumes the items.
* @param idleStrategy the idleStrategy
* @throws NullPointerException when idleStrategy or consumerThread is null.
* @param idleStrategy the idleStrategy. If null, this consumer will block if the queue is empty.
* @throws NullPointerException when consumerThread is null.
*/
public MPSCQueue(Thread consumerThread, IdleStrategy idleStrategy) {
this.consumerThread = checkNotNull(consumerThread, "consumerThread can't be null");
this.idleStrategy = checkNotNull(idleStrategy, "idleStrategy can't be null");
this.idleStrategy = idleStrategy;
}

/**
* Creates a new {@link MPSCQueue} with the provided {@link IdleStrategy}.
*
* @param idleStrategy the idleStrategy
* @throws NullPointerException when idleStrategy is null.
* @param idleStrategy the idleStrategy. If null, the consumer will block.
*/
public MPSCQueue(IdleStrategy idleStrategy) {
this.idleStrategy = checkNotNull(idleStrategy, "idleStrategy can't be null");
this.idleStrategy = idleStrategy;
}

/**
Expand Down Expand Up @@ -195,9 +194,8 @@ private void takeAll() throws InterruptedException {
Node currentPutStackHead = putStack.get();

if (currentPutStackHead == null) {
// first we try to idle;
if (!idleStrategy.idle(iteration)) {
// we don't need to block yet; so lets try again.
if (idleStrategy != null) {
idleStrategy.idle(iteration);
continue;
}

Expand Down
Expand Up @@ -36,7 +36,6 @@
import com.hazelcast.spi.impl.operationexecutor.OperationRunnerFactory;
import com.hazelcast.spi.impl.operationservice.impl.operations.Backup;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.util.concurrent.NoOpIdleStrategy;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import java.util.concurrent.ConcurrentLinkedQueue;
Expand All @@ -48,8 +47,6 @@
import static com.hazelcast.spi.properties.GroupProperty.PARTITION_OPERATION_THREAD_COUNT;
import static com.hazelcast.spi.properties.GroupProperty.PRIORITY_GENERIC_OPERATION_THREAD_COUNT;
import static com.hazelcast.util.Preconditions.checkNotNull;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

/**
Expand Down Expand Up @@ -77,7 +74,7 @@
@SuppressWarnings("checkstyle:methodcount")
public final class OperationExecutorImpl implements OperationExecutor, MetricsProvider {

public static final int TERMINATION_TIMEOUT_SECONDS = 3;
private static final int TERMINATION_TIMEOUT_SECONDS = 3;

private final ILogger logger;

Expand Down Expand Up @@ -132,7 +129,6 @@ private OperationRunner[] initGenericOperationRunners(HazelcastProperties proper
threadCount = Math.max(2, coreSize / 2);
}


OperationRunner[] operationRunners = new OperationRunner[threadCount + priorityThreadCount];
for (int partitionId = 0; partitionId < operationRunners.length; partitionId++) {
operationRunners[partitionId] = runnerFactory.createGenericRunner();
Expand All @@ -154,9 +150,9 @@ private PartitionOperationThread[] initPartitionThreads(HazelcastProperties prop
PartitionOperationThread[] threads = new PartitionOperationThread[threadCount];
for (int threadId = 0; threadId < threads.length; threadId++) {
String threadName = threadGroup.getThreadPoolNamePrefix("partition-operation") + threadId;
MPSCQueue<Object> normalQueue = new MPSCQueue<Object>(new NoOpIdleStrategy());
OperationQueue operationQueue = new DefaultOperationQueue(
normalQueue, new ConcurrentLinkedQueue<Object>());
// the normalQueue will be a blocking queue. We don't want to idle, because there are many operation threads.
MPSCQueue<Object> normalQueue = new MPSCQueue<Object>(null);
OperationQueue operationQueue = new DefaultOperationQueue(normalQueue, new ConcurrentLinkedQueue<Object>());

PartitionOperationThread partitionThread = new PartitionOperationThread(threadName, threadId, operationQueue, logger,
threadGroup, nodeExtension, partitionOperationRunners);
Expand Down
Expand Up @@ -30,7 +30,6 @@
import com.hazelcast.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.util.concurrent.BusySpinIdleStrategy;
import com.hazelcast.util.concurrent.IdleStrategy;
import com.hazelcast.util.concurrent.NoOpIdleStrategy;

import java.util.concurrent.BlockingQueue;

Expand All @@ -55,7 +54,7 @@
* {@link com.hazelcast.spi.impl.operationservice.impl.responses.Response} and let the invocation-future
* deal with the response can be rather expensive currently.
*/
class AsyncResponseHandler implements PacketHandler, MetricsProvider {
public class AsyncResponseHandler implements PacketHandler, MetricsProvider {

public static final HazelcastProperty IDLE_STRATEGY
= new HazelcastProperty("hazelcast.operation.responsequeue.idlestrategy", "block");
Expand Down Expand Up @@ -101,6 +100,19 @@ public void shutdown() {
responseThread.shutdown();
}

public static IdleStrategy getIdleStrategy(HazelcastProperties properties, HazelcastProperty property) {
String idleStrategyString = properties.getString(property);
if ("block".equals(idleStrategyString)) {
return null;
} else if ("backoff".equals(idleStrategyString)) {
return new BackoffIdleStrategy(IDLE_MAX_SPINS, IDLE_MAX_YIELDS, IDLE_MIN_PARK_NS, IDLE_MAX_PARK_NS);
} else if ("busyspin".equals(idleStrategyString)) {
return new BusySpinIdleStrategy();
} else {
throw new IllegalStateException("Unrecognized " + property.getName() + " value=" + idleStrategyString);
}
}

/**
* The ResponseThread needs to implement the OperationHostileThread interface to make sure that the OperationExecutor
* is not going to schedule any operations on this task due to retry.
Expand All @@ -117,23 +129,7 @@ private ResponseThread(HazelcastThreadGroup threadGroup,
super(threadGroup.getInternalThreadGroup(), threadGroup.getThreadNamePrefix("response"));
setContextClassLoader(threadGroup.getClassLoader());
this.responsePacketHandler = responsePacketHandler;

this.responseQueue = new MPSCQueue<Packet>(this, getIdleStrategy(properties));
}

private IdleStrategy getIdleStrategy(HazelcastProperties properties) {
String idleStrategyString = properties.getString(IDLE_STRATEGY);
IdleStrategy idleStrategy;
if ("blocking".equals(idleStrategyString)) {
idleStrategy = new NoOpIdleStrategy();
} else if ("backoff".equals(idleStrategyString)) {
idleStrategy = new BackoffIdleStrategy(IDLE_MAX_SPINS, IDLE_MAX_YIELDS, IDLE_MIN_PARK_NS, IDLE_MAX_PARK_NS);
} else if ("busyspin".equals(idleStrategyString)) {
idleStrategy = new BusySpinIdleStrategy();
} else {
throw new IllegalStateException("Unrecognized " + IDLE_STRATEGY.getName() + " value=" + idleStrategyString);
}
return idleStrategy;
this.responseQueue = new MPSCQueue<Packet>(this, getIdleStrategy(properties, IDLE_STRATEGY));
}

@Override
Expand Down
Expand Up @@ -71,16 +71,15 @@ public BackoffIdleStrategy(long maxSpins, long maxYields, long minParkPeriodNs,
}
final long parkTime = parkTime(n);
LockSupport.parkNanos(parkTime);
//return parkTime == maxParkPeriodNs;
return false;
return parkTime == maxParkPeriodNs;
}

long parkTime(long n) {
final long proposedShift = n - parkThreshold;
final long allowedShift = min(maxShift, proposedShift);
return proposedShift > maxShift ? maxParkPeriodNs
: proposedShift < maxShift ? minParkPeriodNs << allowedShift
: min(minParkPeriodNs << allowedShift, maxParkPeriodNs);
: proposedShift < maxShift ? minParkPeriodNs << allowedShift
: min(minParkPeriodNs << allowedShift, maxParkPeriodNs);
}
}

Expand Up @@ -34,6 +34,6 @@ public boolean idle(final long n) {
} else {
dummyCounter = dummyValue;
}
return false;
return true;
}
}
Expand Up @@ -5,6 +5,7 @@
import com.hazelcast.test.TestThread;
import com.hazelcast.test.annotation.NightlyTest;
import com.hazelcast.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.util.concurrent.IdleStrategy;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
Expand All @@ -22,25 +23,42 @@
@Category(NightlyTest.class)
public class MPSCQueueStressTest extends HazelcastTestSupport {

private static final long DURATION_SECONDS = MINUTES.toSeconds(2);

private final AtomicBoolean stop = new AtomicBoolean();

@Test
public void test_singleProducer() throws Exception {
test(1);
public void test_singleProducer_block() throws Exception {
test(1, null);
}

@Test
public void test_twoProducers_block() throws Exception {
test(2, null);
}

@Test
public void test_multipleProducers_block() throws Exception {
test(10, null);
}

@Test
public void test_singleProducer_backoff() throws Exception {
test(1, new BackoffIdleStrategy(100, 1000, 1000, MILLISECONDS.toNanos(1)));
}

@Test
public void test_twoProducers() throws Exception {
test(2);
public void test_twoProducers_backoff() throws Exception {
test(2, new BackoffIdleStrategy(100, 1000, 1000, MILLISECONDS.toNanos(1)));
}

@Test
public void test_multipleProducers() throws Exception {
test(10);
public void test_multipleProducers_backoff() throws Exception {
test(10, new BackoffIdleStrategy(100, 1000, 1000, MILLISECONDS.toNanos(1)));
}

public void test(int producerCount) throws Exception {
MPSCQueue<Item> queue = new MPSCQueue<Item>(new BackoffIdleStrategy(100, 1000, 1000, MILLISECONDS.toNanos(1)));
public void test(int producerCount, IdleStrategy idleStrategy) throws Exception {
MPSCQueue<Item> queue = new MPSCQueue<Item>(idleStrategy);
ConsumerThread consumers = new ConsumerThread(queue, producerCount);
queue.setConsumerThread(consumers);
consumers.start();
Expand All @@ -51,7 +69,7 @@ public void test(int producerCount) throws Exception {
producer.start();
producers.add(producer);
}
sleepAndStop(stop, MINUTES.toSeconds(1));
sleepAndStop(stop, DURATION_SECONDS);

long totalProduced = 0;
for (ProducerThread producer : producers) {
Expand Down

0 comments on commit 0ad3376

Please sign in to comment.