Skip to content

Commit

Permalink
Removing the last remaining parallel stream
Browse files Browse the repository at this point in the history
Removing this stream that was causing a deadlock
  • Loading branch information
ngaud committed Dec 17, 2019
1 parent 7c2458a commit b91cf33
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 35 deletions.
Expand Up @@ -21,6 +21,17 @@

package io.sarl.eventdispatching

import com.google.common.base.Objects
import com.google.common.base.Throwables
import com.google.common.cache.CacheBuilder
import com.google.common.cache.LoadingCache
import com.google.common.collect.Collections2
import com.google.common.collect.ImmutableMap
import com.google.common.collect.ImmutableSet
import com.google.common.collect.Maps
import com.google.common.reflect.TypeToken
import io.sarl.lang.annotation.PerceptGuardEvaluator
import io.sarl.lang.core.Event
import java.lang.reflect.Method
import java.text.MessageFormat
import java.util.Arrays
Expand All @@ -30,24 +41,11 @@ import java.util.List
import java.util.Map
import java.util.Map.Entry
import java.util.Set
import java.util.TreeSet
import java.util.TreeMap
import java.util.TreeSet
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.CopyOnWriteArraySet

import com.google.common.base.Objects
import com.google.common.base.Throwables
import com.google.common.cache.CacheBuilder
import com.google.common.cache.LoadingCache
import com.google.common.collect.Collections2
import com.google.common.collect.ImmutableMap
import com.google.common.collect.ImmutableSet
import com.google.common.collect.Maps
import com.google.common.reflect.TypeToken
import org.eclipse.xtext.xbase.lib.Pair

import io.sarl.lang.^annotation.PerceptGuardEvaluator
import io.sarl.lang.core.Event

/**
* Registry of all {@code BehaviorGuardEvaluator} classes containing a method to evaluate the guard of a given behavior (on clause in SARL behavior).
* This class has been inspired by the {@code com.google.common.eventbus.SuscriberRegistry} class of Google Guava library.
Expand Down Expand Up @@ -557,8 +555,9 @@ class BehaviorGuardEvaluatorRegistry {
* -the event to process
* @return the set of guard evaluators associated to the specified event
*/
def getBehaviorGuardEvaluators(^event : Event) : Iterable<BehaviorGuardEvaluator> {
// final ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());
def getBehaviorGuardEvaluators(^event : Event) : ConcurrentLinkedDeque<BehaviorGuardEvaluator> {
// Original java code from Nico
// final ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());
//
// final List<BehaviorGuardEvaluator> iBehaviorGuardEvaluators = Lists.newArrayListWithCapacity(eventTypes.size());
//
Expand All @@ -572,9 +571,25 @@ class BehaviorGuardEvaluatorRegistry {
// }
//
// return iBehaviorGuardEvaluators;
val eventTypes = flattenHierarchy(^event.class)
return [ new EvaluatorCollectionIterator(this.behaviorGuardEvaluators, ^event,
eventTypes.iterator, false) ]

var eventTypes : ImmutableSet<Class<?>> = flattenHierarchy(^event.getClass());

var iBehaviorGuardEvaluators : ConcurrentLinkedDeque<BehaviorGuardEvaluator> = new ConcurrentLinkedDeque();

for (eventType : eventTypes) {

var eventSubscribers : Pair<(Event) => Boolean, Set<BehaviorGuardEvaluator>> = this.behaviorGuardEvaluators.get(eventType);
if (eventSubscribers != null && eventSubscribers.getValue() != null
&& (eventSubscribers.getKey() == null || eventSubscribers.getKey().apply(^event))) {
iBehaviorGuardEvaluators.addAll(eventSubscribers.getValue());
}
}

return iBehaviorGuardEvaluators;

//val eventTypes = flattenHierarchy(^event.class)
//return [ new EvaluatorCollectionIterator(this.behaviorGuardEvaluators, ^event,
// eventTypes.iterator, false) ]
}

/**
Expand All @@ -589,12 +604,33 @@ class BehaviorGuardEvaluatorRegistry {
* @return the set of guard evaluators associated to the specified event
* @since 0.5
*/
def getBehaviorGuardEvaluatorsFor(^event : Event, listener : Object) : Iterable<BehaviorGuardEvaluator> {
val eventTypes = flattenHierarchy(^event.class)
def getBehaviorGuardEvaluatorsFor(^event : Event, listener : Object) : ConcurrentLinkedDeque<BehaviorGuardEvaluator> {
/*val eventTypes = flattenHierarchy(^event.class)
return [
val base = new EvaluatorCollectionIterator(this.behaviorGuardEvaluators, ^event, eventTypes.iterator, true)
new EvaluatorCollectionFilteringIterator(base, listener)
]
]*/
val eventTypes = flattenHierarchy(^event.class)
var iBehaviorGuardEvaluators : ConcurrentLinkedDeque<BehaviorGuardEvaluator> = new ConcurrentLinkedDeque();

if(listener != null) {
for (eventType : eventTypes) {
var eventSubscribers : Pair<(Event) => Boolean, Set<BehaviorGuardEvaluator>> = this.behaviorGuardEvaluators.get(eventType);
if (eventSubscribers != null && eventSubscribers.getValue() != null
&& eventSubscribers === listener && (eventSubscribers.getKey() == null || eventSubscribers.getKey().apply(^event))) {
iBehaviorGuardEvaluators.addAll(eventSubscribers.getValue());
}
}
} else {
for (eventType : eventTypes) {
var eventSubscribers : Pair<(Event) => Boolean, Set<BehaviorGuardEvaluator>> = this.behaviorGuardEvaluators.get(eventType);
if (eventSubscribers != null && eventSubscribers.getValue() != null
&& (eventSubscribers.getKey() == null || eventSubscribers.getKey().apply(^event))) {
iBehaviorGuardEvaluators.addAll(eventSubscribers.getValue());
}
}
}
return iBehaviorGuardEvaluators;
}

/** Replies if a listener with the given type is registered.
Expand Down
Expand Up @@ -23,12 +23,13 @@

import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.stream.StreamSupport;

import com.google.common.collect.Lists;
import com.google.inject.Inject;

import org.arakhne.afc.util.MultiCollection;
import org.arakhne.afc.util.OutputParameter;
import org.eclipse.xtext.xbase.lib.Functions.Function1;
Expand All @@ -37,7 +38,6 @@
import io.janusproject.services.executor.EarlyExitException;
import io.janusproject.services.executor.ExecutorService;
import io.janusproject.services.executor.JanusRunnable;

import io.sarl.eventdispatching.BehaviorGuardEvaluator;
import io.sarl.eventdispatching.BehaviorGuardEvaluatorRegistry;
import io.sarl.lang.core.Event;
Expand Down Expand Up @@ -156,7 +156,7 @@ public void unregisterAll(Procedure1<? super Object> callback) {
*/
public void immediateDispatch(Event event) {
assert event != null;
Iterable<BehaviorGuardEvaluator> behaviorGuardEvaluators = null;
ConcurrentLinkedDeque<BehaviorGuardEvaluator> behaviorGuardEvaluators = null;
synchronized (this.behaviorGuardEvaluatorRegistry) {
behaviorGuardEvaluators = AgentInternalEventsDispatcher.this.behaviorGuardEvaluatorRegistry
.getBehaviorGuardEvaluators(event);
Expand All @@ -168,7 +168,7 @@ public void immediateDispatch(Event event) {
executeBehaviorMethodsInParalellWithSynchroAtTheEnd(behaviorsMethodsToExecute);
} catch (RuntimeException exception) {
throw exception;
} catch (InterruptedException | ExecutionException | InvocationTargetException e) {
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}

Expand All @@ -186,7 +186,7 @@ public void immediateDispatch(Event event) {
*/
public void immediateDispatchTo(Object listener, Event event) {
assert event != null;
Iterable<BehaviorGuardEvaluator> behaviorGuardEvaluators = null;
ConcurrentLinkedDeque<BehaviorGuardEvaluator> behaviorGuardEvaluators = null;
synchronized (this.behaviorGuardEvaluatorRegistry) {
behaviorGuardEvaluators = AgentInternalEventsDispatcher.this.behaviorGuardEvaluatorRegistry
.getBehaviorGuardEvaluatorsFor(event, listener);
Expand All @@ -198,10 +198,9 @@ public void immediateDispatchTo(Object listener, Event event) {
executeBehaviorMethodsInParalellWithSynchroAtTheEnd(behaviorsMethodsToExecute);
} catch (RuntimeException exception) {
throw exception;
} catch (InterruptedException | ExecutionException | InvocationTargetException e) {
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}

}
}

Expand All @@ -216,7 +215,7 @@ public void immediateDispatchTo(Object listener, Event event) {
public void asyncDispatch(Event event) {
assert event != null;
this.executor.execute(() -> {
Iterable<BehaviorGuardEvaluator> behaviorGuardEvaluators = null;
ConcurrentLinkedDeque<BehaviorGuardEvaluator> behaviorGuardEvaluators = null;
synchronized (AgentInternalEventsDispatcher.this.behaviorGuardEvaluatorRegistry) {
behaviorGuardEvaluators = AgentInternalEventsDispatcher.this.behaviorGuardEvaluatorRegistry
.getBehaviorGuardEvaluators(event);
Expand All @@ -225,7 +224,7 @@ public void asyncDispatch(Event event) {
final Collection<Runnable> behaviorsMethodsToExecute;
try {
behaviorsMethodsToExecute = evaluateGuards(event, behaviorGuardEvaluators);
} catch (InvocationTargetException e) {
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
executeAsynchronouslyBehaviorMethods(behaviorsMethodsToExecute);
Expand All @@ -242,12 +241,12 @@ public void asyncDispatch(Event event) {
* @return the collection of couple associating a object and its collection of behavior methods that must be executed
* @throws InvocationTargetException - exception when you try to execute a method by reflection and this method doesn't exist.
*/
private static Collection<Runnable> evaluateGuards(final Event event,
final Iterable<BehaviorGuardEvaluator> behaviorGuardEvaluators) throws InvocationTargetException {
private Collection<Runnable> evaluateGuards(final Event event,
final ConcurrentLinkedDeque<BehaviorGuardEvaluator> behaviorGuardEvaluators) throws ExecutionException {

final MultiCollection<Runnable> behaviorsMethodsToExecute = new MultiCollection<>();

try {
/*try {
StreamSupport.stream(behaviorGuardEvaluators.spliterator(), true).forEach(evaluator -> {
final Collection<Runnable> behaviorsMethodsToExecutePerTarget = Lists.newLinkedList();
evaluator.evaluateGuard(event, behaviorsMethodsToExecutePerTarget);
Expand All @@ -264,8 +263,68 @@ private static Collection<Runnable> evaluateGuards(final Event event,
throw (InvocationTargetException) t;
}
throw exception;
}*/

final CountDownLatch doneSignal = new CountDownLatch(behaviorGuardEvaluators.size());

final OutputParameter<Throwable> runException = new OutputParameter<>();

for (final BehaviorGuardEvaluator evaluator : behaviorGuardEvaluators) {
this.executor.execute(new JanusRunnable() {
@Override
public void run() {
try {
final Collection<Runnable> behaviorsMethodsToExecutePerTarget = Lists.newLinkedList();
evaluator.evaluateGuard(event, behaviorsMethodsToExecutePerTarget);
synchronized (behaviorsMethodsToExecute) {
behaviorsMethodsToExecute.addCollection(behaviorsMethodsToExecutePerTarget);
}
} catch (EarlyExitException e) {
// Ignore this exception
} catch (RuntimeException e) {
// Catch exception for notifying the caller
runException.set(e);
// Do the standard behavior too -> logging
throw e;
} catch (Exception e) {
// Catch exception for notifying the caller
runException.set(e);
// Do the standard behavior too -> logging
throw new RuntimeException(e);
} finally {
doneSignal.countDown();
}
}
});
}

// Wait for all Behaviors runnable to complete before continuing
try {
doneSignal.await();
} catch (InterruptedException ex) {
// This exception occurs when one of the launched task kills the agent before all the
// submitted tasks are finished. Keep in mind that killing an agent should kill the
// launched tasks.
// Example of code that is generating this issue:
//
// on Initialize {
// in (100) [
// killMe
// ]
// }
//
// In this example, the killMe is launched before the Initialize code is finished;
// and because the Initialize event is fired through the current function, it
// causes the InterruptedException.
}

// Re-throw the run-time exception
if (runException.get() != null) {
throw new ExecutionException(runException.get());
}



return behaviorsMethodsToExecute;
}

Expand Down

0 comments on commit b91cf33

Please sign in to comment.