From aa2f37987afe854b2415a167fee265c2a64b188f Mon Sep 17 00:00:00 2001 From: Nicolas Gaud Date: Thu, 13 Feb 2020 10:57:31 +0100 Subject: [PATCH] Tuning the every tasks stopping process Already running every tasks weren't correctly stop when an agent was dying. We have to explicitly remove the currently running tasks from the executorservice during the agent killing process. --- .../internal/services/ExecutorServiceModule.sarl | 6 ++++-- .../sarl/sre/services/executor/ExecutorService.sarl | 8 ++++++++ .../sre/services/executor/JreExecutorService.sarl | 13 ++++++++++++- .../src/io/sarl/sre/skills/bic/SchedulesSkill.sarl | 6 ++++-- 4 files changed, 28 insertions(+), 5 deletions(-) diff --git a/sre/io.janusproject/io.janusproject.plugin/src/io/sarl/sre/boot/internal/services/ExecutorServiceModule.sarl b/sre/io.janusproject/io.janusproject.plugin/src/io/sarl/sre/boot/internal/services/ExecutorServiceModule.sarl index 61d9b84886..8f610b011e 100644 --- a/sre/io.janusproject/io.janusproject.plugin/src/io/sarl/sre/boot/internal/services/ExecutorServiceModule.sarl +++ b/sre/io.janusproject/io.janusproject.plugin/src/io/sarl/sre/boot/internal/services/ExecutorServiceModule.sarl @@ -37,6 +37,7 @@ import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.RejectedExecutionHandler import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.ScheduledThreadPoolExecutor import java.util.concurrent.SynchronousQueue import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit @@ -172,9 +173,10 @@ class ExecutorServiceModule extends AbstractModule { val minPoolSize = executorsConfig.minThreads val maxPoolSize = executorsConfig.maxThreads var max = Math::max(1, Math::min(minPoolSize, maxPoolSize)) - var executor = Executors.newScheduledThreadPool(max) + var executor = Executors.newScheduledThreadPool(max) if (rejectedExecutionHandler !== null && executor instanceof ThreadPoolExecutor) { - (executor as ThreadPoolExecutor).rejectedExecutionHandler = rejectedExecutionHandler + (executor as ThreadPoolExecutor).rejectedExecutionHandler = rejectedExecutionHandler; + (executor as ScheduledThreadPoolExecutor).removeOnCancelPolicy = true } return executor } diff --git a/sre/io.janusproject/io.janusproject.plugin/src/io/sarl/sre/services/executor/ExecutorService.sarl b/sre/io.janusproject/io.janusproject.plugin/src/io/sarl/sre/services/executor/ExecutorService.sarl index a0dd92b93b..75282da72d 100644 --- a/sre/io.janusproject/io.janusproject.plugin/src/io/sarl/sre/services/executor/ExecutorService.sarl +++ b/sre/io.janusproject/io.janusproject.plugin/src/io/sarl/sre/services/executor/ExecutorService.sarl @@ -184,6 +184,14 @@ interface ExecutorService extends Service { * Remove any canceled/terminated tasks from the lists of tasks. */ def purge + + + /** + * Removes this task from the executor's internal queue if it is present, thus causing it not to be run if it has not already started. + * @param task - the task to remove + * @return rue if the task was removed + */ + def remove(task: Runnable) : Boolean /** * Submit tasks to the executor service and wait for the termination of all the tasks. diff --git a/sre/io.janusproject/io.janusproject.plugin/src/io/sarl/sre/services/executor/JreExecutorService.sarl b/sre/io.janusproject/io.janusproject.plugin/src/io/sarl/sre/services/executor/JreExecutorService.sarl index 0dd4882875..b849fdba1c 100644 --- a/sre/io.janusproject/io.janusproject.plugin/src/io/sarl/sre/services/executor/JreExecutorService.sarl +++ b/sre/io.janusproject/io.janusproject.plugin/src/io/sarl/sre/services/executor/JreExecutorService.sarl @@ -217,6 +217,17 @@ class JreExecutorService extends AbstractExecutorService implements PreReleasabl scheduledExecutorService.scheduleWithFixedDelay(command.protectRunnable(logger), initialDelay, delay, unit) } + def remove(task : Runnable) : Boolean { + var es = executorService + if (es instanceof ThreadPoolExecutor) { + es.remove(task) + } + var ses = scheduledExecutorService + if (ses instanceof ThreadPoolExecutor) { + ses.remove(task) + } + } + def purge { var es = executorService if (es instanceof ThreadPoolExecutor) { @@ -285,5 +296,5 @@ class JreExecutorService extends AbstractExecutorService implements PreReleasabl } } - + } diff --git a/sre/io.janusproject/io.janusproject.plugin/src/io/sarl/sre/skills/bic/SchedulesSkill.sarl b/sre/io.janusproject/io.janusproject.plugin/src/io/sarl/sre/skills/bic/SchedulesSkill.sarl index cc31a5a0a9..707b69c5e1 100644 --- a/sre/io.janusproject/io.janusproject.plugin/src/io/sarl/sre/skills/bic/SchedulesSkill.sarl +++ b/sre/io.janusproject/io.janusproject.plugin/src/io/sarl/sre/skills/bic/SchedulesSkill.sarl @@ -240,8 +240,10 @@ skill SchedulesSkill extends Skill implements InternalSchedules { var future = description.future description.future = null if (future !== null && !future.done && !future.cancelled) { - // Task is running. Force its stop. - return future.cancel(true) + // Task is running. Force its stop. Canceling the not running tasks and removing the running ones + var b1 = future.cancel(true); + var b2 = this.executorService.remove(future as Runnable) + return b1 && b2 } } }