Skip to content

Commit

Permalink
Refactor service Session to support a proxy-based client event API. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 9, 2018
1 parent 78fed35 commit 98bdc82
Show file tree
Hide file tree
Showing 18 changed files with 328 additions and 241 deletions.
Expand Up @@ -102,7 +102,7 @@ public void unlisten() {
} }


private void notifyLeadershipChange(Leadership<byte[]> oldLeadership, Leadership<byte[]> newLeadership) { private void notifyLeadershipChange(Leadership<byte[]> oldLeadership, Leadership<byte[]> newLeadership) {
listeners.forEach(id -> acceptOn(id, client -> client.onLeadershipChange(oldLeadership, newLeadership))); listeners.forEach(id -> getSession(id).accept(client -> client.onLeadershipChange(oldLeadership, newLeadership)));
} }


@Override @Override
Expand Down
Expand Up @@ -91,7 +91,7 @@ public void restore(BackupInput reader) {
} }


private void notifyLeadershipChange(String topic, Leadership<byte[]> previousLeadership, Leadership<byte[]> newLeadership) { private void notifyLeadershipChange(String topic, Leadership<byte[]> previousLeadership, Leadership<byte[]> newLeadership) {
listeners.forEach(id -> acceptOn(id, client -> client.onLeadershipChange(topic, previousLeadership, newLeadership))); listeners.forEach(id -> getSession(id).accept(client -> client.onLeadershipChange(topic, previousLeadership, newLeadership)));
} }


@Override @Override
Expand Down
Expand Up @@ -78,7 +78,7 @@ public void restore(BackupInput input) {
queue.remove(holder); queue.remove(holder);
Session session = getSession(holder.session); Session session = getSession(holder.session);
if (session != null && session.getState().active()) { if (session != null && session.getState().active()) {
acceptOn(holder.session, service -> service.failed(holder.id)); getSession(holder.session).accept(service -> service.failed(holder.id));
} }
})); }));
} }
Expand All @@ -97,7 +97,7 @@ public void onClose(Session session) {


@Override @Override
public void lock(int id, long timeout) { public void lock(int id, long timeout) {
Session session = getCurrentSession(); Session<DistributedLockClient> session = getCurrentSession();
// If the lock is not already owned, immediately grant the lock to the requester. // If the lock is not already owned, immediately grant the lock to the requester.
// Note that we still have to publish an event to the session. The event is guaranteed to be received // Note that we still have to publish an event to the session. The event is guaranteed to be received
// by the client-side primitive after the LOCK response. // by the client-side primitive after the LOCK response.
Expand All @@ -107,10 +107,10 @@ public void lock(int id, long timeout) {
getCurrentIndex(), getCurrentIndex(),
session.sessionId(), session.sessionId(),
0); 0);
acceptOn(session, service -> service.locked(id, getCurrentIndex())); session.accept(service -> service.locked(id, getCurrentIndex()));
// If the timeout is 0, that indicates this is a tryLock request. Immediately fail the request. // If the timeout is 0, that indicates this is a tryLock request. Immediately fail the request.
} else if (timeout == 0) { } else if (timeout == 0) {
acceptOn(session, service -> service.failed(id)); session.accept(service -> service.failed(id));
// If a timeout exists, add the request to the queue and set a timer. Note that the lock request expiration // If a timeout exists, add the request to the queue and set a timer. Note that the lock request expiration
// time is based on the *state machine* time - not the system time - to ensure consistency across servers. // time is based on the *state machine* time - not the system time - to ensure consistency across servers.
} else if (timeout > 0) { } else if (timeout > 0) {
Expand All @@ -127,7 +127,7 @@ public void lock(int id, long timeout) {
timers.remove(getCurrentIndex()); timers.remove(getCurrentIndex());
queue.remove(holder); queue.remove(holder);
if (session.getState().active()) { if (session.getState().active()) {
acceptOn(session, service -> service.failed(id)); session.accept(service -> service.failed(id));
} }
})); }));
// If the lock is -1, just add the request to the queue with no expiration. // If the lock is -1, just add the request to the queue with no expiration.
Expand Down Expand Up @@ -168,7 +168,7 @@ public void unlock(int id) {
// Notify the client that it has acquired the lock. // Notify the client that it has acquired the lock.
Session lockSession = getSession(lock.session); Session lockSession = getSession(lock.session);
if (lockSession != null && lockSession.getState().active()) { if (lockSession != null && lockSession.getState().active()) {
acceptOn(lock.session, service -> service.locked(lock.id, getCurrentIndex())); getSession(lock.session).accept(service -> service.locked(lock.id, getCurrentIndex()));
break; break;
} }
lock = queue.poll(); lock = queue.poll();
Expand Down Expand Up @@ -203,7 +203,7 @@ private void releaseSession(Session session) {
// Notify the client that it has acquired the lock. // Notify the client that it has acquired the lock.
Session lockSession = getSession(lock.session); Session lockSession = getSession(lock.session);
if (lockSession != null && lockSession.getState().active()) { if (lockSession != null && lockSession.getState().active()) {
acceptOn(lock.session, service -> service.locked(lock.id, lock.index)); getSession(lock.session).accept(service -> service.locked(lock.id, lock.index));
break; break;
} }
lock = queue.poll(); lock = queue.poll();
Expand Down
Expand Up @@ -778,7 +778,7 @@ private void publish(MapEvent<String, byte[]> event) {
* @param events list of map event to publish * @param events list of map event to publish
*/ */
private void publish(List<MapEvent<String, byte[]>> events) { private void publish(List<MapEvent<String, byte[]>> events) {
listeners.forEach(listener -> events.forEach(event -> acceptOn(listener, client -> client.change(event)))); listeners.forEach(listener -> events.forEach(event -> getSession(listener).accept(client -> client.change(event))));
} }


@Override @Override
Expand Down
Expand Up @@ -320,7 +320,7 @@ public void unlisten() {
* @param newValue the new value * @param newValue the new value
*/ */
private void onChange(String key, byte[] oldValue, byte[] newValue) { private void onChange(String key, byte[] oldValue, byte[] newValue) {
listeners.forEach(id -> acceptOn(id, client -> client.onChange(key, oldValue, newValue))); listeners.forEach(id -> getSession(id).accept(client -> client.onChange(key, oldValue, newValue)));
} }


private interface MapEntryValues { private interface MapEntryValues {
Expand Down
Expand Up @@ -129,7 +129,7 @@ public void add(Collection<byte[]> items) {


// Send an event to all sessions that have expressed interest in task processing // Send an event to all sessions that have expressed interest in task processing
// and are not actively processing a task. // and are not actively processing a task.
registeredWorkers.forEach(session -> acceptOn(session, client -> client.taskAvailable())); registeredWorkers.forEach(session -> getSession(session).accept(client -> client.taskAvailable()));
// FIXME: This generates a lot of event traffic. // FIXME: This generates a lot of event traffic.
} }


Expand Down
Expand Up @@ -205,11 +205,11 @@ private void release(long sessionId, int releasePermits) {
} }


private void success(SessionId sessionId, long operationId, int acquirePermits, long version) { private void success(SessionId sessionId, long operationId, int acquirePermits, long version) {
acceptOn(sessionId, client -> client.succeeded(operationId, version, acquirePermits)); getSession(sessionId).accept(client -> client.succeeded(operationId, version, acquirePermits));
} }


private void fail(SessionId sessionId, long operationId) { private void fail(SessionId sessionId, long operationId) {
acceptOn(sessionId, client -> client.failed(operationId)); getSession(sessionId).accept(client -> client.failed(operationId));
} }


private void releaseSession(Session session) { private void releaseSession(Session session) {
Expand Down
Expand Up @@ -266,7 +266,7 @@ private void notifyListeners(DocumentTreeEvent<byte[]> event) {
listeners.entrySet() listeners.entrySet()
.stream() .stream()
.filter(e -> event.path().isDescendentOf(e.getValue().leastCommonAncestorPath())) .filter(e -> event.path().isDescendentOf(e.getValue().leastCommonAncestorPath()))
.forEach(e -> acceptOn(e.getKey(), client -> client.change(event))); .forEach(e -> getSession(e.getKey()).accept(client -> client.change(event)));
} }


@Override @Override
Expand Down
Expand Up @@ -20,45 +20,50 @@
import io.atomix.primitive.service.AbstractPrimitiveService; import io.atomix.primitive.service.AbstractPrimitiveService;
import io.atomix.primitive.service.BackupInput; import io.atomix.primitive.service.BackupInput;
import io.atomix.primitive.service.BackupOutput; import io.atomix.primitive.service.BackupOutput;
import io.atomix.primitive.session.Session; import io.atomix.primitive.session.SessionId;
import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.Serializer;


import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.Set;


/** /**
* Raft atomic value service. * Raft atomic value service.
*/ */
public class DefaultAtomicValueService extends AbstractPrimitiveService<AtomicValueClient> implements AtomicValueService { public class DefaultAtomicValueService extends AbstractPrimitiveService<AtomicValueClient> implements AtomicValueService {
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.builder()
.register((KryoNamespace) AtomicValueType.instance().namespace())
.register(SessionId.class)
.build());

private byte[] value; private byte[] value;
private java.util.Set<Session> listeners = Sets.newHashSet(); private Set<SessionId> listeners = Sets.newHashSet();


public DefaultAtomicValueService() { public DefaultAtomicValueService() {
super(AtomicValueType.instance(), AtomicValueClient.class); super(AtomicValueType.instance(), AtomicValueClient.class);
} }


@Override
public Serializer serializer() {
return SERIALIZER;
}

@Override @Override
public void backup(BackupOutput writer) { public void backup(BackupOutput writer) {
writer.writeInt(value.length).writeBytes(value); writer.writeInt(value.length).writeBytes(value);
java.util.Set<Long> sessionIds = new HashSet<>(); writer.writeObject(listeners);
for (Session session : listeners) {
sessionIds.add(session.sessionId().id());
}
writer.writeObject(sessionIds);
} }


@Override @Override
public void restore(BackupInput reader) { public void restore(BackupInput reader) {
value = reader.readBytes(reader.readInt()); value = reader.readBytes(reader.readInt());
listeners = new HashSet<>(); listeners = reader.readObject();
for (Long sessionId : reader.<java.util.Set<Long>>readObject()) {
listeners.add(getSession(sessionId));
}
} }


private byte[] updateAndNotify(byte[] value) { private byte[] updateAndNotify(byte[] value) {
byte[] oldValue = this.value; byte[] oldValue = this.value;
this.value = value; this.value = value;
listeners.forEach(s -> acceptOn(s, client -> client.change(value, oldValue))); listeners.forEach(session -> getSession(session).accept(client -> client.change(value, oldValue)));
return oldValue; return oldValue;
} }


Expand Down Expand Up @@ -93,11 +98,11 @@ public byte[] getAndSet(byte[] value) {


@Override @Override
public void addListener() { public void addListener() {
listeners.add(getCurrentSession()); listeners.add(getCurrentSession().sessionId());
} }


@Override @Override
public void removeListener() { public void removeListener() {
listeners.remove(getCurrentSession()); listeners.remove(getCurrentSession().sessionId());
} }
} }

0 comments on commit 98bdc82

Please sign in to comment.