Skip to content

Commit

Permalink
[sre] Fire TaskFailure event when a background task thrown an exception.
Browse files Browse the repository at this point in the history
see #1001

Signed-off-by: Stéphane Galland <galland@arakhne.org>
  • Loading branch information
gallandarakhneorg committed Nov 29, 2020
1 parent caf611f commit 74139cd
Show file tree
Hide file tree
Showing 10 changed files with 326 additions and 52 deletions.
Expand Up @@ -60,8 +60,6 @@ import org.eclipse.xtend.lib.annotations.Accessors
* <p>The Kernel is assimilated to an agent that is omniscient and distributed other the network. It is containing all the other
* agents.
*
* <p>To create a Kernel, you should use the function {@link #create(Module...)}.
*
* @author $Author: srodriguez$
* @author $Author: ngaud$
* @author $Author: sgalland$
Expand Down
Expand Up @@ -31,6 +31,7 @@ import io.sarl.sre.internal.ContextMemberEventEmitter
import io.sarl.sre.internal.SpaceEventEmitterFactory
import io.sarl.sre.internal.SpaceParticipantEventEmitterFactory
import io.sarl.sre.internal.SubHolonContextEventEmitterFactory
import io.sarl.sre.internal.TaskAgentFailureEventEmitterFactory
import io.sarl.sre.services.context.ExternalContextMemberListener
import io.sarl.sre.services.context.InternalContextMembershipListenerFactory
import io.sarl.sre.services.context.SpaceRepositoryListenerFactory
Expand All @@ -39,6 +40,7 @@ import io.sarl.sre.services.logging.LoggingService
import io.sarl.sre.spaces.SpaceParticipantListenerFactory
import javax.inject.Provider
import javax.inject.Singleton
import io.sarl.sre.skills.bic.FailureListenerFactory

/**
* Module for configuring the methods for firing the specific events that are defined into the SARL API.
Expand Down Expand Up @@ -87,6 +89,14 @@ class PlatformEventEmitterModule extends AbstractModule {
static def provideLifecycleServiceListener(logging : Provider<LoggingService>) : LifecycleServiceListener {
new AgentEventEmitter(logging.get.getKernelLogger)
}


@Provides
@KernelScope
@Singleton
static def provideFailureListenerFactory : FailureListenerFactory {
new TaskAgentFailureEventEmitterFactory
}

}

Expand Down
Expand Up @@ -23,6 +23,7 @@ package io.sarl.sre.internal

import io.sarl.core.AgentKilled
import io.sarl.core.AgentSpawned
import io.sarl.core.AgentTask
import io.sarl.core.Behaviors
import io.sarl.core.ContextJoined
import io.sarl.core.ContextLeft
Expand All @@ -33,6 +34,7 @@ import io.sarl.core.ParticipantJoined
import io.sarl.core.ParticipantLeft
import io.sarl.core.SpaceCreated
import io.sarl.core.SpaceDestroyed
import io.sarl.core.TaskFailure
import io.sarl.lang.annotation.PrivateAPI
import io.sarl.lang.core.Address
import io.sarl.lang.core.Agent
Expand All @@ -50,16 +52,18 @@ import io.sarl.sre.services.context.SpaceRepositoryListener
import io.sarl.sre.services.context.SpaceRepositoryListenerFactory
import io.sarl.sre.services.lifecycle.ContextReference
import io.sarl.sre.services.lifecycle.LifecycleServiceListener
import io.sarl.sre.skills.bic.FailureListener
import io.sarl.sre.skills.bic.FailureListenerFactory
import io.sarl.sre.spaces.Participant
import io.sarl.sre.spaces.SpaceParticipantListener
import io.sarl.sre.spaces.SpaceParticipantListenerFactory
import java.lang.ref.WeakReference
import java.text.MessageFormat
import java.util.UUID
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.logging.Logger

import static io.sarl.sre.services.lifecycle.AgentLife.*
import java.util.concurrent.ConcurrentLinkedDeque

/** Emit the space platform events.
*
Expand Down Expand Up @@ -381,9 +385,65 @@ class SubHolonContextEventEmitter implements InternalContextMembershipListener {
*/
class SubHolonContextEventEmitterFactory implements InternalContextMembershipListenerFactory {

def create(owner : Agent) : InternalContextMembershipListener {
override create(owner : Agent) : InternalContextMembershipListener {
new SubHolonContextEventEmitter(owner)
}

}

/**
* Events related to agent tasks' failures.
*
* @author $Author: sgalland$
* @version $FullVersion$
* @mavengroupid $GroupId$
* @mavenartifactid $ArtifactId$
* @since 0.12
*/
@SuppressWarnings("use_reserved_sarl_annotation")
@PrivateAPI(isCallerOnly = true)
class TaskAgentFailureEventEmitter implements FailureListener {

val owner : WeakReference<Agent>

var behaviorSkill : AtomicSkillReference

new (owner : Agent) {
this.owner = new WeakReference(owner)
}

private def getBehaviors : Behaviors {
var o = this.owner.get
if (o !== null) {
if (this.behaviorSkill?.get === null) {
this.behaviorSkill = SREutils::getInternalSkillReference(o, typeof(Behaviors))
}
return SREutils::castInternalSkillReference(o, this.behaviorSkill, typeof(Behaviors))
}
return null
}

override taskFailed(task : AgentTask, cause : Throwable) {
val ^event = new TaskFailure(task, cause)
this.behaviors?.wake(^event)
}

}

/**
* Factory of listener for events related to agent tasks' failures.
*
* @author $Author: sgalland$
* @version $FullVersion$
* @mavengroupid $GroupId$
* @mavenartifactid $ArtifactId$
* @since 0.12
*/
class TaskAgentFailureEventEmitterFactory implements FailureListenerFactory {

override create(owner : Agent) : FailureListener {
new TaskAgentFailureEventEmitter(owner)
}

}

Expand Up @@ -112,9 +112,12 @@ abstract class SreExecutable {
* The default implementation does nothing.
*
* @param error the error.
* @return {@code true} if the error should be thrown in the current thread.
* {@code false} if the exception is not thrown.
* @since 0.11
*/
protected def onError(error : Throwable) {
protected def onError(error : Throwable) : boolean {
return true
}

/** Invoked when the task is finished whatever it is successful or failing.
Expand Down Expand Up @@ -203,13 +206,12 @@ class SreRunnable extends SreExecutable implements Runnable {
// Clear the interrupted flag
Thread::interrupted
} else {
this.exception = cause;
this.exception = cause
val doThrown = onError(cause)
val log = getLogger
if (log !== null) {
log.log(Level::SEVERE, cause) [cause.exceptionMessage]
}
onError(cause)
if (log === null) {
} else if (doThrown) {
throw cause
}
}
Expand Down Expand Up @@ -307,13 +309,12 @@ class SreCallable<T> extends SreExecutable implements Callable<T> {
} else if (cause instanceof InterruptedException) {
// Ignore this exception
} else {
this.exception = cause;
this.exception = cause
val doThrown = onError(cause)
val log = getLogger
if (log !== null) {
log.log(Level::SEVERE, cause) [cause.exceptionMessage]
}
onError(cause)
if (log === null) {
} else if (doThrown) {
throw cause
}
}
Expand Down Expand Up @@ -404,13 +405,12 @@ class SreConsumer<T> extends SreExecutable implements Consumer<T> {
} else if (cause instanceof InterruptedException) {
// Ignore this exception
} else {
this.exception = cause;
this.exception = cause
val doThrown = onError(cause)
val log = getLogger
if (log !== null) {
log.log(Level::SEVERE, cause) [cause.exceptionMessage]
}
onError(cause)
if (log === null) {
} else if (doThrown) {
throw cause
}
}
Expand Down

0 comments on commit 74139cd

Please sign in to comment.