Skip to content

Commit

Permalink
ARTEMIS-1495 Fixing In Handler executor and added benchmark to measur…
Browse files Browse the repository at this point in the history
…e impact of changes
  • Loading branch information
clebertsuconic committed Nov 9, 2017
1 parent 0fadc68 commit 91db080
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.activemq.artemis.utils.actors;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
Expand All @@ -40,9 +42,15 @@ public void execute(Runnable command) {

/** It will wait the current execution (if there is one) to finish
* but will not complete any further executions */
default void shutdownNow() {
default List<Runnable> shutdownNow() {
return Collections.emptyList();
}


default void shutdown() {
}


/**
* This will verify if the executor is flushed with no wait (or very minimal wait if not the {@link org.apache.activemq.artemis.utils.actors.OrderedExecutor}
* @return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2005-2014 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.apache.activemq.artemis.utils.actors;

/**
* This abstract class will encapsulate
* ThreadLocals to determine when a class is a handler.
* This is because some functionality has to be avoided if inHandler().
*
*/
public abstract class HandlerBase {

//marker instance used to recognize if a thread is performing a packet handling
private static final Object DUMMY = Boolean.TRUE;

// this cannot be static as the Actor will be used within another executor. For that reason
// each instance will have its own ThreadLocal.
// ... a thread that has its thread-local map populated with DUMMY while performing a handler
private final ThreadLocal<Object> inHandler = new ThreadLocal<>();

protected void enter() {
assert inHandler.get() == null : "should be null";
inHandler.set(DUMMY);
}

public boolean inHandler() {
final Object dummy = inHandler.get();
return dummy != null;
}

protected void leave() {
assert inHandler.get() != null : "marker not set";
inHandler.set(null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,24 @@

package org.apache.activemq.artemis.utils.actors;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.LockSupport;

public abstract class ProcessorBase<T> {
import org.jboss.logging.Logger;

private static final int STATE_NOT_RUNNING = 0;
private static final int STATE_RUNNING = 1;
private static final int STATE_FORCED_SHUTDOWN = 2;
public abstract class ProcessorBase<T> extends HandlerBase {

private static final Logger logger = Logger.getLogger(ProcessorBase.class);

public static final int STATE_NOT_RUNNING = 0;
public static final int STATE_RUNNING = 1;
public static final int STATE_FORCED_SHUTDOWN = 2;

protected final Queue<T> tasks = new ConcurrentLinkedQueue<>();

Expand All @@ -41,6 +48,8 @@ public abstract class ProcessorBase<T> {

private volatile boolean requestedShutdown = false;

private volatile boolean started = true;

private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state");

private final class ExecutorTask implements Runnable {
Expand All @@ -50,19 +59,23 @@ public void run() {
do {
//if there is no thread active and is not already dead then we run
if (stateUpdater.compareAndSet(ProcessorBase.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
enter();
try {
T task = tasks.poll();
//while the queue is not empty we process in order
while (task != null) {
while (task != null && !requestedShutdown) {
//just drain the tasks if has been requested a shutdown to help the shutdown process
if (!requestedShutdown) {
doTask(task);
if (requestedShutdown) {
tasks.add(task);
break;
}
doTask(task);
task = tasks.poll();
}
} finally {
leave();
//set state back to not running.
stateUpdater.set(ProcessorBase.this, STATE_NOT_RUNNING);
stateUpdater.compareAndSet(ProcessorBase.this, STATE_RUNNING, STATE_NOT_RUNNING);
}
} else {
return;
Expand All @@ -75,31 +88,57 @@ public void run() {
}
}

/** It will wait the current execution (if there is one) to finish
* but will not complete any further executions */
public void shutdownNow() {
/**
* It will shutdown and wait 30 seconds for timeout.
*/
public void shutdown() {
shutdown(30, TimeUnit.SECONDS);
}

public void shutdown(long timeout, TimeUnit unit) {
started = false;

if (!inHandler()) {
// if it's in handler.. we just return
flush(timeout, unit);
}
}

/**
* It will wait the current execution (if there is one) to finish
* but will not complete any further executions
*/
public List<T> shutdownNow() {
//alert anyone that has been requested (at least) an immediate shutdown
requestedShutdown = true;
//it could take a very long time depending on the current executing task
do {
//alert the ExecutorTask (if is running) to just drain the current backlog of tasks
final int startState = stateUpdater.get(this);
if (startState == STATE_FORCED_SHUTDOWN) {
//another thread has completed a forced shutdown
return;
}
if (startState == STATE_RUNNING) {
//wait 100 ms to avoid burning CPU while waiting and
//give other threads a chance to make progress
LockSupport.parkNanos(100_000_000L);
started = false;

if (inHandler()) {
stateUpdater.set(this, STATE_FORCED_SHUTDOWN);
} else {
//it could take a very long time depending on the current executing task
do {
//alert the ExecutorTask (if is running) to just drain the current backlog of tasks
final int startState = stateUpdater.get(this);
if (startState == STATE_FORCED_SHUTDOWN) {
//another thread has completed a forced shutdown
break;
}
if (startState == STATE_RUNNING) {
//wait 100 ms to avoid burning CPU while waiting and
//give other threads a chance to make progress
LockSupport.parkNanos(100_000_000L);
}
}
while (!stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_FORCED_SHUTDOWN));
//this could happen just one time: the forced shutdown state is the last one and
//can be set by just one caller.
//As noted on the execute method there is a small chance that some tasks would be enqueued
}
while (!stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_FORCED_SHUTDOWN));
//this could happen just one time: the forced shutdown state is the last one and
//can be set by just one caller.
//As noted on the execute method there is a small chance that some tasks would be enqueued
ArrayList<T> returnList = new ArrayList<>(tasks);
tasks.clear();
//we can report the killed tasks somehow: ExecutorService do the same on shutdownNow

return returnList;
}

protected abstract void doTask(T task);
Expand All @@ -112,26 +151,48 @@ public final boolean isFlushed() {
return stateUpdater.get(this) == STATE_NOT_RUNNING;
}

protected void task(T command) {
if (stateUpdater.get(this) != STATE_FORCED_SHUTDOWN) {
//The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks
tasks.add(command);
//cache locally the state to avoid multiple volatile loads
final int state = stateUpdater.get(this);
if (state == STATE_FORCED_SHUTDOWN) {
//help the GC by draining any task just submitted: it help to cover the case of a shutdownNow finished before tasks.add
tasks.clear();
} else if (state == STATE_NOT_RUNNING) {
//startPoller could be deleted but is maintained because is inherited
delegate.execute(task);
/**
* WARNING: This will only flush when all the activity is suspended.
* don't expect success on this call if another thread keeps feeding the queue
* this is only valid on situations where you are not feeding the queue,
* like in shutdown and failover situations.
*/
public final boolean flush(long timeout, TimeUnit unit) {
if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
// quick test, most of the time it will be empty anyways
return true;
}

long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout);
try {
while (stateUpdater.get(this) == STATE_RUNNING && timeLimit > System.currentTimeMillis()) {

if (tasks.isEmpty()) {
return true;
}

Thread.sleep(10);
}
} catch (InterruptedException e) {
// ignored
}

return stateUpdater.get(this) == STATE_NOT_RUNNING;
}

protected void startPoller() {
if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
//note that this can result in multiple tasks being queued
//this is not an issue as the CAS will mean that the second (and subsequent) execution is ignored
protected void task(T command) {
if (!started) {
logger.debug("Ordered executor has been shutdown at", new Exception("debug"));
}
//The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks
tasks.add(command);
//cache locally the state to avoid multiple volatile loads
final int state = stateUpdater.get(this);
if (state == STATE_FORCED_SHUTDOWN) {
//help the GC by draining any task just submitted: it help to cover the case of a shutdownNow finished before tasks.add
tasks.clear();
} else if (state == STATE_NOT_RUNNING) {
//startPoller could be deleted but is maintained because is inherited
delegate.execute(task);
}
}
Expand All @@ -146,4 +207,8 @@ public final int remaining() {
return tasks.size();
}

public final int status() {
return stateUpdater.get(this);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -70,12 +71,78 @@ public void shouldShutdownNowDoNotExecuteFurtherTasks() throws InterruptedExcept
//from now on new tasks won't be executed
final CountDownLatch afterDeatchExecution = new CountDownLatch(1);
executor.execute(afterDeatchExecution::countDown);
Assert.assertFalse("After shutdownNow no new tasks can be executed", afterDeatchExecution.await(1, TimeUnit.SECONDS));
Assert.assertFalse("After shutdownNow no new tasks can be executed", afterDeatchExecution.await(100, TimeUnit.MILLISECONDS));
//to avoid memory leaks the executor must take care of the new submitted tasks immediatly
Assert.assertEquals("Any new task submitted after death must be collected", 0, executor.remaining());
} finally {
executorService.shutdown();
}
}



@Test
public void shutdownWithin() throws InterruptedException {
final ExecutorService executorService = Executors.newSingleThreadExecutor();
try {
final OrderedExecutor executor = new OrderedExecutor(executorService);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger numberOfTasks = new AtomicInteger(0);
final CountDownLatch ran = new CountDownLatch(1);

executor.execute(() -> {
try {
latch.await(1, TimeUnit.MINUTES);
numberOfTasks.set(executor.shutdownNow().size());
ran.countDown();
} catch (Exception e) {
e.printStackTrace();
}
});


for (int i = 0; i < 100; i++) {
executor.execute(() -> System.out.println("Dont worry, this will never happen"));
}

latch.countDown();
ran.await(1, TimeUnit.SECONDS);
Assert.assertEquals(100, numberOfTasks.get());

Assert.assertEquals(ProcessorBase.STATE_FORCED_SHUTDOWN, executor.status());
Assert.assertEquals(0, executor.remaining());
} finally {
executorService.shutdown();
}
}


@Test
public void testMeasure() throws InterruptedException {
final ExecutorService executorService = Executors.newSingleThreadExecutor();
try {
final OrderedExecutor executor = new OrderedExecutor(executorService);
int MAX_LOOP = 1_000_000;

// extend the number for longer numbers
int runs = 10;

for (int i = 0; i < runs; i++) {
long start = System.nanoTime();
final CountDownLatch executed = new CountDownLatch(MAX_LOOP);
for (int l = 0; l < MAX_LOOP; l++) {
executor.execute(executed::countDown);
}
Assert.assertTrue(executed.await(1, TimeUnit.MINUTES));
long end = System.nanoTime();

long elapsed = (end - start);

System.out.println("execution " + i + " in " + TimeUnit.NANOSECONDS.toMillis(elapsed) + " milliseconds");
}
} finally {
executorService.shutdown();
}
}

}
Loading

0 comments on commit 91db080

Please sign in to comment.