Skip to content

Commit

Permalink
Tuning the every tasks stopping process
Browse files Browse the repository at this point in the history
Already running every tasks weren't correctly stop when an agent was
dying. We have to explicitly remove the currently running tasks from the
executorservice during the agent killing process.
  • Loading branch information
ngaud committed Feb 13, 2020
1 parent 2c49996 commit aa2f379
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 5 deletions.
Expand Up @@ -37,6 +37,7 @@ import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionHandler
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -172,9 +173,10 @@ class ExecutorServiceModule extends AbstractModule {
val minPoolSize = executorsConfig.minThreads
val maxPoolSize = executorsConfig.maxThreads
var max = Math::max(1, Math::min(minPoolSize, maxPoolSize))
var executor = Executors.newScheduledThreadPool(max)
var executor = Executors.newScheduledThreadPool(max)
if (rejectedExecutionHandler !== null && executor instanceof ThreadPoolExecutor) {
(executor as ThreadPoolExecutor).rejectedExecutionHandler = rejectedExecutionHandler
(executor as ThreadPoolExecutor).rejectedExecutionHandler = rejectedExecutionHandler;
(executor as ScheduledThreadPoolExecutor).removeOnCancelPolicy = true
}
return executor
}
Expand Down
Expand Up @@ -184,6 +184,14 @@ interface ExecutorService extends Service {
* Remove any canceled/terminated tasks from the lists of tasks.
*/
def purge


/**
* Removes this task from the executor's internal queue if it is present, thus causing it not to be run if it has not already started.
* @param task - the task to remove
* @return rue if the task was removed
*/
def remove(task: Runnable) : Boolean

/**
* Submit tasks to the executor service and wait for the termination of all the tasks.
Expand Down
Expand Up @@ -217,6 +217,17 @@ class JreExecutorService extends AbstractExecutorService implements PreReleasabl
scheduledExecutorService.scheduleWithFixedDelay(command.protectRunnable(logger), initialDelay, delay, unit)
}

def remove(task : Runnable) : Boolean {
var es = executorService
if (es instanceof ThreadPoolExecutor) {
es.remove(task)
}
var ses = scheduledExecutorService
if (ses instanceof ThreadPoolExecutor) {
ses.remove(task)
}
}

def purge {
var es = executorService
if (es instanceof ThreadPoolExecutor) {
Expand Down Expand Up @@ -285,5 +296,5 @@ class JreExecutorService extends AbstractExecutorService implements PreReleasabl
}

}

}
Expand Up @@ -240,8 +240,10 @@ skill SchedulesSkill extends Skill implements InternalSchedules {
var future = description.future
description.future = null
if (future !== null && !future.done && !future.cancelled) {
// Task is running. Force its stop.
return future.cancel(true)
// Task is running. Force its stop. Canceling the not running tasks and removing the running ones
var b1 = future.cancel(true);
var b2 = this.executorService.remove(future as Runnable)
return b1 && b2
}
}
}
Expand Down

0 comments on commit aa2f379

Please sign in to comment.