Skip to content

Commit

Permalink
Add no-topic listener to LeaderElector. (#579)
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed May 30, 2018
1 parent d9519a0 commit 5c5e46a
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 12 deletions.
18 changes: 18 additions & 0 deletions core/src/main/java/io/atomix/core/election/AsyncLeaderElector.java
Expand Up @@ -100,6 +100,24 @@ public interface AsyncLeaderElector<T> extends AsyncPrimitive {
*/
CompletableFuture<Map<String, Leadership<T>>> getLeaderships();

/**
* Registers a listener to be notified of Leadership changes for all topics.
*
* @param listener listener to notify
* @return CompletableFuture that is completed when the operation completes
*/
CompletableFuture<Void> addListener(LeadershipEventListener<T> listener);

/**
* Unregisters a previously registered change notification listener.
* <p>
* If the specified listener was not previously registered, this operation will be a noop.
*
* @param listener listener to remove
* @return CompletableFuture that is completed when the operation completes
*/
CompletableFuture<Void> removeListener(LeadershipEventListener<T> listener);

/**
* Registers a listener to be notified of Leadership changes for all topics.
*
Expand Down
16 changes: 16 additions & 0 deletions core/src/main/java/io/atomix/core/election/LeaderElector.java
Expand Up @@ -88,6 +88,22 @@ public interface LeaderElector<T> extends SyncPrimitive {
*/
Map<String, Leadership<T>> getLeaderships();

/**
* Registers a listener to be notified of Leadership changes for all topics.
*
* @param listener listener to notify
*/
void addListener(LeadershipEventListener<T> listener);

/**
* Unregisters a previously registered change notification listener.
* <p>
* If the specified listener was not previously registered, this operation will be a noop.
*
* @param listener listener to remove
*/
void removeListener(LeadershipEventListener<T> listener);

/**
* Registers a listener to be notified of Leadership changes for all topics.
*
Expand Down
Expand Up @@ -80,6 +80,16 @@ public Map<String, Leadership<T>> getLeaderships() {
return complete(asyncElector.getLeaderships());
}

@Override
public void addListener(LeadershipEventListener<T> listener) {
complete(asyncElector.addListener(listener));
}

@Override
public void removeListener(LeadershipEventListener<T> listener) {
complete(asyncElector.removeListener(listener));
}

@Override
public void addListener(String topic, LeadershipEventListener<T> listener) {
complete(asyncElector.addListener(topic, listener));
Expand Down
Expand Up @@ -29,8 +29,8 @@
import io.atomix.core.election.impl.LeaderElectorOperations.Promote;
import io.atomix.core.election.impl.LeaderElectorOperations.Run;
import io.atomix.core.election.impl.LeaderElectorOperations.Withdraw;
import io.atomix.primitive.PrimitiveRegistry;
import io.atomix.primitive.AbstractAsyncPrimitive;
import io.atomix.primitive.PrimitiveRegistry;
import io.atomix.primitive.partition.PartitionId;
import io.atomix.primitive.proxy.PrimitiveProxy;
import io.atomix.primitive.proxy.Proxy;
Expand Down Expand Up @@ -63,15 +63,17 @@ public class LeaderElectorProxy extends AbstractAsyncPrimitive<AsyncLeaderElecto
.register(LeaderElectorEvents.NAMESPACE)
.build());

private final Map<String, Set<LeadershipEventListener<byte[]>>> leadershipChangeListeners = Maps.newConcurrentMap();
private final Set<LeadershipEventListener<byte[]>> leadershipChangeListeners = Sets.newCopyOnWriteArraySet();
private final Map<String, Set<LeadershipEventListener<byte[]>>> topicListeners = Maps.newConcurrentMap();

public LeaderElectorProxy(PrimitiveProxy proxy, PrimitiveRegistry registry) {
super(proxy, registry);
}

private void handleEvent(PartitionId partitionId, List<LeadershipEvent<byte[]>> changes) {
changes.forEach(change -> {
Set<LeadershipEventListener<byte[]>> listenerSet = leadershipChangeListeners.get(change.topic());
leadershipChangeListeners.forEach(l -> l.onEvent(change));
Set<LeadershipEventListener<byte[]>> listenerSet = topicListeners.get(change.topic());
if (listenerSet != null) {
listenerSet.forEach(l -> l.onEvent(change));
}
Expand Down Expand Up @@ -123,10 +125,24 @@ public CompletableFuture<Map<String, Leadership<byte[]>>> getLeaderships() {
});
}

@Override
public synchronized CompletableFuture<Void> addListener(LeadershipEventListener<byte[]> listener) {
leadershipChangeListeners.add(listener);
return invokeAll(ADD_LISTENER).thenApply(v -> null);
}

@Override
public synchronized CompletableFuture<Void> removeListener(LeadershipEventListener<byte[]> listener) {
if (leadershipChangeListeners.remove(listener)) {
return invokeAll(REMOVE_LISTENER).thenApply(v -> null);
}
return CompletableFuture.completedFuture(null);
}

@Override
public synchronized CompletableFuture<Void> addListener(String topic, LeadershipEventListener<byte[]> listener) {
boolean empty = leadershipChangeListeners.isEmpty();
leadershipChangeListeners.compute(topic, (t, s) -> {
boolean empty = topicListeners.isEmpty();
topicListeners.compute(topic, (t, s) -> {
if (s == null) {
s = Sets.newCopyOnWriteArraySet();
}
Expand All @@ -142,18 +158,18 @@ public synchronized CompletableFuture<Void> addListener(String topic, Leadership

@Override
public synchronized CompletableFuture<Void> removeListener(String topic, LeadershipEventListener<byte[]> listener) {
leadershipChangeListeners.computeIfPresent(topic, (t, s) -> {
topicListeners.computeIfPresent(topic, (t, s) -> {
s.remove(listener);
return s.size() == 0 ? null : s;
});
if (leadershipChangeListeners.isEmpty()) {
if (topicListeners.isEmpty()) {
return invokeBy(topic, REMOVE_LISTENER).thenApply(v -> null);
}
return CompletableFuture.completedFuture(null);
}

private boolean isListening() {
return !leadershipChangeListeners.isEmpty();
return !topicListeners.isEmpty();
}

@Override
Expand Down
Expand Up @@ -94,6 +94,27 @@ public CompletableFuture<Map<String, Leadership<V1>>> getLeaderships() {
.thenApply(leaderships -> Maps.transformValues(leaderships, leadership -> leadership.map(valueDecoder)));
}

@Override
public CompletableFuture<Void> addListener(LeadershipEventListener<V1> listener) {
synchronized (listeners) {
InternalLeadershipEventListener internalListener =
listeners.computeIfAbsent(listener, k -> new InternalLeadershipEventListener(listener));
return backingElector.addListener(internalListener);
}
}

@Override
public CompletableFuture<Void> removeListener(LeadershipEventListener<V1> listener) {
synchronized (listeners) {
InternalLeadershipEventListener internalListener = listeners.remove(listener);
if (internalListener != null) {
return backingElector.removeListener(internalListener);
} else {
return CompletableFuture.completedFuture(null);
}
}
}

@Override
public CompletableFuture<Void> addListener(String topic, LeadershipEventListener<V1> listener) {
synchronized (listeners) {
Expand Down
Expand Up @@ -123,7 +123,7 @@ public void testAnoint() throws Throwable {
LeaderEventListener listener1 = new LeaderEventListener();
elector1.addListener("foo", listener1).join();
LeaderEventListener listener2 = new LeaderEventListener();
elector2.addListener("foo", listener2);
elector2.addListener(listener2);
LeaderEventListener listener3 = new LeaderEventListener();
elector3.addListener("foo", listener3).join();

Expand Down Expand Up @@ -169,9 +169,9 @@ public void testPromote() throws Throwable {
LeaderEventListener listener1 = new LeaderEventListener();
elector1.addListener("foo", listener1).join();
LeaderEventListener listener2 = new LeaderEventListener();
elector2.addListener("foo", listener2).join();
elector2.addListener(listener2).join();
LeaderEventListener listener3 = new LeaderEventListener();
elector3.addListener("foo", listener3).join();
elector3.addListener(listener3).join();

elector3.promote("foo", node3).thenAccept(result -> {
assertFalse(result);
Expand Down Expand Up @@ -231,7 +231,7 @@ public void testNonLeaderSessionClose() throws Throwable {
AsyncLeaderElector<MemberId> elector2 = atomix().<MemberId>leaderElectorBuilder("test-elector-non-leader-session-close", protocol()).build().async();
LeaderEventListener listener = new LeaderEventListener();
elector2.run("foo", node2).join();
elector1.addListener("foo", listener).join();
elector1.addListener(listener).join();
elector2.close().join();
listener.nextEvent().thenAccept(result -> {
assertEquals(node1, result.newLeadership().leader().id());
Expand Down

0 comments on commit 5c5e46a

Please sign in to comment.