Skip to content

Commit

Permalink
Refactor Listener API to use Java Consumers.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Aug 15, 2015
1 parent e73d4f3 commit bc16844
Show file tree
Hide file tree
Showing 24 changed files with 145 additions and 191 deletions.
Expand Up @@ -15,20 +15,20 @@
*/
package net.kuujo.copycat.atomic;

import net.kuujo.copycat.util.Listener;
import net.kuujo.copycat.util.ListenerContext;
import net.kuujo.copycat.PersistenceLevel;
import net.kuujo.copycat.Resource;
import net.kuujo.copycat.atomic.state.ReferenceCommands;
import net.kuujo.copycat.atomic.state.ReferenceState;
import net.kuujo.copycat.raft.protocol.ConsistencyLevel;
import net.kuujo.copycat.raft.StateMachine;
import net.kuujo.copycat.raft.protocol.ConsistencyLevel;
import net.kuujo.copycat.resource.ResourceContext;
import net.kuujo.copycat.util.Listener;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
* Distributed atomic value.
Expand All @@ -37,7 +37,7 @@
*/
public class DistributedAtomicValue<T> extends Resource {
private ConsistencyLevel defaultConsistency = ConsistencyLevel.LINEARIZABLE_LEASE;
private final java.util.Set<Listener<T>> changeListeners = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final java.util.Set<Consumer<T>> changeListeners = Collections.newSetFromMap(new ConcurrentHashMap<>());

@Override
protected Class<? extends StateMachine> stateMachine() {
Expand All @@ -48,7 +48,7 @@ protected Class<? extends StateMachine> stateMachine() {
protected void open(ResourceContext context) {
super.open(context);
context.session().<T>onReceive(event -> {
for (Listener<T> listener : changeListeners) {
for (Consumer<T> listener : changeListeners) {
listener.accept(event);
}
});
Expand Down Expand Up @@ -390,24 +390,24 @@ public CompletableFuture<Boolean> compareAndSet(T expect, T update, long ttl, Ti
* @param listener The change listener.
* @return A completable future to be completed once the change listener has been registered.
*/
public synchronized CompletableFuture<ListenerContext<T>> onChange(Listener<T> listener) {
public synchronized CompletableFuture<Listener<T>> onChange(Consumer<T> listener) {
if (!changeListeners.isEmpty()) {
changeListeners.add(listener);
return CompletableFuture.completedFuture(new ChangeListenerContext(listener));
return CompletableFuture.completedFuture(new ChangeListener(listener));
}

changeListeners.add(listener);
return submit(ReferenceCommands.Listen.builder().build())
.thenApply(v -> new ChangeListenerContext(listener));
.thenApply(v -> new ChangeListener(listener));
}

/**
* Change listener context.
*/
private class ChangeListenerContext implements ListenerContext<T> {
private final Listener<T> listener;
private class ChangeListener implements Listener<T> {
private final Consumer<T> listener;

private ChangeListenerContext(Listener<T> listener) {
private ChangeListener(Consumer<T> listener) {
this.listener = listener;
}

Expand Down
Expand Up @@ -24,7 +24,6 @@
import net.kuujo.copycat.raft.protocol.Query;
import net.kuujo.copycat.raft.session.ClientSession;
import net.kuujo.copycat.raft.session.Session;
import net.kuujo.copycat.util.Listener;
import net.kuujo.copycat.util.Managed;
import net.kuujo.copycat.util.concurrent.Context;
import net.kuujo.copycat.util.concurrent.Futures;
Expand Down
Expand Up @@ -27,7 +27,6 @@
import net.kuujo.copycat.raft.protocol.request.*;
import net.kuujo.copycat.raft.protocol.response.*;
import net.kuujo.copycat.util.Listener;
import net.kuujo.copycat.util.ListenerContext;
import net.kuujo.copycat.util.Listeners;
import net.kuujo.copycat.util.Managed;
import net.kuujo.copycat.util.concurrent.Context;
Expand All @@ -46,6 +45,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
* Client session.
Expand Down Expand Up @@ -416,7 +416,7 @@ private void onOpen(long sessionId) {
LOGGER.debug("Registered session: {}", sessionId);
this.id = sessionId;
this.state = State.OPEN;
for (Listener<Session> listener : openListeners) {
for (Consumer<Session> listener : openListeners) {
listener.accept(this);
}
}
Expand All @@ -443,14 +443,14 @@ public boolean isOpen() {
}

@Override
public ListenerContext<Session> onOpen(Listener<Session> listener) {
public Listener<Session> onOpen(Consumer<Session> listener) {
return openListeners.add(listener);
}

@Override
public CompletableFuture<Void> publish(Object message) {
return CompletableFuture.runAsync(() -> {
for (Listener<Object> listener : receiveListeners) {
for (Consumer<Object> listener : receiveListeners) {
listener.accept(message);
}
}, context);
Expand All @@ -476,7 +476,7 @@ private CompletableFuture<PublishResponse> handlePublish(PublishRequest request)

eventSequence = request.eventSequence();

for (Listener listener : receiveListeners) {
for (Consumer listener : receiveListeners) {
listener.accept(request.message());
}

Expand All @@ -488,7 +488,7 @@ private CompletableFuture<PublishResponse> handlePublish(PublishRequest request)

@Override
@SuppressWarnings("unchecked")
public ListenerContext<?> onReceive(Listener listener) {
public Listener<?> onReceive(Consumer listener) {
return receiveListeners.add(listener);
}

Expand Down Expand Up @@ -524,7 +524,7 @@ public boolean isClosed() {
}

@Override
public ListenerContext<Session> onClose(Listener<Session> listener) {
public Listener<Session> onClose(Consumer<Session> listener) {
return closeListeners.add(listener);
}

Expand Down
25 changes: 11 additions & 14 deletions common/src/main/java/net/kuujo/copycat/util/Listener.java
Expand Up @@ -15,28 +15,25 @@
*/
package net.kuujo.copycat.util;

import java.util.function.Consumer;

/**
* Event listener.
* <p>
* This is a simple functional event listener interface used to listen for events from a variety of objects. When a listener
* is registered, a {@link ListenerContext} is typically returned. The context can be used to unregister
* the listener at any time via {@link ListenerContext#close()}.
* Context for unregistering a registered listener.
* <p>
* In all cases Copycat will ensure that a registered listener will <em>always</em> be {@link #accept(Object) invoked}
* on the same {@link net.kuujo.copycat.util.concurrent.CopycatThread Copycat thread}.
* The listener context represents a registered listener. The context is normally returned when a {@link Consumer} is
* registered and can be used to unregister the listener via {@link #close()}.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
@FunctionalInterface
public interface Listener<T> {
public interface Listener<T> extends Consumer<T>, AutoCloseable {

/**
* Calls the listener.
* Closes the listener.
* <p>
* The listener will always be called on the same {@link net.kuujo.copycat.util.concurrent.CopycatThread Copycat thread}.
*
* @param event The event that occurred.
* When the listener is closed, the listener will be unregistered and will no longer receive events for which it was
* listening.
*/
void accept(T event);
@Override
void close();

}
37 changes: 0 additions & 37 deletions common/src/main/java/net/kuujo/copycat/util/ListenerContext.java

This file was deleted.

15 changes: 8 additions & 7 deletions common/src/main/java/net/kuujo/copycat/util/Listeners.java
Expand Up @@ -20,14 +20,15 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/**
* Utility for managing a set of listeners.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public class Listeners<T> implements Iterable<ListenerContext<T>> {
private final List<ListenerContext<T>> listeners = new CopyOnWriteArrayList<>();
public class Listeners<T> implements Iterable<Listener<T>> {
private final List<Listener<T>> listeners = new CopyOnWriteArrayList<>();

/**
* Returns the number of registered listeners.
Expand All @@ -44,13 +45,13 @@ public int size() {
* @param listener The listener to add.
* @return The listener context.
*/
public ListenerContext<T> add(Listener<T> listener) {
public Listener<T> add(Consumer<T> listener) {
if (listener == null)
throw new NullPointerException("listener cannot be null");

Context context = Context.currentContext();

ListenerContext<T> listenerContext = new ListenerContext<T>() {
Listener<T> wrapper = new Listener<T>() {
@Override
public void accept(T event) {
if (context != null) {
Expand All @@ -66,12 +67,12 @@ public void close() {
}
};

listeners.add(listenerContext);
return listenerContext;
listeners.add(wrapper);
return wrapper;
}

@Override
public Iterator<ListenerContext<T>> iterator() {
public Iterator<Listener<T>> iterator() {
return listeners.iterator();
}

Expand Down
Expand Up @@ -15,26 +15,26 @@
*/
package net.kuujo.copycat.coordination;

import net.kuujo.copycat.util.Listener;
import net.kuujo.copycat.util.ListenerContext;
import net.kuujo.copycat.Resource;
import net.kuujo.copycat.coordination.state.LeaderElectionCommands;
import net.kuujo.copycat.coordination.state.LeaderElectionState;
import net.kuujo.copycat.raft.StateMachine;
import net.kuujo.copycat.resource.ResourceContext;
import net.kuujo.copycat.util.Listener;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
* Asynchronous leader election resource.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public class DistributedLeaderElection extends Resource {
private final Set<Listener<Void>> listeners = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<Consumer<Void>> listeners = Collections.newSetFromMap(new ConcurrentHashMap<>());

@Override
protected Class<? extends StateMachine> stateMachine() {
Expand All @@ -45,7 +45,7 @@ protected Class<? extends StateMachine> stateMachine() {
protected void open(ResourceContext context) {
super.open(context);
context.session().onReceive(v -> {
for (Listener<Void> listener : listeners) {
for (Consumer<Void> listener : listeners) {
listener.accept(null);
}
});
Expand All @@ -57,24 +57,24 @@ protected void open(ResourceContext context) {
* @param listener The listener to register.
* @return A completable future to be completed with the listener context.
*/
public CompletableFuture<ListenerContext<Void>> onElection(Listener<Void> listener) {
public CompletableFuture<Listener<Void>> onElection(Consumer<Void> listener) {
if (!listeners.isEmpty()) {
listeners.add(listener);
return CompletableFuture.completedFuture(new ElectionListenerContext(listener));
return CompletableFuture.completedFuture(new ElectionListener(listener));
}

listeners.add(listener);
return submit(LeaderElectionCommands.Listen.builder().build())
.thenApply(v -> new ElectionListenerContext(listener));
.thenApply(v -> new ElectionListener(listener));
}

/**
* Change listener context.
*/
private class ElectionListenerContext implements ListenerContext<Void> {
private final Listener<Void> listener;
private class ElectionListener implements Listener<Void> {
private final Consumer<Void> listener;

private ElectionListenerContext(Listener<Void> listener) {
private ElectionListener(Consumer<Void> listener) {
this.listener = listener;
}

Expand Down

0 comments on commit bc16844

Please sign in to comment.