Skip to content

Commit

Permalink
[sre] Parallel execution of the "on" handlers is now done through the…
Browse files Browse the repository at this point in the history
… agent's Schedules capacity.

Signed-off-by: Stéphane Galland <galland@arakhne.org>
  • Loading branch information
gallandarakhneorg committed Feb 18, 2020
1 parent d01c609 commit 5343414
Show file tree
Hide file tree
Showing 12 changed files with 324 additions and 65 deletions.
Expand Up @@ -21,14 +21,12 @@
package io.sarl.sre.boot.internal.skills

import com.google.inject.AbstractModule
import com.google.inject.Injector
import com.google.inject.Provides
import io.sarl.sre.services.executor.ExecutorService
import io.sarl.sre.skills.internal.EventBus
import javax.inject.Provider
import io.bootique.BQModuleProvider
import com.google.inject.Module
import io.bootique.BQModule
import io.bootique.BQModuleProvider
import io.sarl.sre.skills.internal.DefaultEventBusFactory
import io.sarl.sre.skills.internal.EventBusFactory
import javax.inject.Singleton

/**
* Module for configuring the agents' event buses.
Expand All @@ -41,16 +39,7 @@ import io.bootique.BQModule
class EventBusModule extends AbstractModule {

protected override configure {
//
}

@Provides
def provideEventBus(executorService : Provider<ExecutorService>, injector : Injector) : EventBus {
// Ensure a new instance is created at each injection.
val aeb = new EventBus(executorService.get)
// to be able to inject the ExecutorService and SubscriberFindingStrategy
injector.injectMembers(aeb)
return aeb
typeof(EventBusFactory).bind.to(typeof(DefaultEventBusFactory)).in(typeof(Singleton))
}

}
Expand Down
Expand Up @@ -25,7 +25,7 @@ import io.sarl.sre.services.lifecycle.AgentState
import io.sarl.sre.skills.internal.EventBus
import java.util.concurrent.ConcurrentLinkedDeque

/**
/**
* Capacity that provides an event bus to notify the different components of an agent.
*
* <p><strong>This capacity is provided by the SRE kernel, not SARL.</strong>
Expand Down
Expand Up @@ -22,6 +22,7 @@ package io.sarl.sre.capacities

import io.sarl.core.Schedules
import io.sarl.lang.core.Behavior
import io.sarl.core.AgentTask

/**
* Capacity for executing tasks with specific functions for the SRE platform.
Expand All @@ -30,7 +31,7 @@ import io.sarl.lang.core.Behavior
* @version $FullVersion$
* @mavengroupid $GroupId$
* @mavenartifactid $ArtifactId$
* @since 0.6.0
* @since 0.6
*/
capacity InternalSchedules extends Schedules {

Expand All @@ -46,4 +47,12 @@ capacity InternalSchedules extends Schedules {
*/
def releaseInternalResources(^behavior : Behavior)

/** Submit the given task for being run as soon as possible.
*
* @param task the task to run.
* @return the definition of the task reference.
* @since 0.11
*/
def executeAsap(task : Runnable) : AgentTask

}
Expand Up @@ -384,6 +384,17 @@ skill SchedulesSkill extends Skill implements InternalSchedules {
}
}

def executeAsap(task : Runnable) : AgentTask {
if (task !== null) {
var description = preRunTask(null) [task.run]
var logger = getLogger
val future = this.executorService.executeAsap(logger,
new SingleRunner(this, this.owner, description, logger))
description = postRunTask(description, null, future)
return description.task
}
}

def execute(task : AgentTask = null, procedure : (Agent)=>void) : AgentTask {
var description = preRunTask(task, procedure)
var logger = getLogger
Expand Down
@@ -0,0 +1,42 @@
/*
* $Id$
*
* SARL is an general-purpose agent programming language.
* More details on http://www.sarl.io
*
* Copyright (C) 2014-2020 the original authors or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.sarl.sre.skills.internal

import io.sarl.sre.capacities.InternalSchedules
import io.sarl.sre.internal.BehaviorGuardEvaluatorRegistry
import java.util.function.Supplier

/**
* Factory of event bus.
*
* @author $Author: sgalland$
* @version $FullVersion$
* @mavengroupid $GroupId$
* @mavenartifactid $ArtifactId$
* @since 0.11
*/
class DefaultEventBusFactory implements EventBusFactory {

override createEventBus(provider : Supplier<InternalSchedules>) : EventBus {
new EventBus(provider, new BehaviorGuardEvaluatorRegistry)
}

}
Expand Up @@ -21,17 +21,17 @@
package io.sarl.sre.skills.internal

import io.sarl.lang.core.Event
import io.sarl.sre.capacities.InternalSchedules
import io.sarl.sre.internal.BehaviorGuardEvaluator
import io.sarl.sre.internal.BehaviorGuardEvaluatorRegistry
import io.sarl.sre.services.executor.EarlyExitException
import io.sarl.sre.services.executor.ExecutorService
import io.sarl.sre.services.executor.SreRunnable
import java.util.Collection
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutionException
import java.util.function.Supplier
import java.util.logging.Logger
import javax.inject.Inject
import org.arakhne.afc.util.OutputParameter

/**
Expand All @@ -54,35 +54,32 @@ class EventBus {
val behaviorGuardEvaluatorRegistry : BehaviorGuardEvaluatorRegistry

/**
* The executor used to execute behavior methods in dedicated thread.
* The skill that gives access to the scheduling capacity of the agent
*/
val executor : ExecutorService
val taskSchedulerSupplier : Supplier<InternalSchedules>

/**
* Instantiates a dispatcher.
*
* @param executor the executor service.
* @param provider of synchronization locks.
*/
@Inject
new (executor : ExecutorService) {
this(executor, new BehaviorGuardEvaluatorRegistry)
}

/**
* Instantiates a dispatcher.
*
* @param executor the executor service.
* @param provider of synchronization locks.
* @param taskScheduler the scheduler of agent tasks.
* @param dispatcher the event dispatcher.
*/
new (executor : ExecutorService, dispatcher : BehaviorGuardEvaluatorRegistry) {
assert executor !== null
new (taskScheduler : Supplier<InternalSchedules>, dispatcher : BehaviorGuardEvaluatorRegistry) {
assert taskScheduler !== null
assert dispatcher !== null
this.executor = executor
this.taskSchedulerSupplier = taskScheduler
this.behaviorGuardEvaluatorRegistry = dispatcher
}

/** Replies the agent's task scheduler that must be used by the event bus.
*
* @return the executor.
* @since 0.11
*/
def getExecutor : InternalSchedules {
this.taskSchedulerSupplier.get
}

/** Replies if a listener with the given type is registered.
*
* @param type the type of listener.
Expand Down Expand Up @@ -117,9 +114,7 @@ class EventBus {
* @param callback function which is invoked just after the first registration of the object. It could be {@code null}.
*/
def register(object : Object, filter : (Event)=>boolean, callback : (Object)=>void) {

this.behaviorGuardEvaluatorRegistry.register(object, filter, callback)

}

/**
Expand Down Expand Up @@ -224,7 +219,7 @@ class EventBus {
// Could be null when the corresponding events is not listen by an agent, i.e. system event like ParticpantJoined
var behaviorsMethodsToExecute = ^event.evaluateGuards(behaviorGuardEvaluators, logger)
if (behaviorsMethodsToExecute !== null && !behaviorsMethodsToExecute.empty) {
behaviorsMethodsToExecute.executeAsynchronouslyBehaviorMethods(logger)
behaviorsMethodsToExecute.executeAsynchronouslyBehaviorMethods
}
}
]
Expand Down Expand Up @@ -413,16 +408,15 @@ class EventBus {
*
* <p>Errors are logged by the executor service. They are not thrown by this function.
*
* @param logger the logger to use for notifying the errors.
* @param behaviorsMethodsToExecute the collection of Behaviors runnable that must be executed.
*/
protected def executeAsynchronouslyBehaviorMethods(behaviorsMethodsToExecute : Collection<Runnable>,
logger : Logger) {
protected def executeAsynchronouslyBehaviorMethods(behaviorsMethodsToExecute : Collection<Runnable>) {
assert behaviorsMethodsToExecute !== null
assert behaviorsMethodsToExecute.size() > 0

val exec = this.executor
for (runnable : behaviorsMethodsToExecute) {
this.executor.executeAsap(logger, runnable)
exec.executeAsap(runnable)
}
}

Expand Down
@@ -0,0 +1,40 @@
/*
* $Id$
*
* SARL is an general-purpose agent programming language.
* More details on http://www.sarl.io
*
* Copyright (C) 2014-2020 the original authors or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.sarl.sre.skills.internal

import io.sarl.sre.capacities.InternalSchedules
import java.util.function.Supplier

/**
* Factory of event bus.
*
* @author $Author: sgalland$
* @version $FullVersion$
* @mavengroupid $GroupId$
* @mavenartifactid $ArtifactId$
* @since 0.11
*/
interface EventBusFactory {

/** Create the event bus. */
def createEventBus(provider : Supplier<InternalSchedules>) : EventBus

}
Expand Up @@ -21,11 +21,15 @@
package io.sarl.sre.skills.internal

import io.sarl.core.Logging
import io.sarl.lang.annotation.PrivateAPI
import io.sarl.lang.core.Agent
import io.sarl.lang.core.Event
import io.sarl.lang.core.SREutils
import io.sarl.lang.core.Skill
import io.sarl.lang.util.ClearableReference
import io.sarl.sre.capacities.InformedEventListener
import io.sarl.sre.capacities.InternalEventBusCapacity
import io.sarl.sre.capacities.InternalSchedules
import java.util.UUID
import java.util.concurrent.ConcurrentLinkedDeque
import javax.inject.Inject
Expand Down Expand Up @@ -56,8 +60,26 @@ skill InternalEventBusSkill extends Skill implements InternalEventBusCapacity {

var eventBuffer : ConcurrentLinkedDeque<Event> = null

var scheduleSkillReference : ClearableReference<InternalSchedules>

@SuppressWarnings("raw_type")
@Inject
@PrivateAPI(isCallerOnly = true)
new (busFactory : EventBusFactory) {
this.eventBus = busFactory.createEventBus [
var sk = this.scheduleSkillReference?.get
if (sk === null) {
var ref : ClearableReference = SREutils::getInternalSkillReference(getOwner, typeof(InternalSchedules))
assert ref !== null
this.scheduleSkillReference = ref as ClearableReference<InternalSchedules>
sk = this.scheduleSkillReference.get
}
return sk
]
}

new (bus : EventBus) {
assert bus !== null
this.eventBus = bus
}

Expand Down
@@ -0,0 +1,54 @@
/*
* $Id$
*
* SARL is an general-purpose agent programming language.
* More details on http://www.sarl.io
*
* Copyright (C) 2014-2020 the original authors or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sarl.sre.tests.runtime.spaces.mocks

import io.sarl.core.Behaviors
import io.sarl.core.Lifecycle
import io.sarl.core.MemberLeft
import io.sarl.core.Schedules
import io.sarl.sre.tests.testutils.TestingAgent
import io.sarl.core.InnerContextAccess

/**
* @author $Author: sgalland$
* @version $FullVersion$
* @mavengroupid $GroupId$
* @mavenartifactid $ArtifactId$
*/
agent SourceAgent2 extends TestingAgent {

uses Behaviors, Lifecycle, Schedules, InnerContextAccess

def runAgentTest : RunPolicy {
var id = typeof(TargetAgent).spawnInContext(innerContext, agentInitializationParameters)
addResult(id)
in(1000) [
wake(new TestEvent(ID))
]
return RunPolicy::STANDARD
}

on MemberLeft {
killMe
}

}
Expand Up @@ -74,6 +74,10 @@ skill MyInternalSchedulesSkill implements InternalSchedules {
null
}

override executeAsap(task : Runnable) : AgentTask {
null
}

override unregisterTasksForBehavior(^behavior : Behavior) {
}

Expand Down

0 comments on commit 5343414

Please sign in to comment.