Skip to content

Commit

Permalink
[sre] Change synchronization of event spaces.
Browse files Browse the repository at this point in the history
see #470
see #546

Signed-off-by: Stéphane Galland <galland@arakhne.org>
  • Loading branch information
gallandarakhneorg committed Jan 25, 2017
1 parent 2886070 commit 3c7648c
Show file tree
Hide file tree
Showing 14 changed files with 55 additions and 35 deletions.
Expand Up @@ -71,6 +71,11 @@ public MultipleAddressParticipantRepository(String distributedParticipantMapName
this.participants = repositoryImplFactory.getMultiMap(this.distributedParticipantMapName, null);
}

@Override
public Object mutex() {
return this.participants;
}

/**
* Add a participant in this repository.
*
Expand Down
Expand Up @@ -191,8 +191,6 @@ protected Set<Entry<ADDRESST, EventListener>> listenersEntrySet() {
*
* @return the mutex.
*/
public final Object mutex() {
return this;
}
public abstract Object mutex();

}
Expand Up @@ -73,6 +73,11 @@ public UniqueAddressParticipantRepository(String distributedParticipantMapName,
this.participants = repositoryImplFactory.getMap(this.distributedParticipantMapName, null);
}

@Override
public Object mutex() {
return this.participants;
}

/**
* Registers a new participant in this repository.
* @param address - the address of the participant
Expand Down
Expand Up @@ -28,6 +28,8 @@
/**
* A view on a Guava multimap that provides the API for the DMultiMap.
*
* <p>This class is not thread-safe.
*
* @param <K> - type of the keys.
* @param <V> - type of the values.
* @author $Author: sgalland$
Expand Down
Expand Up @@ -28,6 +28,8 @@
/**
* View from a Hazelcast map to DMap.
*
* <p>This class is not thread-safe.
*
* @param <K> - type of the keys.
* @param <V> - type of the values.
* @author $Author: sgalland$
Expand Down
Expand Up @@ -45,6 +45,8 @@
/**
* A view from the Hazelcast multimap to DMultiMap.
*
* <p>This class is not thread-safe.
*
* @param <K> - type of the keys.
* @param <V> - type of the values.
* @author $Author: sgalland$
Expand Down
Expand Up @@ -36,6 +36,7 @@
import io.sarl.lang.core.EventListener;
import io.sarl.lang.core.Scope;
import io.sarl.lang.core.SpaceID;
import io.sarl.lang.util.SynchronizedCollection;
import io.sarl.lang.util.SynchronizedSet;
import io.sarl.util.Collections3;
import io.sarl.util.Scopes;
Expand All @@ -52,11 +53,6 @@
*/
public abstract class AbstractEventSpace extends SpaceBase {

/**
* List of participants in this space. DO MISS TO BE SYNCHRONIZED ON THE PARTICIPANT REPOSITORY.
*/
protected final UniqueAddressParticipantRepository<Address> participants;

/**
* Logging service.
*/
Expand All @@ -75,6 +71,11 @@ public abstract class AbstractEventSpace extends SpaceBase {
@Inject
private NetworkService network;

/**
* List of participants in this space.
*/
private final UniqueAddressParticipantRepository<Address> participants;

/**
* Constructs an event space.
*
Expand All @@ -87,6 +88,14 @@ public AbstractEventSpace(SpaceID id, DistributedDataStructureService factory) {
factory);
}

/** Replies the internal datastructure that stores the participants to this space.
*
* @return the internal data structure.
*/
protected UniqueAddressParticipantRepository<Address> getParticipantInternalDataStructure() {
return this.participants;
}

/**
* Replies the address associated to the given participant.
*
Expand All @@ -104,9 +113,7 @@ public final Address getAddress(EventListener entity) {
* @return the address.
*/
public Address getAddress(UUID id) {
synchronized (this.participants) {
return this.participants.getAddress(id);
}
return getParticipantInternalDataStructure().getAddress(id);
}

/**
Expand Down Expand Up @@ -153,8 +160,10 @@ public final void emit(Event event) {
* @param scope - description of the scope of the event, i.e. the receivers of the event.
*/
protected void doEmit(Event event, Scope<? super Address> scope) {
synchronized (this.participants) {
for (final EventListener agent : this.participants.getListeners()) {
final UniqueAddressParticipantRepository<Address> particips = getParticipantInternalDataStructure();
final SynchronizedCollection<EventListener> listeners = particips.getListeners();
synchronized (listeners.mutex()) {
for (final EventListener agent : listeners) {
if (scope.matches(getAddress(agent))) {
// TODO Verify the agent is still alive and running
this.executorService.submit(new AsyncRunner(agent, event));
Expand All @@ -165,9 +174,7 @@ protected void doEmit(Event event, Scope<? super Address> scope) {

@Override
public SynchronizedSet<UUID> getParticipants() {
synchronized (this.participants) {
return Collections3.unmodifiableSynchronizedSet(this.participants.getParticipantIDs());
}
return Collections3.unmodifiableSynchronizedSet(getParticipantInternalDataStructure().getParticipantIDs());
}

@Override
Expand Down
Expand Up @@ -52,17 +52,12 @@ public EventSpaceImpl(SpaceID id, DistributedDataStructureService factory) {

@Override
public Address register(EventListener entity) {
final Address a = new Address(getSpaceID(), entity.getID());
synchronized (this.participants) {
return this.participants.registerParticipant(a, entity);
}
return getParticipantInternalDataStructure().registerParticipant(new Address(getSpaceID(), entity.getID()), entity);
}

@Override
public Address unregister(EventListener entity) {
synchronized (this.participants) {
return this.participants.unregisterParticipant(entity);
}
return getParticipantInternalDataStructure().unregisterParticipant(entity);
}

}
Expand Up @@ -86,10 +86,7 @@ public Permission getRegistrationPermission() {
@Override
public Address register(EventListener entity, Principal principal) {
if (this.acl.checkPermission(principal, this.accessPermission)) {
final Address a = new Address(getSpaceID(), entity.getID());
synchronized (this.participants) {
return this.participants.registerParticipant(a, entity);
}
return getParticipantInternalDataStructure().registerParticipant(new Address(getSpaceID(), entity.getID()), entity);
}
return null;
}
Expand All @@ -101,9 +98,7 @@ public final <P extends EventListener & Principal> Address register(P entity) {

@Override
public Address unregister(EventListener entity) {
synchronized (this.participants) {
return this.participants.unregisterParticipant(entity);
}
return getParticipantInternalDataStructure().unregisterParticipant(entity);
}

}
Expand Up @@ -36,10 +36,10 @@
/**
* A view on a standard Map that provides the API for DMap.
*
* @param <K>
* - type of the keys.
* @param <V>
* - type of the values.
* <p>This class is not thread-safe.
*
* @param <K> type of the keys.
* @param <V> type of the values.
* @author $Author: sgalland$
* @version $FullVersion$
* @mavengroupid $GroupId$
Expand Down
Expand Up @@ -47,6 +47,8 @@
/**
* A view on a Map that provides the API for the DMultiMap.
*
* <p>This class is not thread-safe.
*
* @param <K> - type of the keys.
* @param <V> - type of the values.
* @author $Author: sgalland$
Expand Down
Expand Up @@ -26,6 +26,8 @@
/**
* Abstract implementation of a view on a Map for a distributed map.
*
* <p>This class is not thread-safe.
*
* @param <K> - type of the keys.
* @param <V> - type of the values.
* @author $Author: sgalland$
Expand Down
Expand Up @@ -39,6 +39,8 @@
/**
* A view if the multiset of the keys in a {@link AbstractDMultiMapView}.
*
* <p>This class is not thread-safe.
*
* @param <K> - the keys.
* @param <V> - the values.
* @author $Author: sgalland$
Expand Down
Expand Up @@ -58,7 +58,10 @@ public class ParticipantRepositoryTest extends AbstractJanusTest {
@Before
public void setUp() {
this.repository = new ParticipantRepository<String>() {
//
@Override
public Object mutex() {
return this;
}
};
this.listeners = new TreeMap<>();
this.listeners.put("a", Mockito.mock(EventListener.class)); //$NON-NLS-1$
Expand Down

0 comments on commit 3c7648c

Please sign in to comment.