Skip to content

Commit

Permalink
Refactor Raft proxy events to use core event interfaces.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 23, 2017
1 parent ccd9359 commit fc836fa
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 26 deletions.
Expand Up @@ -15,6 +15,8 @@
*/ */
package io.atomix.protocols.raft.proxy; package io.atomix.protocols.raft.proxy;


import io.atomix.event.Event;
import io.atomix.event.EventListener;
import io.atomix.protocols.raft.CommunicationStrategies; import io.atomix.protocols.raft.CommunicationStrategies;
import io.atomix.protocols.raft.CommunicationStrategy; import io.atomix.protocols.raft.CommunicationStrategy;
import io.atomix.protocols.raft.RaftCommand; import io.atomix.protocols.raft.RaftCommand;
Expand Down Expand Up @@ -96,7 +98,7 @@ enum State {
* *
* @return The client proxy type. * @return The client proxy type.
*/ */
String getType(); String getTypeName();


/** /**
* Returns the session state. * Returns the session state.
Expand Down Expand Up @@ -191,14 +193,14 @@ default <T> CompletableFuture<T> submit(RaftOperation<T> operation) {
* @param listener The session receive callback. * @param listener The session receive callback.
* @throws NullPointerException if {@code event} or {@code callback} is null * @throws NullPointerException if {@code event} or {@code callback} is null
*/ */
<T> void addEventListener(Consumer<T> listener); <E extends Event> void addEventListener(EventListener<E> listener);


/** /**
* Removes an event listener. * Removes an event listener.
* *
* @param listener the event listener callback to remove * @param listener the event listener callback to remove
*/ */
<T> void removeEventListener(Consumer<T> listener); <E extends Event> void removeEventListener(EventListener<E> listener);


/** /**
* Returns a boolean indicating whether the session is open. * Returns a boolean indicating whether the session is open.
Expand Down
Expand Up @@ -15,6 +15,8 @@
*/ */
package io.atomix.protocols.raft.proxy; package io.atomix.protocols.raft.proxy;


import io.atomix.event.Event;
import io.atomix.event.EventListener;
import io.atomix.protocols.raft.RaftCommand; import io.atomix.protocols.raft.RaftCommand;
import io.atomix.protocols.raft.RaftQuery; import io.atomix.protocols.raft.RaftQuery;


Expand All @@ -40,8 +42,8 @@ public String getName() {
} }


@Override @Override
public String getType() { public String getTypeName() {
return delegate.getType(); return delegate.getTypeName();
} }


@Override @Override
Expand Down Expand Up @@ -70,12 +72,12 @@ public <T> CompletableFuture<T> submit(RaftQuery<T> query) {
} }


@Override @Override
public <T> void addEventListener(Consumer<T> listener) { public <E extends Event> void addEventListener(EventListener<E> listener) {
delegate.addEventListener(listener); delegate.addEventListener(listener);
} }


@Override @Override
public <T> void removeEventListener(Consumer<T> listener) { public <E extends Event> void removeEventListener(EventListener<E> listener) {
delegate.removeEventListener(listener); delegate.removeEventListener(listener);
} }


Expand Down
Expand Up @@ -16,6 +16,8 @@
package io.atomix.protocols.raft.proxy.impl; package io.atomix.protocols.raft.proxy.impl;


import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import io.atomix.event.Event;
import io.atomix.event.EventListener;
import io.atomix.protocols.raft.RaftCommand; import io.atomix.protocols.raft.RaftCommand;
import io.atomix.protocols.raft.RaftQuery; import io.atomix.protocols.raft.RaftQuery;
import io.atomix.protocols.raft.proxy.RaftProxy; import io.atomix.protocols.raft.proxy.RaftProxy;
Expand All @@ -25,7 +27,6 @@
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.function.Consumer;


import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;


Expand All @@ -34,24 +35,24 @@
*/ */
public class BlockingAwareRaftProxy extends RaftProxyDelegate { public class BlockingAwareRaftProxy extends RaftProxyDelegate {
private final Executor executor; private final Executor executor;
private final Map<Consumer, Consumer> listenerMap = Maps.newConcurrentMap(); private final Map<EventListener, EventListener> listenerMap = Maps.newConcurrentMap();


public BlockingAwareRaftProxy(RaftProxy delegate, Executor executor) { public BlockingAwareRaftProxy(RaftProxy delegate, Executor executor) {
super(delegate); super(delegate);
this.executor = checkNotNull(executor, "executor cannot be null"); this.executor = checkNotNull(executor, "executor cannot be null");
} }


@Override @Override
public <T> void addEventListener(Consumer<T> listener) { public <E extends Event> void addEventListener(EventListener<E> listener) {
Consumer<T> wrappedListener = event -> executor.execute(() -> listener.accept(event)); EventListener<E> wrappedListener = event -> executor.execute(() -> listener.onEvent(event));
listenerMap.put(listener, wrappedListener); listenerMap.put(listener, wrappedListener);
super.addEventListener(wrappedListener); super.addEventListener(wrappedListener);
} }


@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <T> void removeEventListener(Consumer<T> listener) { public <E extends Event> void removeEventListener(EventListener<E> listener) {
Consumer<T> wrappedListener = listenerMap.remove(listener); EventListener<E> wrappedListener = listenerMap.remove(listener);
if (wrappedListener != null) { if (wrappedListener != null) {
super.removeEventListener(wrappedListener); super.removeEventListener(wrappedListener);
} }
Expand Down
Expand Up @@ -15,6 +15,8 @@
*/ */
package io.atomix.protocols.raft.proxy.impl; package io.atomix.protocols.raft.proxy.impl;


import io.atomix.event.Event;
import io.atomix.event.EventListener;
import io.atomix.protocols.raft.CommunicationStrategies; import io.atomix.protocols.raft.CommunicationStrategies;
import io.atomix.protocols.raft.CommunicationStrategy; import io.atomix.protocols.raft.CommunicationStrategy;
import io.atomix.protocols.raft.RaftCommand; import io.atomix.protocols.raft.RaftCommand;
Expand Down Expand Up @@ -101,7 +103,7 @@ public String getName() {
} }


@Override @Override
public String getType() { public String getTypeName() {
return state.getSessionType(); return state.getSessionType();
} }


Expand Down Expand Up @@ -160,12 +162,12 @@ public <T> CompletableFuture<T> submit(RaftQuery<T> query) {
} }


@Override @Override
public <T> void addEventListener(Consumer<T> listener) { public <E extends Event> void addEventListener(EventListener<E> listener) {
proxyListener.addEventListener(listener); proxyListener.addEventListener(listener);
} }


@Override @Override
public <T> void removeEventListener(Consumer<T> listener) { public <E extends Event> void removeEventListener(EventListener<E> listener) {
proxyListener.removeEventListener(listener); proxyListener.removeEventListener(listener);
} }


Expand Down
Expand Up @@ -16,6 +16,8 @@
package io.atomix.protocols.raft.proxy.impl; package io.atomix.protocols.raft.proxy.impl;


import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import io.atomix.event.Event;
import io.atomix.event.EventListener;
import io.atomix.logging.Logger; import io.atomix.logging.Logger;
import io.atomix.logging.LoggerFactory; import io.atomix.logging.LoggerFactory;
import io.atomix.protocols.raft.protocol.PublishRequest; import io.atomix.protocols.raft.protocol.PublishRequest;
Expand All @@ -26,7 +28,6 @@
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.function.Consumer;


import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;


Expand All @@ -38,7 +39,7 @@ final class RaftProxyListener {


private final RaftClientProtocol protocol; private final RaftClientProtocol protocol;
private final RaftProxyState state; private final RaftProxyState state;
private final Set<Consumer> listeners = Sets.newLinkedHashSet(); private final Set<EventListener> listeners = Sets.newLinkedHashSet();
private final RaftProxySequencer sequencer; private final RaftProxySequencer sequencer;
private final Serializer serializer; private final Serializer serializer;
private final Executor executor; private final Executor executor;
Expand All @@ -57,7 +58,7 @@ public RaftProxyListener(RaftClientProtocol protocol, RaftProxyState state, Raft
* *
* @param listener the event listener callback * @param listener the event listener callback
*/ */
public void addEventListener(Consumer listener) { public void addEventListener(EventListener listener) {
executor.execute(() -> listeners.add(listener)); executor.execute(() -> listeners.add(listener));
} }


Expand All @@ -66,7 +67,7 @@ public void addEventListener(Consumer listener) {
* *
* @param listener the event listener callback * @param listener the event listener callback
*/ */
public void removeEventListener(Consumer listener) { public void removeEventListener(EventListener listener) {
executor.execute(() -> listeners.remove(listener)); executor.execute(() -> listeners.remove(listener));
} }


Expand Down Expand Up @@ -113,9 +114,9 @@ private void handlePublish(PublishRequest request) {


sequencer.sequenceEvent(request, () -> { sequencer.sequenceEvent(request, () -> {
for (byte[] bytes : request.events()) { for (byte[] bytes : request.events()) {
Object event = serializer.decode(bytes); Event event = serializer.decode(bytes);
for (Consumer listener : listeners) { for (EventListener listener : listeners) {
listener.accept(event); listener.onEvent(event);
} }
} }
}); });
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/ */
package io.atomix.protocols.raft; package io.atomix.protocols.raft;


import io.atomix.event.AbstractEvent;
import io.atomix.protocols.raft.cluster.MemberId; import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.cluster.RaftClusterEvent; import io.atomix.protocols.raft.cluster.RaftClusterEvent;
import io.atomix.protocols.raft.cluster.RaftMember; import io.atomix.protocols.raft.cluster.RaftMember;
Expand Down Expand Up @@ -97,6 +98,7 @@ public class RaftTest extends ConcurrentTestCase {
.register(TestEvent.class) .register(TestEvent.class)
.register(TestExpire.class) .register(TestExpire.class)
.register(TestClose.class) .register(TestClose.class)
.register(IndexEvent.class)
.register(RaftQuery.ConsistencyLevel.class) .register(RaftQuery.ConsistencyLevel.class)
.build()); .build());


Expand Down Expand Up @@ -800,10 +802,10 @@ private void testSequenceOperations(int nodes, RaftQuery.ConsistencyLevel consis


RaftClient client = createClient(); RaftClient client = createClient();
RaftProxy session = createSession(client); RaftProxy session = createSession(client);
session.<Long>addEventListener(message -> { session.<IndexEvent>addEventListener(event -> {
threadAssertEquals(counter.incrementAndGet(), 3); threadAssertEquals(counter.incrementAndGet(), 3);
threadAssertTrue(message >= index.get()); threadAssertTrue(event.getSubject() >= index.get());
index.set(message); index.set(event.getSubject());
resume(); resume();
}); });


Expand Down Expand Up @@ -1387,6 +1389,23 @@ public void expire(RaftCommit<TestExpire> commit) {
} }
} }


/**
* Index event.
*/
public static class IndexEvent extends AbstractEvent<IndexEvent.Type, Long> {
public enum Type {
CHANGE,
}

public IndexEvent(Type type, Long subject) {
super(type, subject);
}

public IndexEvent(Type type, Long subject, long time) {
super(type, subject, time);
}
}

/** /**
* Test command. * Test command.
*/ */
Expand Down

0 comments on commit fc836fa

Please sign in to comment.