Skip to content

Commit

Permalink
Add leader elector resource.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Nov 9, 2017
1 parent cc39a21 commit b81b28e
Show file tree
Hide file tree
Showing 14 changed files with 383 additions and 212 deletions.
Expand Up @@ -58,9 +58,10 @@ default DistributedPrimitive.Type primitiveType() {
/**
* Withdraws from leadership race for a topic.
*
* @param identifier instance identifier of the node to withdraw
* @return CompletableFuture that is completed when the withdraw is done
*/
CompletableFuture<Void> withdraw();
CompletableFuture<Void> withdraw(T identifier);

/**
* Attempts to promote a node to leadership displacing the current leader.
Expand Down
Expand Up @@ -39,8 +39,10 @@ default DistributedPrimitive.Type primitiveType() {

/**
* Withdraws from leadership race for a topic.
*
* @param identifier identifier of the node to withdraw
*/
void withdraw();
void withdraw(T identifier);

/**
* Attempts to promote a node to leadership displacing the current leader.
Expand Down
Expand Up @@ -49,8 +49,8 @@ public Leadership run(T identifier) {
}

@Override
public void withdraw() {
complete(asyncElector.withdraw());
public void withdraw(T identifier) {
complete(asyncElector.withdraw(identifier));
}

@Override
Expand Down
Expand Up @@ -25,6 +25,7 @@
import io.atomix.primitives.leadership.impl.RaftLeaderElectorOperations.Evict;
import io.atomix.primitives.leadership.impl.RaftLeaderElectorOperations.Promote;
import io.atomix.primitives.leadership.impl.RaftLeaderElectorOperations.Run;
import io.atomix.primitives.leadership.impl.RaftLeaderElectorOperations.Withdraw;
import io.atomix.protocols.raft.proxy.RaftProxy;
import io.atomix.serializer.Serializer;
import io.atomix.serializer.kryo.KryoNamespace;
Expand Down Expand Up @@ -74,8 +75,8 @@ public CompletableFuture<Leadership<byte[]>> run(byte[] id) {
}

@Override
public CompletableFuture<Void> withdraw() {
return proxy.invoke(WITHDRAW);
public CompletableFuture<Void> withdraw(byte[] id) {
return proxy.invoke(WITHDRAW, SERIALIZER::encode, new Withdraw(id));
}

@Override
Expand Down
Expand Up @@ -63,6 +63,7 @@ public OperationType type() {
.register(Leadership.class)
.register(Leader.class)
.register(Run.class)
.register(Withdraw.class)
.register(Anoint.class)
.register(Promote.class)
.register(Evict.class)
Expand Down Expand Up @@ -118,6 +119,19 @@ public Run(byte[] id) {
}
}

/**
* Command for withdrawing a candidate from an election.
*/
@SuppressWarnings("serial")
public static class Withdraw extends ElectionChangeOperation {
private Withdraw() {
}

public Withdraw(byte[] id) {
super(id);
}
}

/**
* Command for administratively anoint a node as leader.
*/
Expand Down
Expand Up @@ -28,6 +28,7 @@
import io.atomix.primitives.leadership.impl.RaftLeaderElectorOperations.Evict;
import io.atomix.primitives.leadership.impl.RaftLeaderElectorOperations.Promote;
import io.atomix.primitives.leadership.impl.RaftLeaderElectorOperations.Run;
import io.atomix.primitives.leadership.impl.RaftLeaderElectorOperations.Withdraw;
import io.atomix.protocols.raft.service.AbstractRaftService;
import io.atomix.protocols.raft.service.Commit;
import io.atomix.protocols.raft.service.RaftServiceExecutor;
Expand Down Expand Up @@ -109,7 +110,7 @@ protected void configure(RaftServiceExecutor executor) {
executor.register(REMOVE_LISTENER, this::unlisten);
// Commands
executor.register(RUN, SERIALIZER::decode, this::run, SERIALIZER::encode);
executor.register(WITHDRAW, this::withdraw);
executor.register(WITHDRAW, SERIALIZER::decode, this::withdraw);
executor.register(ANOINT, SERIALIZER::decode, this::anoint, SERIALIZER::encode);
executor.register(PROMOTE, SERIALIZER::decode, this::promote, SERIALIZER::encode);
executor.register(EVICT, SERIALIZER::decode, this::evict);
Expand Down Expand Up @@ -172,10 +173,10 @@ protected Leadership<byte[]> run(Commit<? extends Run> commit) {
/**
* Applies a withdraw commit.
*/
protected void withdraw(Commit<Void> commit) {
protected void withdraw(Commit<? extends Withdraw> commit) {
try {
Leadership<byte[]> oldLeadership = leadership();
cleanup(commit.session());
cleanup(commit.value().id());
Leadership<byte[]> newLeadership = leadership();
if (!Objects.equal(oldLeadership, newLeadership)) {
notifyLeadershipChange(oldLeadership, newLeadership);
Expand Down Expand Up @@ -350,6 +351,30 @@ public String toString() {
}
}

protected void cleanup(byte[] id) {
Optional<Registration> registration =
registrations.stream().filter(r -> Arrays.equals(r.id(), id)).findFirst();
if (registration.isPresent()) {
List<Registration> updatedRegistrations =
registrations.stream()
.filter(r -> !Arrays.equals(r.id(), id))
.collect(Collectors.toList());
if (Arrays.equals(leader.id(), id)) {
if (!updatedRegistrations.isEmpty()) {
this.registrations = updatedRegistrations;
this.leader = updatedRegistrations.get(0);
this.term = termCounter.incrementAndGet();
this.termStartTime = context().wallClock().getTime().unixTimestamp();
} else {
this.registrations = updatedRegistrations;
this.leader = null;
}
} else {
this.registrations = updatedRegistrations;
}
}
}

protected void cleanup(RaftSession session) {
Optional<Registration> registration =
registrations.stream().filter(r -> r.sessionId() == session.sessionId().id()).findFirst();
Expand Down
Expand Up @@ -54,8 +54,8 @@ public CompletableFuture<Leadership<V1>> run(V1 identifier) {
}

@Override
public CompletableFuture<Void> withdraw() {
return backingElector.withdraw();
public CompletableFuture<Void> withdraw(V1 identifier) {
return backingElector.withdraw(valueEncoder.apply(identifier));
}

@Override
Expand Down
Expand Up @@ -84,7 +84,7 @@ public void testWithdraw() throws Throwable {
LeaderEventListener listener2 = new LeaderEventListener();
elector2.addListener(listener2).join();

elector1.withdraw().join();
elector1.withdraw(node1).join();

listener1.nextEvent().thenAccept(result -> {
assertArrayEquals(node2, result.newLeadership().leader().id());
Expand Down
126 changes: 46 additions & 80 deletions rest/src/main/java/io/atomix/rest/impl/EventLog.java
Expand Up @@ -15,118 +15,84 @@
*/
package io.atomix.rest.impl;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* Session registry.
*/
public class EventLog<T> implements Consumer<T> {
private volatile EventSession<T> globalSession;
private final Map<Integer, EventSession<T>> sessions = new ConcurrentHashMap<>();
private final AtomicInteger sessionId = new AtomicInteger();
private final AtomicBoolean registered = new AtomicBoolean();
public class EventLog<L, E> {
private final L listener;
private final AtomicBoolean open = new AtomicBoolean();
private final Queue<E> events = new ConcurrentLinkedQueue<>();
private final Queue<CompletableFuture<E>> futures = new ConcurrentLinkedQueue<>();

EventLog(Function<EventLog<L, E>, L> listenerFactory) {
this.listener = listenerFactory.apply(this);
}

/**
* Returns a boolean indicating whether the event consumer needs to be registered.
*
* @return indicates whether the event consumer needs to be registered
*/
boolean register() {
return registered.compareAndSet(false, true);
public boolean open() {
return open.compareAndSet(false, true);
}

/**
* Returns a boolean indicating whether the event consumer needs to be unregistered.
* Returns the event listener.
*
* @return indicates whether the event consumer needs to be unregistered
* @return the event listener
*/
synchronized boolean unregister() {
return globalSession == null && sessions.isEmpty() && registered.compareAndSet(true, false);
public L listener() {
return listener;
}

/**
* Returns the global session.
* Completes the given response with the next event.
*
* @return the global session
* @return a future to be completed with the next event
*/
EventSession<T> getGlobalSession() {
EventSession<T> session = globalSession;
if (session == null) {
synchronized (this) {
if (globalSession == null) {
globalSession = new EventSession<>();
session = globalSession;
}
}
public CompletableFuture<E> nextEvent() {
E event = events.poll();
if (event != null) {
return CompletableFuture.completedFuture(event);
} else {
CompletableFuture<E> future = new CompletableFuture<>();
futures.add(future);
return future;
}
return session;
}

/**
* Deletes the global session.
*/
void deleteGlobalSession() {
EventSession<T> session = globalSession;
if (session != null) {
session.close();
globalSession = null;
}
}

/**
* Returns an event session by ID.
*
* @param sessionId the session identifier
* @return the event session
*/
EventSession<T> getSession(Integer sessionId) {
return sessions.get(sessionId);
}

/**
* Deletes the given session.
* Adds an event to the log.
*
* @param sessionId the session identifier
* @param event the event to add
*/
void deleteSession(int sessionId) {
EventSession<T> session = sessions.remove(sessionId);
if (session != null) {
session.close();
public void addEvent(E event) {
CompletableFuture<E> future = futures.poll();
if (future != null) {
future.complete(event);
} else {
events.add(event);
if (events.size() > 100) {
events.remove();
}
}
}

/**
* Returns all registered sessions.
*
* @return all registered sessions
* Closes the session.
*/
Collection<EventSession<T>> getSessions() {
return sessions.values();
}

/**
* Registers a new session and returns the session identifier.
*
* @return the registered session identifier
*/
int newSession() {
int sessionId = this.sessionId.incrementAndGet();
EventSession<T> session = new EventSession<>();
sessions.put(sessionId, session);
return sessionId;
}

@Override
public void accept(T event) {
EventSession<T> globalSession = this.globalSession;
if (globalSession != null) {
globalSession.addEvent(event);
public boolean close() {
if (open.compareAndSet(true, false)) {
futures.forEach(future -> future.completeExceptionally(new IllegalStateException("Closed session")));
return true;
}
sessions.values().forEach(session -> session.addEvent(event));
return false;
}
}
43 changes: 25 additions & 18 deletions rest/src/main/java/io/atomix/rest/impl/EventManager.java
Expand Up @@ -17,6 +17,7 @@

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
* Rest event manager.
Expand All @@ -25,39 +26,45 @@ public class EventManager {
private final Map<Class<?>, Map<String, EventLog>> eventRegistries = new ConcurrentHashMap<>();

/**
* Returns an event registry if it already exists.
* Returns an event log if it already exists.
*
* @param type the registry type
* @param name the registry name
* @param <T> the event type
* @return the event registry
* @param type the log type
* @param name the log name
* @param <L> the listener type
* @param <E> the event type
* @return the event log
*/
@SuppressWarnings("unchecked")
public <T> EventLog<T> getRegistry(Class<?> type, String name) {
public <L, E> EventLog<L, E> getEventLog(Class<?> type, String name) {
return eventRegistries.computeIfAbsent(type, t -> new ConcurrentHashMap<>()).get(name);
}

/**
* Returns an event registry.
* Returns an event log, creating a new log if none exists.
*
* @param type the registry type
* @param name the registry name
* @param <T> the event type
* @return the event registry
* @param type the log type
* @param name the log name
* @param <L> the listener type
* @param <E> the event type
* @return the event log
*/
@SuppressWarnings("unchecked")
public <T> EventLog<T> getOrCreateRegistry(Class<?> type, String name) {
public <L, E> EventLog<L, E> getOrCreateEventLog(Class<?> type, String name, Function<EventLog<L, E>, L> listenerFactory) {
return eventRegistries.computeIfAbsent(type, t -> new ConcurrentHashMap<>())
.computeIfAbsent(name, n -> new EventLog<>());
.computeIfAbsent(name, n -> new EventLog<>(listenerFactory));
}

/**
* Deletes the given event registry.
* Removes and returns the given event log.
*
* @param type the registry type
* @param name the registry name
* @param type the log type
* @param name the log name
* @param <L> the listener type
* @param <E> the event type
* @return the removed event log or {@code null} if non exits
*/
public void deleteRegistry(Class<?> type, String name) {
eventRegistries.computeIfAbsent(type, t -> new ConcurrentHashMap<>()).remove(name);
@SuppressWarnings("unchecked")
public <L, E> EventLog<L, E> removeEventLog(Class<?> type, String name) {
return eventRegistries.computeIfAbsent(type, t -> new ConcurrentHashMap<>()).remove(name);
}
}

0 comments on commit b81b28e

Please sign in to comment.