Skip to content

Commit

Permalink
[sre] InternaleSchedules capacity provides the executeBlockingTasks f…
Browse files Browse the repository at this point in the history
…unction.

Signed-off-by: Stéphane Galland <galland@arakhne.org>
  • Loading branch information
gallandarakhneorg committed Mar 24, 2020
1 parent 2f408bf commit 613b7b0
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 83 deletions.
Expand Up @@ -23,6 +23,7 @@ package io.sarl.sre.capacities
import io.sarl.core.Schedules
import io.sarl.lang.core.Behavior
import io.sarl.core.AgentTask
import java.util.Collection

/**
* Capacity for executing tasks with specific functions for the SRE platform.
Expand Down Expand Up @@ -55,4 +56,20 @@ capacity InternalSchedules extends Schedules {
*/
def executeAsap(task : Runnable) : AgentTask

/**
* Submit tasks to the executor service and wait for the termination of all the tasks.
* This function ensures that the caller's thread is blocked until all the given tasks have been finished.
*
* <p>According to the implementation of the service, the given tasks may be run in the same or separated thread
* than the one of the caller.
*
* <p>If an exception occurs into the given task, the exception is thrown if {@code thrownExceptions}
* evaluates to {@code true}. If it is evaluates to {@code false}, the exception is logged.
*
* @param task the task to submit.
* @param thrownExceptions indicates if the exceptions in the given tasks are thrown forward by this function.
* @since 0.11
*/
def executeBlockingTasks(task : Collection<Runnable>, thrownExceptions : boolean = false)

}
Expand Up @@ -125,21 +125,7 @@ abstract class AbstractExecutorService extends AbstractSreService implements Exe
try {
doneSignal.await
} catch (ex : InterruptedException) {
// XXX: Improve because:
// This exception occurs when one of the launched task kills the agent before all the
// submitted tasks are finished. Keep in mind that killing an agent should kill the
// launched tasks.
// Example of code that is generating this issue:
//
// on Initialize {
// in (100) [
// killMe
// ]
// }
//
// In this example, the killMe is launched before the Initialize code is finished;
// and because the Initialize event is fired through the current function, it
// causes the InterruptedException.

}
}

Expand Down
Expand Up @@ -131,7 +131,7 @@ final class AgentLife {
* @return the logger.
*/
protected final def getAgentLogger : Logging {
// This imlementation is lock-free because it is assumed that the skill definition will not change a lot.
// This implementation is lock-free because it is assumed that the skill definition will not change a lot.
// The code is inspired from the similar code that is generated by the SARL compiler.
var capRef = this.loggingCapacity
if (capRef === null ||
Expand All @@ -146,8 +146,8 @@ final class AgentLife {
*
* @return the event bus.
*/
final def getEventBus : InternalEventBusCapacity {
// This imlementation is lock-free because it is assumed that the skill definition will not change a lot.
protected final def getEventBus : InternalEventBusCapacity {
// This implementation is lock-free because it is assumed that the skill definition will not change a lot.
// The code is inspired from the similar code that is generated by the SARL compiler.
var capRef = this.eventBusCapacity
if (capRef === null || capRef.get() === null) {
Expand Down
Expand Up @@ -33,8 +33,7 @@ import io.sarl.sre.capacities.InternalSchedules
import io.sarl.sre.services.executor.ExecutorService
import io.sarl.sre.services.executor.SreRunnable
import java.lang.ref.WeakReference
import java.util.Map
import java.util.UUID
import java.util.Collection
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentSkipListSet
import java.util.concurrent.ExecutionException
Expand Down Expand Up @@ -86,31 +85,23 @@ skill SchedulesSkill extends Skill implements InternalSchedules {

protected override prepareUninstallation {
// Cancel the tasks as soon as possible in the uninstallation process
cancelAllRunningTasks
// The future submitted tasks will not be cancelled in order to let "on Destroy"
// to be run.
cancelAllRunningTasks(false)
}

protected override uninstall {
// Cancel the tasks that were creating during the destruction stage (in the Destroy event handler)
cancelAllRunningTasks
}

/** Replies the map that store the active tasks.
* If the map is not created before, this function creates it.
*
* <p>This function is not thread-safe.
*/
private def getRepository : Map<String, TaskDescription> {
if (this.activeTaskRepository === null) {
this.activeTaskRepository = new ConcurrentHashMap<String,TaskDescription>
}
this.activeTaskRepository
cancelAllRunningTasks(true)
}

/** Cancel all the running tasks.
*
* <p>This function is not thread-safe.
*
* @param interruptThreads indicates if the threads must be interrupted or not.
*/
private def cancelAllRunningTasks {
private def cancelAllRunningTasks(interruptThreads : boolean) {
var activeTasks = this.activeTaskRepository
this.activeTaskRepository = null
if (activeTasks !== null) {
Expand All @@ -119,13 +110,34 @@ skill SchedulesSkill extends Skill implements InternalSchedules {
if (description !== null) {
var task = description.task
if (task !== null) {
finishTask(task, true, false, false)
finishTask(task, true, interruptThreads, false, true)
}
}
}
}
}

/** Replies the map that store the active tasks.
* If the map is not created before, this function creates it.
*
* <p>This function is not thread-safe.
*/
private def getRepository : ConcurrentHashMap<String, TaskDescription> {
if (this.activeTaskRepository === null) {
this.activeTaskRepository = new ConcurrentHashMap<String,TaskDescription>
}
return this.activeTaskRepository
}

private def ensureTaskName(name : String = null) : String {
if (name.isNullOrEmpty) {
// see http://www.cowtowncoder.com/blog/archives/2010/10/entry_429.html
// Don't use UUID::randomUUID way too slow
return "t" + this.taskCountID.incrementAndGet
}
return name
}

/** Create a task.
* If the task was already launched, this function replies the active task.
*
Expand All @@ -143,22 +155,14 @@ skill SchedulesSkill extends Skill implements InternalSchedules {
updateBehaviorReference : boolean) : TaskDescription {

var description : TaskDescription = null
var realName : String
if (name.isNullOrEmpty) {
// see http://www.cowtowncoder.com/blog/archives/2010/10/entry_429.html
// Don't use UUID::randomUUID way too slow
taskCountID.incrementAndGet
realName = "task-" + taskCountID //+ uuidTimeBasedGenerator.generate().toString
} else {
realName = name

description = this.activeTaskRepository?.get(realName)

var realName = name.ensureTaskName
if (realName === name) {
description = getRepository.get(realName)
}

if (description === null) {
val caller = Capacities::getCaller ?: this.owner
val task = new AgentTask(caller)
val task = new AgentTask(realName, caller)
task.taskName = realName
task.guard = AgentTask::TRUE_GUARD
description = new TaskDescription(task)
Expand Down Expand Up @@ -207,7 +211,7 @@ skill SchedulesSkill extends Skill implements InternalSchedules {
SREutils::setSreSpecificData(newTask, description, typeof(TaskDescription))
val name = newTask.name
if (name.isNullOrEmpty) {
newTask.taskName = "task-" + UUID::randomUUID.toString
newTask.taskName = ensureTaskName
}
description.task = newTask
description.future = null
Expand All @@ -222,12 +226,14 @@ skill SchedulesSkill extends Skill implements InternalSchedules {
*
* @param name the task.
* @param cancelTask indicates if the task should be canceled by this function.
* @param forceThreadInterruption if {@code cancelTask} evaluates to {@code true}, this parameter permits
* to enable or disable the interruption of the thread's task.
* @param updateTaskList indicates if the task list must be updated by removing the task.
* @param updateBehaviorReference indicates if the behavior reference to the task must be updated by removing the reference.
* @return {@code true} if the task was successfully finished.
*/
private def finishTask(task : AgentTask, cancelTask : boolean, updateTaskList : boolean,
updateBehaviorReference : boolean) : boolean {
private def finishTask(task : AgentTask, cancelTask : boolean, forceThreadInterruption : boolean,
updateTaskList : boolean, updateBehaviorReference : boolean) : boolean {
assert task !== null
// Remove the reference of the behavior to the task.
if (updateBehaviorReference) {
Expand All @@ -236,7 +242,7 @@ skill SchedulesSkill extends Skill implements InternalSchedules {
// Remove the task from the global list.
var description : TaskDescription = null
if (updateTaskList && !task.name.nullOrEmpty) {
description = this.activeTaskRepository?.remove(task.name)
description = getRepository.remove(task.name)
}
if (cancelTask) {
// Force the stop of the task.
Expand All @@ -249,7 +255,7 @@ skill SchedulesSkill extends Skill implements InternalSchedules {
description.future = null
if (future !== null && !future.done && !future.cancelled) {
// Task is running. Force its stop. Canceling the not running tasks and removing the running ones
var b1 = future.cancel(true);
var b1 = future.cancel(forceThreadInterruption);
if (b1) {
if (future instanceof Runnable) {
this.executorService.remove(future)
Expand All @@ -274,7 +280,7 @@ skill SchedulesSkill extends Skill implements InternalSchedules {
}
var description = SREutils::getSreSpecificData(task, typeof(TaskDescription))
if (description === null && !task.name.nullOrEmpty) {
description = this.activeTaskRepository?.get(task.name)
description = getRepository.get(task.name)
}
return description
}
Expand All @@ -297,7 +303,7 @@ skill SchedulesSkill extends Skill implements InternalSchedules {
description = null
} else {

description = this.activeTaskRepository?.get(task.name)
description = getRepository.get(task.name)

}
var registered = description !== null
Expand Down Expand Up @@ -341,7 +347,7 @@ skill SchedulesSkill extends Skill implements InternalSchedules {
if (tasksToCancel !== null) {
for (taskToCancel : tasksToCancel.map[it.get]) {
if (taskToCancel !== null) {
taskToCancel.finishTask(true, true, false)
taskToCancel.finishTask(true, true, true, false)
}
}
}
Expand All @@ -362,31 +368,20 @@ skill SchedulesSkill extends Skill implements InternalSchedules {

def setName(task : AgentTask, name : String) {
if (getLife(owner).state.alive) {
val realName = if(name.isNullOrEmpty) "task-" + UUID::randomUUID.toString else name
val realName = name.ensureTaskName
val prefix = realName + "-"
var i = 0
var nm = realName
var atr : ConcurrentHashMap<String, TaskDescription>

atr = this.activeTaskRepository

if (atr !== null) {
val atr = getRepository

atr = this.activeTaskRepository
if (atr !== null) {
var desc = atr.remove(task.name)
if (desc !== null) {
while (atr.containsKey(nm)) {
i++
nm = prefix + i
}
task.taskName = nm
atr.put(nm, desc)
}
} else {
task.taskName = nm
var desc = atr.remove(task.name)
if (desc !== null) {
while (atr.containsKey(nm)) {
i++
nm = prefix + i
}

task.taskName = nm
atr.put(nm, desc)
} else {
task.taskName = nm
}
Expand All @@ -395,7 +390,7 @@ skill SchedulesSkill extends Skill implements InternalSchedules {

def executeAsap(task : Runnable) : AgentTask {
if (task !== null) {
var description = preRunTask(null) [task.run]
var description = preRunTask(null)[task.run]
var logger = getLogger
val future = this.executorService.executeAsap(logger,
new SingleRunner(this, this.owner, description, logger))
Expand All @@ -404,6 +399,10 @@ skill SchedulesSkill extends Skill implements InternalSchedules {
}
}

def executeBlockingTasks(task : Collection<Runnable>, thrownExceptions : boolean) {
this.executorService.executeBlockingTasks(logger, thrownExceptions, task)
}

def execute(task : AgentTask = null, procedure : (Agent)=>void) : AgentTask {
var description = preRunTask(task, procedure)
var logger = getLogger
Expand Down Expand Up @@ -480,18 +479,15 @@ skill SchedulesSkill extends Skill implements InternalSchedules {
if (description !== null) {
var future = description.future
if (future !== null && !future.done && !future.cancelled && future.cancel(mayInterruptIfRunning)) {
return finishTask(task, true, true, true)
return finishTask(task, true, true, true, true)
}
}
}
return false
}

def getActiveTasks : ConcurrentSkipListSet<String> {
if (this.activeTaskRepository !== null) {
return new ConcurrentSkipListSet<String>(this.activeTaskRepository.keySet)
}
return new ConcurrentSkipListSet<String>();
return new ConcurrentSkipListSet<String>(getRepository.keySet)
}

/**
Expand Down Expand Up @@ -712,7 +708,7 @@ skill SchedulesSkill extends Skill implements InternalSchedules {
assert task !== null
val ^skill = this.^skill?.get
assert ^skill !== null
^skill.finishTask(task, false, true, true);
^skill.finishTask(task, false, false, true, true);
}
}

Expand Down

0 comments on commit 613b7b0

Please sign in to comment.