Skip to content

Commit

Permalink
Ensure resources can send events to clients via sessions.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jul 23, 2015
1 parent c54092d commit 1387c3a
Show file tree
Hide file tree
Showing 15 changed files with 404 additions and 24 deletions.
169 changes: 165 additions & 4 deletions atomic/src/main/java/net/kuujo/copycat/atomic/AsyncReference.java
Expand Up @@ -20,18 +20,17 @@
import net.kuujo.alleycat.SerializeWith;
import net.kuujo.alleycat.io.BufferInput;
import net.kuujo.alleycat.io.BufferOutput;
import net.kuujo.copycat.BuilderPool;
import net.kuujo.copycat.Mode;
import net.kuujo.copycat.Resource;
import net.kuujo.copycat.Stateful;
import net.kuujo.copycat.*;
import net.kuujo.copycat.log.Compaction;
import net.kuujo.copycat.raft.*;
import net.kuujo.copycat.raft.server.Apply;
import net.kuujo.copycat.raft.server.Commit;
import net.kuujo.copycat.raft.server.Filter;

import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -43,9 +42,15 @@
@Stateful(AsyncReference.StateMachine.class)
public class AsyncReference<T> extends Resource {
private ConsistencyLevel defaultConsistency = ConsistencyLevel.LINEARIZABLE_LEASE;
private final java.util.Set<Listener<T>> changeListeners = Collections.newSetFromMap(new ConcurrentHashMap<>());

public AsyncReference(Raft protocol) {
super(protocol);
protocol.session().<T>onReceive(event -> {
for (Listener<T> listener : changeListeners) {
listener.accept(event);
}
});
}

/**
Expand Down Expand Up @@ -378,6 +383,49 @@ public CompletableFuture<Boolean> compareAndSet(T expect, T update, long ttl, Ti
.build());
}

/**
* Registers a change listener.
*
* @param listener The change listener.
* @return A completable future to be completed once the change listener has been registered.
*/
public synchronized CompletableFuture<ListenerContext<T>> onChange(Listener<T> listener) {
if (!changeListeners.isEmpty()) {
changeListeners.add(listener);
return CompletableFuture.completedFuture(new ChangeListenerContext(listener));
}

changeListeners.add(listener);
return submit(ChangeListen.builder().build())
.thenApply(v -> new ChangeListenerContext(listener));
}

/**
* Change listener context.
*/
private class ChangeListenerContext implements ListenerContext<T> {
private final Listener<T> listener;

private ChangeListenerContext(Listener<T> listener) {
this.listener = listener;
}

@Override
public void accept(T event) {
listener.accept(event);
}

@Override
public void close() {
synchronized (AsyncReference.this) {
changeListeners.remove(listener);
if (changeListeners.isEmpty()) {
submit(ChangeUnlisten.builder().build());
}
}
}
}

/**
* Abstract reference command.
*/
Expand Down Expand Up @@ -765,11 +813,92 @@ public Builder<T> withValue(Object value) {
}
}

/**
* Change listen.
*/
@SerializeWith(id=464)
public static class ChangeListen implements Command<Void>, AlleycatSerializable {

/**
* Returns a new change listen builder.
*
* @return A new change listen builder.
*/
public static Builder builder() {
return Operation.builder(Builder.class, Builder::new);
}

@Override
public void writeObject(BufferOutput buffer, Alleycat alleycat) {

}

@Override
public void readObject(BufferInput buffer, Alleycat alleycat) {

}

/**
* Change listen builder.
*/
public static class Builder extends Command.Builder<Builder, ChangeListen, Void> {
public Builder(BuilderPool<Builder, ChangeListen> pool) {
super(pool);
}

@Override
protected ChangeListen create() {
return new ChangeListen();
}
}
}

/**
* Change unlisten.
*/
@SerializeWith(id=465)
public static class ChangeUnlisten implements Command<Void>, AlleycatSerializable {

/**
* Returns a new change unlisten builder.
*
* @return A new change unlisten builder.
*/
public static Builder builder() {
return Operation.builder(Builder.class, Builder::new);
}

@Override
public void writeObject(BufferOutput buffer, Alleycat alleycat) {

}

@Override
public void readObject(BufferInput buffer, Alleycat alleycat) {

}

/**
* Change unlisten builder.
*/
public static class Builder extends Command.Builder<Builder, ChangeUnlisten, Void> {
public Builder(BuilderPool<Builder, ChangeUnlisten> pool) {
super(pool);
}

@Override
protected ChangeUnlisten create() {
return new ChangeUnlisten();
}
}
}

/**
* Async reference state machine.
*/
public static class StateMachine extends net.kuujo.copycat.raft.server.StateMachine {
private final java.util.Set<Long> sessions = new HashSet<>();
private final java.util.Set<Session> listeners = new HashSet<>();
private final AtomicReference<Object> value = new AtomicReference<>();
private Commit<? extends ReferenceCommand> command;
private long version;
Expand Down Expand Up @@ -811,6 +940,33 @@ private boolean isActive(Commit<? extends ReferenceCommand> commit) {
return true;
}

/**
* Handles a listen commit.
*/
@Apply(ChangeListen.class)
protected void listen(Commit<ChangeListen> commit) {
updateTime(commit);
listeners.add(commit.session());
}

/**
* Handles an unlisten commit.
*/
@Apply(ChangeUnlisten.class)
protected void unlisten(Commit<ChangeUnlisten> commit) {
updateTime(commit);
listeners.remove(commit.session());
}

/**
* Triggers a change event.
*/
private void change(Object value) {
for (Session session : listeners) {
session.publish(value);
}
}

/**
* Handles a get commit.
*/
Expand All @@ -829,6 +985,7 @@ protected void set(Commit<Set> commit) {
value.set(commit.operation().value());
command = commit;
version = commit.index();
change(value.get());
}

/**
Expand All @@ -840,13 +997,15 @@ protected boolean compareAndSet(Commit<CompareAndSet> commit) {
if (isActive(command)) {
if (value.compareAndSet(commit.operation().expect(), commit.operation().update())) {
command = commit;
change(value.get());
return true;
}
return false;
} else if (commit.operation().expect() == null) {
value.set(null);
command = commit;
version = commit.index();
change(null);
return true;
} else {
return false;
Expand All @@ -863,11 +1022,13 @@ protected Object getAndSet(Commit<GetAndSet> commit) {
Object result = value.getAndSet(commit.operation().value());
command = commit;
version = commit.index();
change(value.get());
return result;
} else {
value.set(commit.operation().value());
command = commit;
version = commit.index();
change(value.get());
return null;
}
}
Expand Down
Expand Up @@ -62,6 +62,37 @@ public void testSetGet() throws Throwable {
await();
}

/**
* Tests setting and getting a value with a change event.
*/
@SuppressWarnings("unchecked")
public void testChange() throws Throwable {
Servers servers = createCopycats(3, 2);

Copycat copycat = servers.active.get(0);

Node node = copycat.create("/test").get();
AsyncReference<String> reference = node.create(AsyncReference.class).get();

expectResume();
reference.onChange(value -> {
threadAssertEquals("Hello world!", value);
resume();
}).thenRun(this::resume);
await();

expectResumes(2);
reference.set("Hello world!").thenRun(this::resume);
await();

expectResume();
reference.get().thenAccept(result -> {
threadAssertEquals(result, "Hello world!");
resume();
});
await();
}

/**
* Tests compare-and-set.
*/
Expand Down
Expand Up @@ -20,6 +20,7 @@
import net.kuujo.copycat.raft.*;
import net.kuujo.copycat.raft.client.state.RaftClientState;
import net.kuujo.copycat.transport.Transport;
import net.kuujo.copycat.util.concurrent.Context;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -49,6 +50,11 @@ private RaftClient(RaftClientState client) {
this.client = client;
}

@Override
public Context context() {
return client.getContext();
}

@Override
public Session session() {
return client.getSession();
Expand Down
Expand Up @@ -97,6 +97,15 @@ protected RaftClientState(Member member, Members members, Transport transport, A
this.session = new ClientSession(context);
}

/**
* Returns the client context.
*
* @return The client context.
*/
public Context getContext() {
return context;
}

/**
* Returns the cluster leader.
*
Expand Down
9 changes: 9 additions & 0 deletions common/src/main/java/net/kuujo/copycat/Listeners.java
Expand Up @@ -27,6 +27,15 @@
public class Listeners<T> implements Iterable<ListenerContext<T>> {
private final List<ListenerContext<T>> listeners = new CopyOnWriteArrayList<>();

/**
* Returns the number of registered listeners.
*
* @return The number of registered listeners.
*/
public int size() {
return listeners.size();
}

/**
* Adds a listener to the set of listeners.
*
Expand Down
5 changes: 1 addition & 4 deletions core/src/main/java/net/kuujo/copycat/Copycat.java
Expand Up @@ -15,10 +15,7 @@
*/
package net.kuujo.copycat;

import net.kuujo.copycat.manager.CreatePath;
import net.kuujo.copycat.manager.CreateResource;
import net.kuujo.copycat.manager.DeletePath;
import net.kuujo.copycat.manager.PathExists;
import net.kuujo.copycat.manager.*;
import net.kuujo.copycat.raft.ManagedRaft;
import net.kuujo.copycat.raft.Raft;
import net.kuujo.copycat.raft.server.StateMachine;
Expand Down
10 changes: 9 additions & 1 deletion core/src/main/java/net/kuujo/copycat/ResourceProtocol.java
Expand Up @@ -20,6 +20,7 @@
import net.kuujo.copycat.raft.Query;
import net.kuujo.copycat.raft.Raft;
import net.kuujo.copycat.raft.Session;
import net.kuujo.copycat.util.concurrent.Context;

import java.util.concurrent.CompletableFuture;

Expand All @@ -31,15 +32,22 @@
class ResourceProtocol implements Raft {
private final long resource;
private final Raft protocol;
private final ResourceSession session;

public ResourceProtocol(long resource, Raft protocol) {
this.resource = resource;
this.protocol = protocol;
this.session = new ResourceSession(resource, protocol.session(), protocol.context());
}

@Override
public Context context() {
return protocol.context();
}

@Override
public Session session() {
return protocol.session();
return session;
}

@Override
Expand Down

0 comments on commit 1387c3a

Please sign in to comment.