Skip to content

Commit

Permalink
Register leader election listeners when election is opened.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jan 2, 2016
1 parent ac220c1 commit 5663e46
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 45 deletions.
Expand Up @@ -95,6 +95,7 @@ public class DistributedLeaderElection extends Resource<DistributedLeaderElectio
public static final ResourceType<DistributedLeaderElection> TYPE = new ResourceType<>(DistributedLeaderElection.class); public static final ResourceType<DistributedLeaderElection> TYPE = new ResourceType<>(DistributedLeaderElection.class);


private final Set<Consumer<Long>> listeners = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Set<Consumer<Long>> listeners = Collections.newSetFromMap(new ConcurrentHashMap<>());
private volatile long epoch;


public DistributedLeaderElection(CopycatClient client) { public DistributedLeaderElection(CopycatClient client) {
super(client); super(client);
Expand All @@ -107,14 +108,15 @@ public ResourceType<DistributedLeaderElection> type() {


@Override @Override
public CompletableFuture<DistributedLeaderElection> open() { public CompletableFuture<DistributedLeaderElection> open() {
return super.open().thenApply(result -> { return super.open().thenRun(() -> {
client.<Long>onEvent("elect", epoch -> { client.<Long>onEvent("elect", epoch -> {
this.epoch = epoch;
for (Consumer<Long> listener : listeners) { for (Consumer<Long> listener : listeners) {
listener.accept(epoch); listener.accept(epoch);
} }
}); });
return result; }).thenCompose(v -> submit(new LeaderElectionCommands.Listen()))
}); .thenApply(v -> this);
} }


/** /**
Expand Down Expand Up @@ -145,18 +147,26 @@ public CompletableFuture<DistributedLeaderElection> open() {
* } * }
* </pre> * </pre>
* *
* @param listener The listener to register. * @param callback The callback to register.
* @return A completable future to be completed with the listener context. * @return A completable future to be completed with the listener context.
*/ */
public CompletableFuture<Listener<Long>> onElection(Consumer<Long> listener) { public Listener<Long> onElection(Consumer<Long> callback) {
if (!listeners.isEmpty()) { Listener<Long> listener = new Listener<Long>() {
listeners.add(listener); @Override
return CompletableFuture.completedFuture(new ElectionListener(listener)); public void accept(Long epoch) {
} callback.accept(epoch);

}
@Override
public void close() {
listeners.remove(this);
}
};
listeners.add(listener); listeners.add(listener);
return submit(new LeaderElectionCommands.Listen())
.thenApply(v -> new ElectionListener(listener)); if (epoch != 0) {
listener.accept(epoch);
}
return listener;
} }


/** /**
Expand Down Expand Up @@ -188,33 +198,8 @@ public CompletableFuture<Boolean> isLeader(long epoch) {
* @return A completable future to be completed once the instance has resigned from leadership for the given epoch. * @return A completable future to be completed once the instance has resigned from leadership for the given epoch.
*/ */
public CompletableFuture<Void> resign(long epoch) { public CompletableFuture<Void> resign(long epoch) {
return submit(new LeaderElectionCommands.Resign(epoch)); return submit(new LeaderElectionCommands.Resign(epoch))
} .whenComplete((result, error) -> this.epoch = 0);

/**
* Election listener context.
*/
private class ElectionListener implements Listener<Long> {
private final Consumer<Long> listener;

private ElectionListener(Consumer<Long> listener) {
this.listener = listener;
}

@Override
public void accept(Long epoch) {
listener.accept(epoch);
}

@Override
public void close() {
synchronized (DistributedLeaderElection.this) {
listeners.remove(listener);
if (listeners.isEmpty()) {
submit(new LeaderElectionCommands.Unlisten());
}
}
}
} }


} }
Expand Up @@ -42,8 +42,8 @@ public void testElection() throws Throwable {


DistributedLeaderElection election = createResource(); DistributedLeaderElection election = createResource();


election.onElection(v -> resume()).thenRun(this::resume); election.onElection(v -> resume());
await(0, 2); await(10000);
} }


/** /**
Expand All @@ -60,15 +60,15 @@ public void testNextElection() throws Throwable {
threadAssertTrue(epoch > lastEpoch.get()); threadAssertTrue(epoch > lastEpoch.get());
lastEpoch.set(epoch); lastEpoch.set(epoch);
resume(); resume();
}).join(); });


await(10000); await(10000);


election2.onElection(epoch -> { election2.onElection(epoch -> {
threadAssertTrue(epoch > lastEpoch.get()); threadAssertTrue(epoch > lastEpoch.get());
lastEpoch.set(epoch); lastEpoch.set(epoch);
resume(); resume();
}).join(); });


election1.close(); election1.close();


Expand All @@ -89,15 +89,15 @@ public void testResignElection() throws Throwable {
threadAssertTrue(epoch > lastEpoch.get()); threadAssertTrue(epoch > lastEpoch.get());
lastEpoch.set(epoch); lastEpoch.set(epoch);
resume(); resume();
}).join(); });


await(10000); await(10000);


election2.onElection(epoch -> { election2.onElection(epoch -> {
threadAssertTrue(epoch > lastEpoch.get()); threadAssertTrue(epoch > lastEpoch.get());
lastEpoch.set(epoch); lastEpoch.set(epoch);
resume(); resume();
}).join(); });


election1.resign(lastEpoch.get()); election1.resign(lastEpoch.get());


Expand Down

0 comments on commit 5663e46

Please sign in to comment.