Skip to content

Commit

Permalink
[sre] Evaluate guards in parallel.
Browse files Browse the repository at this point in the history
Signed-off-by: Stéphane Galland <galland@arakhne.org>
  • Loading branch information
gallandarakhneorg committed Sep 10, 2016
1 parent e497d90 commit 0ba0167
Show file tree
Hide file tree
Showing 5 changed files with 311 additions and 323 deletions.
Expand Up @@ -31,6 +31,7 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import org.arakhne.afc.util.MultiCollection;
import org.eclipse.xtext.xbase.lib.Pair;

import io.sarl.lang.core.DeadEvent;
Expand Down Expand Up @@ -163,30 +164,27 @@ public void immediateDispatch(Event event) {
* </p>
* @param event - an event to dispatch asynchronously.
*/
@SuppressWarnings("synthetic-access")
public void asyncDispatch(Event event) {
@Override
public void run() {
Collection<BehaviorGuardEvaluator> behaviorGuardEvaluators = null;
synchronized (AgentInternalEventsDispatcher.this.behaviorGuardEvaluatorRegistry) {
behaviorGuardEvaluators = AgentInternalEventsDispatcher.this.behaviorGuardEvaluatorRegistry
.getBehaviorGuardEvaluators(event);
}
if (behaviorGuardEvaluators != null && !behaviorGuardEvaluators.isEmpty()) {

final Collection<Runnable> behaviorsMethodsToExecute;
try {
behaviorsMethodsToExecute = evaluateGuards(event, behaviorGuardEvaluators);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
}
executeAsynchronouslyBehaviorMethods(event, behaviorsMethodsToExecute);
assert event != null;
this.executor.execute(() -> {
Collection<BehaviorGuardEvaluator> behaviorGuardEvaluators = null;
synchronized (AgentInternalEventsDispatcher.this.behaviorGuardEvaluatorRegistry) {
behaviorGuardEvaluators = AgentInternalEventsDispatcher.this.behaviorGuardEvaluatorRegistry
.getBehaviorGuardEvaluators(event);
}
if (behaviorGuardEvaluators != null && !behaviorGuardEvaluators.isEmpty()) {

} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
asyncDispatch(new DeadEvent(event));
final Collection<Runnable> behaviorsMethodsToExecute;
try {
behaviorsMethodsToExecute = evaluateGuards(event, behaviorGuardEvaluators);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
}
executeAsynchronouslyBehaviorMethods(event, behaviorsMethodsToExecute);

} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
asyncDispatch(new DeadEvent(event));
}
});
}
Expand All @@ -203,14 +201,26 @@ public void run() {
private static Collection<Runnable> evaluateGuards(final Object event,
final Collection<BehaviorGuardEvaluator> behaviorGuardEvaluators) throws InvocationTargetException {

final Collection<Runnable> behaviorsMethodsToExecute = Lists.newLinkedList();
final MultiCollection<Runnable> behaviorsMethodsToExecute = new MultiCollection<>();

Collection<Runnable> behaviorsMethodsToExecutePerTarget = null;
for (final BehaviorGuardEvaluator evaluator : behaviorGuardEvaluators) {
// TODO Maybe we can parallelize this loop, could be interesting when the number of guardEvaluators increase
behaviorsMethodsToExecutePerTarget = Lists.newLinkedList();
evaluator.evaluateGuard(event, behaviorsMethodsToExecutePerTarget);
behaviorsMethodsToExecute.addAll(behaviorsMethodsToExecutePerTarget);
try {
behaviorGuardEvaluators.parallelStream().forEach((evaluator) -> {
try {
final Collection<Runnable> behaviorsMethodsToExecutePerTarget = Lists.newLinkedList();
evaluator.evaluateGuard(event, behaviorsMethodsToExecutePerTarget);
synchronized (behaviorsMethodsToExecute) {
behaviorsMethodsToExecute.addCollection(behaviorsMethodsToExecutePerTarget);
}
} catch (InvocationTargetException exception) {
throw new RuntimeException(exception);
}
});
} catch (RuntimeException exception) {
final Throwable t = exception.getCause();
if (t instanceof InvocationTargetException) {
throw (InvocationTargetException) t;
}
throw exception;
}

return behaviorsMethodsToExecute;
Expand All @@ -231,49 +241,17 @@ private void executeBehaviorMethodsInParalellWithSynchroAtTheEnd(Event event, Co
final CountDownLatch doneSignal = new CountDownLatch(behaviorsMethodsToExecute.size());

for (final Runnable runnable : behaviorsMethodsToExecute) {
this.executor.execute(new Runnable() {

@Override
public void run() {
try {
runnable.run();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
doneSignal.countDown();
}
this.executor.execute(() -> {
try {
runnable.run();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
doneSignal.countDown();
}

});
}

/*Queue<Pair<Event, Collection<Runnable>>> queueForThread = this.queue.get();
queueForThread.offer(new Pair<>(event, behaviorsMethodsToExecute));
if (!this.dispatching.get().booleanValue()) {
this.dispatching.set(Boolean.TRUE);
try {
Pair<Event, Collection<Runnable>> nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
for (Runnable runnable : nextEvent.getValue()) {
this.executor.execute(new Runnable() {
@Override
public void run() {
runnable.run();
doneSignal.countDown();
}
});
}
}
} finally {
this.dispatching.remove();
this.queue.remove();
}
}*/

// Wait for all Behaviors runnable to complete before continuing
doneSignal.await();
}
Expand Down
Expand Up @@ -88,7 +88,6 @@ private void invokeBehaviorGuardEvaluatorMethod(Object event, Collection<Runnabl
try {
this.method.setAccessible(true);
this.method.invoke(this.target, event, behaviorsMethodsToExecute);
this.method.setAccessible(false);
} catch (IllegalArgumentException e) {
throw new Error(Locale.getString(getClass(), "EVALUATOR_REJECTION", event), e); //$NON-NLS-1$
} catch (IllegalAccessException e) {
Expand Down

0 comments on commit 0ba0167

Please sign in to comment.