Skip to content

Commit

Permalink
Refactor DistributedTopic to ensure topic messages are only sent to c…
Browse files Browse the repository at this point in the history
…lients with registered listeners.
  • Loading branch information
kuujo committed Aug 16, 2015
1 parent 8e2aee8 commit a9774cc
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 59 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import net.kuujo.copycat.resource.ResourceContext; import net.kuujo.copycat.resource.ResourceContext;
import net.kuujo.copycat.util.Listener; import net.kuujo.copycat.util.Listener;


import java.util.List; import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer; import java.util.function.Consumer;


/** /**
Expand All @@ -33,18 +33,19 @@
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public class DistributedTopic<T> extends Resource { public class DistributedTopic<T> extends Resource {
private final List<TopicListener<T>> listeners = new CopyOnWriteArrayList<>(); private final Set<Consumer<T>> listeners = new HashSet<>();


@Override @Override
protected Class<? extends StateMachine> stateMachine() { protected Class<? extends StateMachine> stateMachine() {
return TopicState.class; return TopicState.class;
} }


@Override @Override
@SuppressWarnings("unchecked")
protected void open(ResourceContext context) { protected void open(ResourceContext context) {
super.open(context); super.open(context);
context.session().onReceive(message -> { context.session().onReceive(message -> {
for (TopicListener<T> listener : listeners) { for (Consumer<T> listener : listeners) {
listener.accept((T) message); listener.accept((T) message);
} }
}); });
Expand All @@ -68,30 +69,40 @@ public CompletableFuture<Void> publish(T message) {
* @param listener The message listener. * @param listener The message listener.
* @return The listener context. * @return The listener context.
*/ */
public Listener<T> onMessage(Consumer<T> listener) { public CompletableFuture<Listener<T>> onMessage(Consumer<T> listener) {
TopicListener<T> context = new TopicListener<T>(listener); if (!listeners.isEmpty()) {
listeners.add(context); listeners.add(listener);
return context; return CompletableFuture.completedFuture(new TopicListener(listener));
}

listeners.add(listener);
return submit(TopicCommands.Listen.builder().build())
.thenApply(v -> new TopicListener(listener));
} }


/** /**
* Topic listener context. * Topic listener.
*/ */
private class TopicListener<T> implements Listener<T> { private class TopicListener implements Listener<T> {
private final Consumer<T> listener; private final Consumer<T> listener;


private TopicListener(Consumer<T> listener) { private TopicListener(Consumer<T> listener) {
this.listener = listener; this.listener = listener;
} }


@Override @Override
public void accept(T event) { public void accept(T message) {
listener.accept(event); listener.accept(message);
} }


@Override @Override
public void close() { public void close() {
listeners.remove(this); synchronized (DistributedTopic.this) {
listeners.remove(listener);
if (listeners.isEmpty()) {
submit(TopicCommands.Unlisten.builder().build());
}
}
} }
} }


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
*/ */
package net.kuujo.copycat.coordination.state; package net.kuujo.copycat.coordination.state;


import net.kuujo.copycat.util.BuilderPool;
import net.kuujo.copycat.raft.protocol.Command;
import net.kuujo.copycat.raft.protocol.Operation;
import net.kuujo.copycat.io.BufferInput; import net.kuujo.copycat.io.BufferInput;
import net.kuujo.copycat.io.BufferOutput; import net.kuujo.copycat.io.BufferOutput;
import net.kuujo.copycat.io.serializer.CopycatSerializable; import net.kuujo.copycat.io.serializer.CopycatSerializable;
import net.kuujo.copycat.io.serializer.SerializeWith;
import net.kuujo.copycat.io.serializer.Serializer; import net.kuujo.copycat.io.serializer.Serializer;
import net.kuujo.copycat.raft.protocol.Command;
import net.kuujo.copycat.raft.protocol.Operation;
import net.kuujo.copycat.util.BuilderPool;


/** /**
* Topic commands. * Topic commands.
Expand All @@ -37,6 +38,15 @@ private TopicCommands() {
* Abstract topic command. * Abstract topic command.
*/ */
public static abstract class TopicCommand<V> implements Command<V>, CopycatSerializable { public static abstract class TopicCommand<V> implements Command<V>, CopycatSerializable {
@Override
public void writeObject(BufferOutput buffer, Serializer serializer) {

}

@Override
public void readObject(BufferInput buffer, Serializer serializer) {

}


/** /**
* Base map command builder. * Base map command builder.
Expand All @@ -48,6 +58,68 @@ protected Builder(BuilderPool<T, U> pool) {
} }
} }


/**
* Listen command.
*/
@SerializeWith(id=510)
public static class Listen extends TopicCommand<Void> {

/**
* Returns a new listen command builder.
*
* @return A new listen command builder.
*/
@SuppressWarnings("unchecked")
public static Builder builder() {
return Operation.builder(Builder.class, Builder::new);
}

/**
* Listen command builder.
*/
public static class Builder extends TopicCommand.Builder<Builder, Listen, Void> {
public Builder(BuilderPool<Builder, Listen> pool) {
super(pool);
}

@Override
protected Listen create() {
return new Listen();
}
}
}

/**
* Unlisten command.
*/
@SerializeWith(id=511)
public static class Unlisten extends TopicCommand<Void> {

/**
* Returns a new unlisten command builder.
*
* @return A new unlisten command builder.
*/
@SuppressWarnings("unchecked")
public static Builder builder() {
return Operation.builder(Builder.class, Builder::new);
}

/**
* Unlisten command builder.
*/
public static class Builder extends TopicCommand.Builder<Builder, Unlisten, Void> {
public Builder(BuilderPool<Builder, Unlisten> pool) {
super(pool);
}

@Override
protected Unlisten create() {
return new Unlisten();
}
}
}

/** /**
* Publish command. * Publish command.
*/ */
Expand Down Expand Up @@ -112,44 +184,4 @@ protected Publish<T> create() {
} }
} }


/**
* Subscribe command.
*/
public static class Subscribe extends TopicCommand<Void> {

/**
* Returns a new publish command builder.
*
* @return The publish command builder.
*/
@SuppressWarnings("unchecked")
public static Builder builder() {
return Operation.builder(Builder.class, Builder::new);
}

@Override
public void writeObject(BufferOutput buffer, Serializer serializer) {

}

@Override
public void readObject(BufferInput buffer, Serializer serializer) {

}

/**
* Publish command builder.
*/
public static class Builder extends TopicCommand.Builder<Builder, Subscribe, Void> {
public Builder(BuilderPool<Builder, Subscribe> pool) {
super(pool);
}

@Override
protected Subscribe create() {
return new Subscribe();
}
}
}

} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -15,29 +15,55 @@
*/ */
package net.kuujo.copycat.coordination.state; package net.kuujo.copycat.coordination.state;


import net.kuujo.copycat.raft.session.Session;
import net.kuujo.copycat.raft.Commit; import net.kuujo.copycat.raft.Commit;
import net.kuujo.copycat.raft.StateMachine; import net.kuujo.copycat.raft.StateMachine;
import net.kuujo.copycat.raft.StateMachineExecutor; import net.kuujo.copycat.raft.StateMachineExecutor;


import java.util.HashMap;
import java.util.Map;

/** /**
* Topic state machine. * Topic state machine.
* *
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public class TopicState extends StateMachine { public class TopicState extends StateMachine {
private final Map<Long, Commit<TopicCommands.Listen>> listeners = new HashMap<>();


@Override @Override
public void configure(StateMachineExecutor executor) { public void configure(StateMachineExecutor executor) {
executor.register(TopicCommands.Publish.class, this::publish); executor.register(TopicCommands.Publish.class, this::publish);
} }


/**
* Applies listen commits.
*/
protected void listen(Commit<TopicCommands.Listen> commit) {
if (!listeners.containsKey(commit.session().id())) {
listeners.put(commit.session().id(), commit);
} else {
commit.clean();
}
}

/**
* Applies listen commits.
*/
protected void unlisten(Commit<LeaderElectionCommands.Unlisten> commit) {
Commit<TopicCommands.Listen> listener = listeners.remove(commit.session().id());
if (listener != null) {
listener.clean();
} else {
commit.clean();
}
}

/** /**
* Handles a publish commit. * Handles a publish commit.
*/ */
protected void publish(Commit<TopicCommands.Publish> commit) { protected void publish(Commit<TopicCommands.Publish> commit) {
for (Session session : context().sessions()) { for (Commit<TopicCommands.Listen> listener : listeners.values()) {
session.publish(commit.operation().message()); listener.session().publish(commit.operation().message());
} }
commit.clean(); commit.clean();
} }
Expand Down

0 comments on commit a9774cc

Please sign in to comment.