From fc836fadfedca0347911c89ea1d1365cb0d34035 Mon Sep 17 00:00:00 2001 From: Jordan Halterman Date: Thu, 22 Jun 2017 18:31:56 -0700 Subject: [PATCH] Refactor Raft proxy events to use core event interfaces. --- .../protocols/raft/proxy/RaftProxy.java | 8 +++--- .../raft/proxy/RaftProxyDelegate.java | 10 +++++--- .../proxy/impl/BlockingAwareRaftProxy.java | 13 +++++----- .../raft/proxy/impl/DefaultRaftProxy.java | 8 +++--- .../raft/proxy/impl/RaftProxyListener.java | 15 +++++------ .../io/atomix/protocols/raft/RaftTest.java | 25 ++++++++++++++++--- 6 files changed, 53 insertions(+), 26 deletions(-) diff --git a/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/RaftProxy.java b/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/RaftProxy.java index 3c7165a8a9..db04ef436c 100644 --- a/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/RaftProxy.java +++ b/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/RaftProxy.java @@ -15,6 +15,8 @@ */ package io.atomix.protocols.raft.proxy; +import io.atomix.event.Event; +import io.atomix.event.EventListener; import io.atomix.protocols.raft.CommunicationStrategies; import io.atomix.protocols.raft.CommunicationStrategy; import io.atomix.protocols.raft.RaftCommand; @@ -96,7 +98,7 @@ enum State { * * @return The client proxy type. */ - String getType(); + String getTypeName(); /** * Returns the session state. @@ -191,14 +193,14 @@ default CompletableFuture submit(RaftOperation operation) { * @param listener The session receive callback. * @throws NullPointerException if {@code event} or {@code callback} is null */ - void addEventListener(Consumer listener); + void addEventListener(EventListener listener); /** * Removes an event listener. * * @param listener the event listener callback to remove */ - void removeEventListener(Consumer listener); + void removeEventListener(EventListener listener); /** * Returns a boolean indicating whether the session is open. diff --git a/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/RaftProxyDelegate.java b/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/RaftProxyDelegate.java index 6ad41fa637..55b96137b9 100644 --- a/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/RaftProxyDelegate.java +++ b/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/RaftProxyDelegate.java @@ -15,6 +15,8 @@ */ package io.atomix.protocols.raft.proxy; +import io.atomix.event.Event; +import io.atomix.event.EventListener; import io.atomix.protocols.raft.RaftCommand; import io.atomix.protocols.raft.RaftQuery; @@ -40,8 +42,8 @@ public String getName() { } @Override - public String getType() { - return delegate.getType(); + public String getTypeName() { + return delegate.getTypeName(); } @Override @@ -70,12 +72,12 @@ public CompletableFuture submit(RaftQuery query) { } @Override - public void addEventListener(Consumer listener) { + public void addEventListener(EventListener listener) { delegate.addEventListener(listener); } @Override - public void removeEventListener(Consumer listener) { + public void removeEventListener(EventListener listener) { delegate.removeEventListener(listener); } diff --git a/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/impl/BlockingAwareRaftProxy.java b/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/impl/BlockingAwareRaftProxy.java index 3371943433..9394349bac 100644 --- a/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/impl/BlockingAwareRaftProxy.java +++ b/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/impl/BlockingAwareRaftProxy.java @@ -16,6 +16,8 @@ package io.atomix.protocols.raft.proxy.impl; import com.google.common.collect.Maps; +import io.atomix.event.Event; +import io.atomix.event.EventListener; import io.atomix.protocols.raft.RaftCommand; import io.atomix.protocols.raft.RaftQuery; import io.atomix.protocols.raft.proxy.RaftProxy; @@ -25,7 +27,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.function.Consumer; import static com.google.common.base.Preconditions.checkNotNull; @@ -34,7 +35,7 @@ */ public class BlockingAwareRaftProxy extends RaftProxyDelegate { private final Executor executor; - private final Map listenerMap = Maps.newConcurrentMap(); + private final Map listenerMap = Maps.newConcurrentMap(); public BlockingAwareRaftProxy(RaftProxy delegate, Executor executor) { super(delegate); @@ -42,16 +43,16 @@ public BlockingAwareRaftProxy(RaftProxy delegate, Executor executor) { } @Override - public void addEventListener(Consumer listener) { - Consumer wrappedListener = event -> executor.execute(() -> listener.accept(event)); + public void addEventListener(EventListener listener) { + EventListener wrappedListener = event -> executor.execute(() -> listener.onEvent(event)); listenerMap.put(listener, wrappedListener); super.addEventListener(wrappedListener); } @Override @SuppressWarnings("unchecked") - public void removeEventListener(Consumer listener) { - Consumer wrappedListener = listenerMap.remove(listener); + public void removeEventListener(EventListener listener) { + EventListener wrappedListener = listenerMap.remove(listener); if (wrappedListener != null) { super.removeEventListener(wrappedListener); } diff --git a/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/impl/DefaultRaftProxy.java b/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/impl/DefaultRaftProxy.java index 47f7903611..6d3737efb6 100644 --- a/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/impl/DefaultRaftProxy.java +++ b/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/impl/DefaultRaftProxy.java @@ -15,6 +15,8 @@ */ package io.atomix.protocols.raft.proxy.impl; +import io.atomix.event.Event; +import io.atomix.event.EventListener; import io.atomix.protocols.raft.CommunicationStrategies; import io.atomix.protocols.raft.CommunicationStrategy; import io.atomix.protocols.raft.RaftCommand; @@ -101,7 +103,7 @@ public String getName() { } @Override - public String getType() { + public String getTypeName() { return state.getSessionType(); } @@ -160,12 +162,12 @@ public CompletableFuture submit(RaftQuery query) { } @Override - public void addEventListener(Consumer listener) { + public void addEventListener(EventListener listener) { proxyListener.addEventListener(listener); } @Override - public void removeEventListener(Consumer listener) { + public void removeEventListener(EventListener listener) { proxyListener.removeEventListener(listener); } diff --git a/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/impl/RaftProxyListener.java b/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/impl/RaftProxyListener.java index 371af55c93..6ba2db03fa 100644 --- a/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/impl/RaftProxyListener.java +++ b/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/impl/RaftProxyListener.java @@ -16,6 +16,8 @@ package io.atomix.protocols.raft.proxy.impl; import com.google.common.collect.Sets; +import io.atomix.event.Event; +import io.atomix.event.EventListener; import io.atomix.logging.Logger; import io.atomix.logging.LoggerFactory; import io.atomix.protocols.raft.protocol.PublishRequest; @@ -26,7 +28,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.function.Consumer; import static com.google.common.base.Preconditions.checkNotNull; @@ -38,7 +39,7 @@ final class RaftProxyListener { private final RaftClientProtocol protocol; private final RaftProxyState state; - private final Set listeners = Sets.newLinkedHashSet(); + private final Set listeners = Sets.newLinkedHashSet(); private final RaftProxySequencer sequencer; private final Serializer serializer; private final Executor executor; @@ -57,7 +58,7 @@ public RaftProxyListener(RaftClientProtocol protocol, RaftProxyState state, Raft * * @param listener the event listener callback */ - public void addEventListener(Consumer listener) { + public void addEventListener(EventListener listener) { executor.execute(() -> listeners.add(listener)); } @@ -66,7 +67,7 @@ public void addEventListener(Consumer listener) { * * @param listener the event listener callback */ - public void removeEventListener(Consumer listener) { + public void removeEventListener(EventListener listener) { executor.execute(() -> listeners.remove(listener)); } @@ -113,9 +114,9 @@ private void handlePublish(PublishRequest request) { sequencer.sequenceEvent(request, () -> { for (byte[] bytes : request.events()) { - Object event = serializer.decode(bytes); - for (Consumer listener : listeners) { - listener.accept(event); + Event event = serializer.decode(bytes); + for (EventListener listener : listeners) { + listener.onEvent(event); } } }); diff --git a/protocols/raft/src/test/java/io/atomix/protocols/raft/RaftTest.java b/protocols/raft/src/test/java/io/atomix/protocols/raft/RaftTest.java index 6536d3da23..17154df973 100644 --- a/protocols/raft/src/test/java/io/atomix/protocols/raft/RaftTest.java +++ b/protocols/raft/src/test/java/io/atomix/protocols/raft/RaftTest.java @@ -15,6 +15,7 @@ */ package io.atomix.protocols.raft; +import io.atomix.event.AbstractEvent; import io.atomix.protocols.raft.cluster.MemberId; import io.atomix.protocols.raft.cluster.RaftClusterEvent; import io.atomix.protocols.raft.cluster.RaftMember; @@ -97,6 +98,7 @@ public class RaftTest extends ConcurrentTestCase { .register(TestEvent.class) .register(TestExpire.class) .register(TestClose.class) + .register(IndexEvent.class) .register(RaftQuery.ConsistencyLevel.class) .build()); @@ -800,10 +802,10 @@ private void testSequenceOperations(int nodes, RaftQuery.ConsistencyLevel consis RaftClient client = createClient(); RaftProxy session = createSession(client); - session.addEventListener(message -> { + session.addEventListener(event -> { threadAssertEquals(counter.incrementAndGet(), 3); - threadAssertTrue(message >= index.get()); - index.set(message); + threadAssertTrue(event.getSubject() >= index.get()); + index.set(event.getSubject()); resume(); }); @@ -1387,6 +1389,23 @@ public void expire(RaftCommit commit) { } } + /** + * Index event. + */ + public static class IndexEvent extends AbstractEvent { + public enum Type { + CHANGE, + } + + public IndexEvent(Type type, Long subject) { + super(type, subject); + } + + public IndexEvent(Type type, Long subject, long time) { + super(type, subject, time); + } + } + /** * Test command. */