Skip to content

Commit

Permalink
Remove manual-remove-on-cancel workaround for Java 6 in LoggingSchedu…
Browse files Browse the repository at this point in the history
…ledExecutor
  • Loading branch information
mdogan committed May 8, 2019
1 parent adb12cd commit aafee4c
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 268 deletions.
Expand Up @@ -33,14 +33,12 @@
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static com.hazelcast.internal.metrics.ProbeLevel.MANDATORY;
import static com.hazelcast.spi.properties.GroupProperty.TASK_SCHEDULER_REMOVE_ON_CANCEL;
import static java.lang.Thread.currentThread;

public final class ClientExecutionServiceImpl implements ClientExecutionService, MetricsProvider {
Expand All @@ -66,24 +64,17 @@ public ClientExecutionServiceImpl(String name, ClassLoader classLoader,
}
logger = loggingService.getLogger(ClientExecutionService.class);
internalExecutor = new LoggingScheduledExecutor(logger, internalPoolSize,
new PoolExecutorThreadFactory(name + ".internal-", classLoader),
properties.getBoolean(TASK_SCHEDULER_REMOVE_ON_CANCEL),
new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
String message = "Internal executor rejected task: " + r + ", because client is shutting down...";
logger.finest(message);
throw new RejectedExecutionException(message);
}
new PoolExecutorThreadFactory(name + ".internal-", classLoader), (r, executor) -> {
String message = "Internal executor rejected task: " + r + ", because client is shutting down...";
logger.finest(message);
throw new RejectedExecutionException(message);
});
userExecutor = new ThreadPoolExecutor(executorPoolSize, executorPoolSize, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new PoolExecutorThreadFactory(name + ".user-", classLoader),
new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
String message = "User executor rejected task: " + r + ", because client is shutting down...";
logger.finest(message);
throw new RejectedExecutionException(message);
}
new LinkedBlockingQueue<>(),
new PoolExecutorThreadFactory(name + ".user-", classLoader), (r, executor) -> {
String message = "User executor rejected task: " + r + ", because client is shutting down...";
logger.finest(message);
throw new RejectedExecutionException(message);
});
}

Expand Down
Expand Up @@ -28,7 +28,6 @@
import com.hazelcast.spi.TaskScheduler;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.executionservice.InternalExecutionService;
import com.hazelcast.spi.properties.GroupProperty;
import com.hazelcast.util.ConcurrencyUtil;
import com.hazelcast.util.ConstructorFunction;
import com.hazelcast.util.executor.CachedExecutorServiceDelegate;
Expand All @@ -45,7 +44,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -76,14 +74,11 @@ public final class ExecutionServiceImpl implements InternalExecutionService {
private final ILogger logger;
private final CompletableFutureTask completableFutureTask;

private final ConcurrentMap<String, ManagedExecutorService> executors
= new ConcurrentHashMap<String, ManagedExecutorService>();
private final ConcurrentMap<String, ManagedExecutorService> executors = new ConcurrentHashMap<>();

private final ConcurrentMap<String, ManagedExecutorService> durableExecutors
= new ConcurrentHashMap<String, ManagedExecutorService>();
private final ConcurrentMap<String, ManagedExecutorService> durableExecutors = new ConcurrentHashMap<>();

private final ConcurrentMap<String, ManagedExecutorService> scheduleDurableExecutors
= new ConcurrentHashMap<String, ManagedExecutorService>();
private final ConcurrentMap<String, ManagedExecutorService> scheduleDurableExecutors = new ConcurrentHashMap<>();

private final ConstructorFunction<String, ManagedExecutorService> constructor =
new ConstructorFunction<String, ManagedExecutorService>() {
Expand Down Expand Up @@ -128,21 +123,16 @@ public ExecutionServiceImpl(NodeEngineImpl nodeEngine) {
ThreadFactory threadFactory = new PoolExecutorThreadFactory(createThreadPoolName(hzName, "cached"),
configClassLoader);
this.cachedExecutorService = new ThreadPoolExecutor(
CORE_POOL_SIZE, Integer.MAX_VALUE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), threadFactory, new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (logger.isFinestEnabled()) {
logger.finest("Node is shutting down; discarding the task: " + r);
}
}
}
);
CORE_POOL_SIZE, Integer.MAX_VALUE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue<>(),
threadFactory, (r, executor) -> {
if (logger.isFinestEnabled()) {
logger.finest("Node is shutting down; discarding the task: " + r);
}
});

ThreadFactory singleExecutorThreadFactory = new SingleExecutorThreadFactory(configClassLoader,
createThreadPoolName(hzName, "scheduled"));
this.scheduledExecutorService = new LoggingScheduledExecutor(logger, 1,
singleExecutorThreadFactory,
nodeEngine.getProperties().getBoolean(GroupProperty.TASK_SCHEDULER_REMOVE_ON_CANCEL));
this.scheduledExecutorService = new LoggingScheduledExecutor(logger, 1, singleExecutorThreadFactory);

int coreSize = Math.max(RuntimeAvailableProcessors.get(), 2);
// default executors
Expand Down Expand Up @@ -217,10 +207,7 @@ private ManagedExecutorService createExecutor(String name, int poolSize, int que
}

NamedThreadPoolExecutor pool = new NamedThreadPoolExecutor(name, poolSize, poolSize,
KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueCapacity),
threadFactory
);
KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueCapacity), threadFactory);
pool.allowCoreThreadTimeOut(true);
executor = pool;
} else {
Expand Down
Expand Up @@ -984,29 +984,6 @@ public final class GroupProperty {
public static final HazelcastProperty BIND_SPOOFING_CHECKS =
new HazelcastProperty("hazelcast.nio.tcp.spoofing.checks", false);

/**
* This is a Java 6 specific property. In Java 7+ tasks are always removed
* on cancellation due to the explicit
* {@code java.util.concurrent.ScheduledThreadPoolExecutor#setRemoveOnCancelPolicy(boolean)}
* and constant time removal.
*
* In Java 6 there is no out-of-the-box support for removal of cancelled tasks,
* and the only way to implement this is using a linear scan of all pending
* tasks. Therefore in Java 6 there is a performance penalty.
*
* Using this property, in Java 6, one can control if cancelled tasks are removed.
* By default tasks are removed, because it can lead to temporary retention
* of memory if there a large volume of pending cancelled tasks. And this can
* lead to gc/performance problems as we saw with the transaction tests.
*
* However if this automatic removal of cancelled tasks start to become a
* performance problem, it can be disabled in Java 6.
*
* For more information see the {@link com.hazelcast.util.executor.LoggingScheduledExecutor}.
*/
public static final HazelcastProperty TASK_SCHEDULER_REMOVE_ON_CANCEL =
new HazelcastProperty("hazelcast.executionservice.taskscheduler.remove.oncancel", true);

/**
* By default, search for data structures config is performed within static configuration first:
* <ul>
Expand Down
Expand Up @@ -17,22 +17,15 @@
package com.hazelcast.util.executor;

import com.hazelcast.logging.ILogger;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import java.lang.reflect.Method;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;

import static com.hazelcast.util.Preconditions.checkNotNull;
Expand All @@ -43,102 +36,40 @@
/**
* Logs execution exceptions by overriding {@link ScheduledThreadPoolExecutor#afterExecute}
* and {@link ScheduledThreadPoolExecutor#decorateTask} methods.
*
* <p>
* Reasoning is given tasks to {@link ScheduledThreadPoolExecutor} stops silently if there is an
* execution exception.
*
* <p>
* Note: Task decoration is only needed to call given tasks {@code toString} methods.
*
* <p>
* {@link java.util.concurrent.ScheduledExecutorService#scheduleWithFixedDelay}
*
* <p>
* Remove on cancel:
* To prevent running into an accumulation of cancelled task which can cause gc related problems
* (e.g. transactions in combination with LockResource eviction), it is best that tasks are
* removed from the scheduler on cancellation.
*
* <p>
* In Java 7+ the there is a method {@code ScheduledThreadPoolExecutor#setRemoveOnCancelPolicy(boolean)}
* which removes the runnable on cancellation. Removal of tasks is done in logarithmic time
* (see ScheduledThreadPoolExecutor.DelayedWorkQueue.indexOf where there is a direct lookup
* instead of a linear scan over the queue). So in Java 7 we try to set this method using
* reflection. In Java 7+ the manualRemoveOnCancel is ignored, since it has an efficient removal
* of cancelled tasks and therefore it will always apply it. If we would wrap the task with the
* RemoveOnCancelFuture, it would even prevent constant time removal.
*
* In Java 6 there is no such method setRemoveOnCancelPolicy and therefore the task is
* wrapped in a RemoveOnCancelFuture which will remove itself from the work-queue
* on cancellation. However this removal isn't done in constant time, but in linear time
* because Java 6 ScheduledThreadPoolExecutor doesn't support a constant time removal.
*
* By default in Java 6, the removal of this task is done based on the 'removeOnCancel'
* property which gets its value from the
* {@link com.hazelcast.spi.properties.GroupProperty#TASK_SCHEDULER_REMOVE_ON_CANCEL}
* instead of a linear scan over the queue).
*/
public class LoggingScheduledExecutor extends ScheduledThreadPoolExecutor {

// this property is accessible for for testing purposes.
boolean manualRemoveOnCancel;
private final ILogger logger;
private volatile boolean shutdownInitiated;

public LoggingScheduledExecutor(ILogger logger, int corePoolSize, ThreadFactory threadFactory) {
this(logger, corePoolSize, threadFactory, false);
}

public LoggingScheduledExecutor(ILogger logger, int corePoolSize, ThreadFactory threadFactory,
boolean removeOnCancel) {
super(corePoolSize, threadFactory);
this.logger = checkNotNull(logger, "logger cannot be null");
this.manualRemoveOnCancel = manualRemoveOnCancel(removeOnCancel);
}

public LoggingScheduledExecutor(ILogger logger, int corePoolSize, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
this(logger, corePoolSize, threadFactory, false, handler);
setRemoveOnCancelPolicy(true);
}

public LoggingScheduledExecutor(ILogger logger, int corePoolSize, ThreadFactory threadFactory,
boolean removeOnCancel,
RejectedExecutionHandler handler) {
super(corePoolSize, threadFactory, handler);
this.logger = checkNotNull(logger, "logger cannot be null");
this.manualRemoveOnCancel = manualRemoveOnCancel(removeOnCancel);
}

private boolean manualRemoveOnCancel(boolean removeOnCancel) {
if (trySetRemoveOnCancelPolicy()) {
return false;
} else {
return removeOnCancel;
}
}

@SuppressFBWarnings("REC_CATCH_EXCEPTION")
private boolean trySetRemoveOnCancelPolicy() {
try {
Method method = ScheduledThreadPoolExecutor.class.getMethod("setRemoveOnCancelPolicy", Boolean.TYPE);
method.invoke(this, true);
return true;
} catch (Exception e) {
return false;
}
}

@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
if (!manualRemoveOnCancel) {
return super.decorateTask(runnable, task);
}

return new RemoveOnCancelFuture<V>(runnable, task, this);
}

@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
if (!manualRemoveOnCancel) {
return super.decorateTask(callable, task);
}

return new RemoveOnCancelFuture<V>(callable, task, this);
setRemoveOnCancelPolicy(true);
}

@Override
Expand Down Expand Up @@ -172,94 +103,4 @@ protected void afterExecute(Runnable runnable, Throwable throwable) {
public void notifyShutdownInitiated() {
shutdownInitiated = true;
}

/**
* The only goal of this task is to enable removal of the task from the executor
* when the future is cancelled.
*/
static class RemoveOnCancelFuture<V> implements RunnableScheduledFuture<V> {

private final Object task;
private final RunnableScheduledFuture<V> delegate;
private final LoggingScheduledExecutor executor;

RemoveOnCancelFuture(Object task, RunnableScheduledFuture<V> delegate, LoggingScheduledExecutor executor) {
this.task = task;
this.delegate = delegate;
this.executor = executor;
}

@Override
public boolean isPeriodic() {
return delegate.isPeriodic();
}

@Override
public long getDelay(TimeUnit unit) {
return delegate.getDelay(unit);
}

@Override
public void run() {
delegate.run();
}

@Override
public int compareTo(Delayed o) {
return delegate.compareTo(o);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof RemoveOnCancelFuture)) {
return false;
}
RemoveOnCancelFuture<?> that = (RemoveOnCancelFuture<?>) o;
return delegate.equals(that.delegate);
}

@Override
public int hashCode() {
return delegate.hashCode();
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean removeOnCancel = !executor.isShutdown();
boolean cancelled = delegate.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel) {
executor.remove(this);
}

return cancelled;
}

@Override
public boolean isCancelled() {
return delegate.isCancelled();
}

@Override
public boolean isDone() {
return delegate.isDone();
}

@Override
public V get() throws InterruptedException, ExecutionException {
return delegate.get();
}

@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit);
}

@Override
public String toString() {
return "RemoveOnCancelFuture{task=" + task + '}';
}
}
}

0 comments on commit aafee4c

Please sign in to comment.