Skip to content

Commit

Permalink
Use canonical event types in event listener maps.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Apr 27, 2018
1 parent 36a95f7 commit 19db2bc
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 20 deletions.
11 changes: 10 additions & 1 deletion primitive/src/main/java/io/atomix/primitive/event/EventType.java
Expand Up @@ -39,7 +39,16 @@ static EventType from(String name) {
* @param eventType the event type to simplify * @param eventType the event type to simplify
* @return the simplified event type * @return the simplified event type
*/ */
static EventType simplify(EventType eventType) { static EventType canonical(EventType eventType) {
return new DefaultEventType(eventType.id()); return new DefaultEventType(eventType.id());
} }

/**
* Returns an identical event type in canonical form.
*
* @return an identical event type in canonical form
*/
default EventType canonicalize() {
return canonical(this);
}
} }
Expand Up @@ -45,7 +45,7 @@ public static PrimitiveEvent event(EventType eventType) {
* @return the primitive event * @return the primitive event
*/ */
public static PrimitiveEvent event(EventType eventType, byte[] value) { public static PrimitiveEvent event(EventType eventType, byte[] value) {
return new PrimitiveEvent(EventType.simplify(eventType), value); return new PrimitiveEvent(EventType.canonical(eventType), value);
} }


private final EventType type; private final EventType type;
Expand Down
Expand Up @@ -242,7 +242,7 @@ private CompletableFuture<Stream<byte[]>> executeAll(PrimitiveOperation operatio
protected <T> void listenAll(EventType eventType, BiConsumer<PartitionId, T> listener) { protected <T> void listenAll(EventType eventType, BiConsumer<PartitionId, T> listener) {
getPartitions().forEach(partition -> { getPartitions().forEach(partition -> {
Consumer<PrimitiveEvent> partitionListener = event -> listener.accept(partition.partitionId(), decode(event.value())); Consumer<PrimitiveEvent> partitionListener = event -> listener.accept(partition.partitionId(), decode(event.value()));
eventListeners.computeIfAbsent(eventType, t -> Maps.newHashMap()) eventListeners.computeIfAbsent(eventType.canonicalize(), t -> Maps.newHashMap())
.computeIfAbsent(partition.partitionId(), p -> Maps.newIdentityHashMap()) .computeIfAbsent(partition.partitionId(), p -> Maps.newIdentityHashMap())
.put(listener, partitionListener); .put(listener, partitionListener);
partition.addEventListener(eventType, partitionListener); partition.addEventListener(eventType, partitionListener);
Expand All @@ -258,7 +258,7 @@ protected <T> void listenAll(EventType eventType, BiConsumer<PartitionId, T> lis
protected void listenAll(EventType eventType, Consumer<PartitionId> listener) { protected void listenAll(EventType eventType, Consumer<PartitionId> listener) {
getPartitions().forEach(partition -> { getPartitions().forEach(partition -> {
Consumer<PrimitiveEvent> partitionListener = event -> listener.accept(partition.partitionId()); Consumer<PrimitiveEvent> partitionListener = event -> listener.accept(partition.partitionId());
eventListeners.computeIfAbsent(eventType, t -> Maps.newHashMap()) eventListeners.computeIfAbsent(eventType.canonicalize(), t -> Maps.newHashMap())
.computeIfAbsent(partition.partitionId(), p -> Maps.newIdentityHashMap()) .computeIfAbsent(partition.partitionId(), p -> Maps.newIdentityHashMap())
.put(listener, partitionListener); .put(listener, partitionListener);
partition.addEventListener(eventType, partitionListener); partition.addEventListener(eventType, partitionListener);
Expand All @@ -272,7 +272,7 @@ protected void listenAll(EventType eventType, Consumer<PartitionId> listener) {
* @param listener the event listener to add * @param listener the event listener to add
*/ */
protected void unlistenAll(EventType eventType, Consumer<PartitionId> listener) { protected void unlistenAll(EventType eventType, Consumer<PartitionId> listener) {
Map<PartitionId, Map<Object, Consumer>> eventTypeListeners = eventListeners.get(eventType); Map<PartitionId, Map<Object, Consumer>> eventTypeListeners = eventListeners.get(eventType.canonicalize());
if (eventTypeListeners != null) { if (eventTypeListeners != null) {
getPartitions().forEach(partition -> { getPartitions().forEach(partition -> {
Map<Object, Consumer> partitionListeners = eventTypeListeners.get(partition.partitionId()); Map<Object, Consumer> partitionListeners = eventTypeListeners.get(partition.partitionId());
Expand All @@ -286,7 +286,7 @@ protected void unlistenAll(EventType eventType, Consumer<PartitionId> listener)
} }
} }
if (eventTypeListeners.isEmpty()) { if (eventTypeListeners.isEmpty()) {
eventListeners.remove(eventType); eventListeners.remove(eventType.canonicalize());
} }
}); });
} }
Expand All @@ -299,7 +299,7 @@ protected void unlistenAll(EventType eventType, Consumer<PartitionId> listener)
* @param listener the event listener to remove * @param listener the event listener to remove
*/ */
protected void unlistenAll(EventType eventType, BiConsumer listener) { protected void unlistenAll(EventType eventType, BiConsumer listener) {
Map<PartitionId, Map<Object, Consumer>> eventTypeListeners = eventListeners.get(eventType); Map<PartitionId, Map<Object, Consumer>> eventTypeListeners = eventListeners.get(eventType.canonicalize());
if (eventTypeListeners != null) { if (eventTypeListeners != null) {
getPartitions().forEach(partition -> { getPartitions().forEach(partition -> {
Map<Object, Consumer> partitionListeners = eventTypeListeners.get(partition.partitionId()); Map<Object, Consumer> partitionListeners = eventTypeListeners.get(partition.partitionId());
Expand All @@ -313,7 +313,7 @@ protected void unlistenAll(EventType eventType, BiConsumer listener) {
} }
} }
if (eventTypeListeners.isEmpty()) { if (eventTypeListeners.isEmpty()) {
eventListeners.remove(eventType); eventListeners.remove(eventType.canonicalize());
} }
}); });
} }
Expand Down Expand Up @@ -396,7 +396,7 @@ private CompletableFuture<byte[]> executeOn(PartitionId partitionId, PrimitiveOp
*/ */
protected <T> void listenOn(PartitionId partitionId, EventType eventType, Consumer<T> listener) { protected <T> void listenOn(PartitionId partitionId, EventType eventType, Consumer<T> listener) {
Consumer<PrimitiveEvent> partitionListener = event -> listener.accept(decode(event.value())); Consumer<PrimitiveEvent> partitionListener = event -> listener.accept(decode(event.value()));
eventListeners.computeIfAbsent(eventType, t -> Maps.newHashMap()) eventListeners.computeIfAbsent(eventType.canonicalize(), t -> Maps.newHashMap())
.computeIfAbsent(partitionId, p -> Maps.newIdentityHashMap()) .computeIfAbsent(partitionId, p -> Maps.newIdentityHashMap())
.put(listener, partitionListener); .put(listener, partitionListener);
getPartition(partitionId).addEventListener(eventType, partitionListener); getPartition(partitionId).addEventListener(eventType, partitionListener);
Expand All @@ -411,7 +411,7 @@ protected <T> void listenOn(PartitionId partitionId, EventType eventType, Consum
*/ */
protected void listenOn(PartitionId partitionId, EventType eventType, Runnable listener) { protected void listenOn(PartitionId partitionId, EventType eventType, Runnable listener) {
Consumer<PrimitiveEvent> partitionListener = event -> listener.run(); Consumer<PrimitiveEvent> partitionListener = event -> listener.run();
eventListeners.computeIfAbsent(eventType, t -> Maps.newHashMap()) eventListeners.computeIfAbsent(eventType.canonicalize(), t -> Maps.newHashMap())
.computeIfAbsent(partitionId, p -> Maps.newIdentityHashMap()) .computeIfAbsent(partitionId, p -> Maps.newIdentityHashMap())
.put(listener, partitionListener); .put(listener, partitionListener);
getPartition(partitionId).addEventListener(eventType, partitionListener); getPartition(partitionId).addEventListener(eventType, partitionListener);
Expand All @@ -425,7 +425,7 @@ protected void listenOn(PartitionId partitionId, EventType eventType, Runnable l
* @param listener the event listener to add * @param listener the event listener to add
*/ */
protected void unlistenOn(PartitionId partitionId, EventType eventType, Runnable listener) { protected void unlistenOn(PartitionId partitionId, EventType eventType, Runnable listener) {
Map<PartitionId, Map<Object, Consumer>> eventTypeListeners = eventListeners.get(eventType); Map<PartitionId, Map<Object, Consumer>> eventTypeListeners = eventListeners.get(eventType.canonicalize());
if (eventTypeListeners != null) { if (eventTypeListeners != null) {
Map<Object, Consumer> partitionListeners = eventTypeListeners.get(partitionId); Map<Object, Consumer> partitionListeners = eventTypeListeners.get(partitionId);
if (partitionListeners != null) { if (partitionListeners != null) {
Expand All @@ -438,7 +438,7 @@ protected void unlistenOn(PartitionId partitionId, EventType eventType, Runnable
} }
} }
if (eventTypeListeners.isEmpty()) { if (eventTypeListeners.isEmpty()) {
eventListeners.remove(eventType); eventListeners.remove(eventType.canonicalize());
} }
} }
} }
Expand All @@ -451,7 +451,7 @@ protected void unlistenOn(PartitionId partitionId, EventType eventType, Runnable
* @param listener the event listener to remove * @param listener the event listener to remove
*/ */
protected void unlistenOn(PartitionId partitionId, EventType eventType, Consumer listener) { protected void unlistenOn(PartitionId partitionId, EventType eventType, Consumer listener) {
Map<PartitionId, Map<Object, Consumer>> eventTypeListeners = eventListeners.get(eventType); Map<PartitionId, Map<Object, Consumer>> eventTypeListeners = eventListeners.get(eventType.canonicalize());
if (eventTypeListeners != null) { if (eventTypeListeners != null) {
Map<Object, Consumer> partitionListeners = eventTypeListeners.get(partitionId); Map<Object, Consumer> partitionListeners = eventTypeListeners.get(partitionId);
if (partitionListeners != null) { if (partitionListeners != null) {
Expand All @@ -464,7 +464,7 @@ protected void unlistenOn(PartitionId partitionId, EventType eventType, Consumer
} }
} }
if (eventTypeListeners.isEmpty()) { if (eventTypeListeners.isEmpty()) {
eventListeners.remove(eventType); eventListeners.remove(eventType.canonicalize());
} }
} }
} }
Expand Down
Expand Up @@ -207,7 +207,7 @@ public CompletableFuture<byte[]> execute(PrimitiveOperation operation) {
@Override @Override
public synchronized void addEventListener(EventType eventType, Consumer<PrimitiveEvent> consumer) { public synchronized void addEventListener(EventType eventType, Consumer<PrimitiveEvent> consumer) {
checkOpen(); checkOpen();
eventListeners.put(eventType, consumer); eventListeners.put(eventType.canonicalize(), consumer);
PartitionProxy proxy = this.proxy; PartitionProxy proxy = this.proxy;
if (proxy != null) { if (proxy != null) {
proxy.addEventListener(eventType, consumer); proxy.addEventListener(eventType, consumer);
Expand All @@ -217,7 +217,7 @@ public synchronized void addEventListener(EventType eventType, Consumer<Primitiv
@Override @Override
public synchronized void removeEventListener(EventType eventType, Consumer<PrimitiveEvent> consumer) { public synchronized void removeEventListener(EventType eventType, Consumer<PrimitiveEvent> consumer) {
checkOpen(); checkOpen();
eventListeners.remove(eventType, consumer); eventListeners.remove(eventType.canonicalize(), consumer);
PartitionProxy proxy = this.proxy; PartitionProxy proxy = this.proxy;
if (proxy != null) { if (proxy != null) {
proxy.removeEventListener(eventType, consumer); proxy.removeEventListener(eventType, consumer);
Expand Down
Expand Up @@ -207,12 +207,12 @@ private void execute(PrimitiveOperation operation, ComposableFuture<byte[]> futu


@Override @Override
public void addEventListener(EventType eventType, Consumer<PrimitiveEvent> listener) { public void addEventListener(EventType eventType, Consumer<PrimitiveEvent> listener) {
eventListeners.computeIfAbsent(eventType, t -> Sets.newLinkedHashSet()).add(listener); eventListeners.computeIfAbsent(eventType.canonicalize(), t -> Sets.newLinkedHashSet()).add(listener);
} }


@Override @Override
public void removeEventListener(EventType eventType, Consumer<PrimitiveEvent> listener) { public void removeEventListener(EventType eventType, Consumer<PrimitiveEvent> listener) {
eventListeners.computeIfAbsent(eventType, t -> Sets.newLinkedHashSet()).remove(listener); eventListeners.computeIfAbsent(eventType.canonicalize(), t -> Sets.newLinkedHashSet()).remove(listener);
} }


/** /**
Expand Down
Expand Up @@ -67,7 +67,7 @@ public RaftProxyListener(RaftClientProtocol protocol, MemberSelector memberSelec
* @param listener the event listener callback * @param listener the event listener callback
*/ */
public void addEventListener(EventType eventType, Consumer<PrimitiveEvent> listener) { public void addEventListener(EventType eventType, Consumer<PrimitiveEvent> listener) {
executor.execute(() -> eventListeners.computeIfAbsent(eventType, e -> Sets.newLinkedHashSet()).add(listener)); executor.execute(() -> eventListeners.computeIfAbsent(eventType.canonicalize(), e -> Sets.newLinkedHashSet()).add(listener));
} }


/** /**
Expand All @@ -76,7 +76,7 @@ public void addEventListener(EventType eventType, Consumer<PrimitiveEvent> liste
* @param listener the event listener callback * @param listener the event listener callback
*/ */
public void removeEventListener(EventType eventType, Consumer<PrimitiveEvent> listener) { public void removeEventListener(EventType eventType, Consumer<PrimitiveEvent> listener) {
executor.execute(() -> eventListeners.computeIfAbsent(eventType, e -> Sets.newLinkedHashSet()).remove(listener)); executor.execute(() -> eventListeners.computeIfAbsent(eventType.canonicalize(), e -> Sets.newLinkedHashSet()).remove(listener));
} }


/** /**
Expand Down

0 comments on commit 19db2bc

Please sign in to comment.