Skip to content

Commit

Permalink
Provide epoch for leader elections.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Aug 16, 2015
1 parent 0e64f4d commit 8e2aee8
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 16 deletions.
Expand Up @@ -34,7 +34,7 @@
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public class DistributedLeaderElection extends Resource {
private final Set<Consumer<Void>> listeners = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<Consumer<Long>> listeners = Collections.newSetFromMap(new ConcurrentHashMap<>());

@Override
protected Class<? extends StateMachine> stateMachine() {
Expand All @@ -44,9 +44,9 @@ protected Class<? extends StateMachine> stateMachine() {
@Override
protected void open(ResourceContext context) {
super.open(context);
context.session().onReceive(v -> {
for (Consumer<Void> listener : listeners) {
listener.accept(null);
context.session().<Long>onReceive(epoch -> {
for (Consumer<Long> listener : listeners) {
listener.accept(epoch);
}
});
}
Expand All @@ -57,7 +57,7 @@ protected void open(ResourceContext context) {
* @param listener The listener to register.
* @return A completable future to be completed with the listener context.
*/
public CompletableFuture<Listener<Void>> onElection(Consumer<Void> listener) {
public CompletableFuture<Listener<Long>> onElection(Consumer<Long> listener) {
if (!listeners.isEmpty()) {
listeners.add(listener);
return CompletableFuture.completedFuture(new ElectionListener(listener));
Expand All @@ -68,19 +68,30 @@ public CompletableFuture<Listener<Void>> onElection(Consumer<Void> listener) {
.thenApply(v -> new ElectionListener(listener));
}

/**
* Verifies that the client is the current leader.
*
* @param epoch The epoch for which to check if this client is the leader.
* @return A completable future to be completed with a boolean value indicating whether the
* client is the current leader.
*/
public CompletableFuture<Boolean> isLeader(long epoch) {
return submit(LeaderElectionCommands.IsLeader.builder().withEpoch(epoch).build());
}

/**
* Change listener context.
*/
private class ElectionListener implements Listener<Void> {
private final Consumer<Void> listener;
private class ElectionListener implements Listener<Long> {
private final Consumer<Long> listener;

private ElectionListener(Consumer<Void> listener) {
private ElectionListener(Consumer<Long> listener) {
this.listener = listener;
}

@Override
public void accept(Void event) {
listener.accept(event);
public void accept(Long epoch) {
listener.accept(epoch);
}

@Override
Expand Down
Expand Up @@ -15,14 +15,16 @@
*/
package net.kuujo.copycat.coordination.state;

import net.kuujo.copycat.util.BuilderPool;
import net.kuujo.copycat.raft.protocol.Command;
import net.kuujo.copycat.raft.protocol.Operation;
import net.kuujo.copycat.io.BufferInput;
import net.kuujo.copycat.io.BufferOutput;
import net.kuujo.copycat.io.serializer.CopycatSerializable;
import net.kuujo.copycat.io.serializer.SerializeWith;
import net.kuujo.copycat.io.serializer.Serializer;
import net.kuujo.copycat.raft.protocol.Command;
import net.kuujo.copycat.raft.protocol.ConsistencyLevel;
import net.kuujo.copycat.raft.protocol.Operation;
import net.kuujo.copycat.raft.protocol.Query;
import net.kuujo.copycat.util.BuilderPool;

/**
* Leader election commands.
Expand All @@ -34,6 +36,28 @@ public class LeaderElectionCommands {
private LeaderElectionCommands() {
}

/**
* Abstract election query.
*/
public static abstract class ElectionQuery<V> implements Query<V>, CopycatSerializable {
@Override
public void writeObject(BufferOutput buffer, Serializer serializer) {
}

@Override
public void readObject(BufferInput buffer, Serializer serializer) {
}

/**
* Base reference command builder.
*/
public static abstract class Builder<T extends Builder<T, U, V>, U extends ElectionQuery<V>, V> extends Query.Builder<T, U, V> {
protected Builder(BuilderPool<T, U> pool) {
super(pool);
}
}
}

/**
* Abstract election command.
*/
Expand Down Expand Up @@ -118,4 +142,63 @@ protected Unlisten create() {
}
}

/**
* Is leader query.
*/
@SerializeWith(id=512)
public static class IsLeader extends ElectionQuery<Boolean> {

/**
* Returns a new is leader query builder.
*
* @return A new is leader query builder.
*/
public static Builder builder() {
return Operation.builder(Builder.class, Builder::new);
}

private long epoch;

/**
* Returns the epoch to check.
*
* @return The epoch to check.
*/
public long epoch() {
return epoch;
}

@Override
public ConsistencyLevel consistency() {
return ConsistencyLevel.LINEARIZABLE;
}

/**
* Is leader query builder.
*/
public static class Builder extends ElectionQuery.Builder<Builder, IsLeader, Boolean> {
public Builder(BuilderPool<Builder, IsLeader> pool) {
super(pool);
}

@Override
protected IsLeader create() {
return new IsLeader();
}

/**
* Sets the epoch to check.
*
* @param epoch The epoch to check.
* @return The query builder.
*/
public Builder withEpoch(long epoch) {
if (epoch <= 0)
throw new IllegalArgumentException("epoch must be positive");
query.epoch = epoch;
return this;
}
}
}

}
Expand Up @@ -15,10 +15,10 @@
*/
package net.kuujo.copycat.coordination.state;

import net.kuujo.copycat.raft.session.Session;
import net.kuujo.copycat.raft.Commit;
import net.kuujo.copycat.raft.StateMachine;
import net.kuujo.copycat.raft.StateMachineExecutor;
import net.kuujo.copycat.raft.session.Session;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -30,12 +30,14 @@
*/
public class LeaderElectionState extends StateMachine {
private Session leader;
private long epoch;
private final List<Commit<LeaderElectionCommands.Listen>> listeners = new ArrayList<>();

@Override
public void configure(StateMachineExecutor executor) {
executor.register(LeaderElectionCommands.Listen.class, this::listen);
executor.register(LeaderElectionCommands.Unlisten.class, this::unlisten);
executor.register(LeaderElectionCommands.IsLeader.class, this::isLeader);
}

@Override
Expand All @@ -45,6 +47,7 @@ public void close(Session session) {
if (!listeners.isEmpty()) {
Commit<LeaderElectionCommands.Listen> leader = listeners.remove(0);
this.leader = leader.session();
this.epoch = leader.index();
this.leader.publish(true);
}
}
Expand All @@ -56,7 +59,8 @@ public void close(Session session) {
protected void listen(Commit<LeaderElectionCommands.Listen> commit) {
if (leader == null) {
leader = commit.session();
leader.publish(true);
epoch = commit.index();
leader.publish(epoch);
commit.clean();
} else {
listeners.add(commit);
Expand All @@ -72,12 +76,20 @@ protected void unlisten(Commit<LeaderElectionCommands.Unlisten> commit) {
if (!listeners.isEmpty()) {
Commit<LeaderElectionCommands.Listen> leader = listeners.remove(0);
this.leader = leader.session();
this.leader.publish(true);
this.epoch = commit.index();
this.leader.publish(epoch);
leader.clean();
}
} else {
commit.clean();
}
}

/**
* Applies an isLeader query.
*/
protected boolean isLeader(Commit<LeaderElectionCommands.IsLeader> commit) {
return leader != null && leader.equals(commit.session()) && epoch == commit.operation().epoch();
}

}

0 comments on commit 8e2aee8

Please sign in to comment.