Skip to content

Commit

Permalink
Rename primitive session classes and interfaces for consistency with …
Browse files Browse the repository at this point in the history
…other primitive APIs.
  • Loading branch information
kuujo committed Apr 27, 2018
1 parent c70097c commit eecb343
Show file tree
Hide file tree
Showing 40 changed files with 192 additions and 198 deletions.
Expand Up @@ -34,7 +34,7 @@
import io.atomix.primitive.service.BackupOutput; import io.atomix.primitive.service.BackupOutput;
import io.atomix.primitive.service.Commit; import io.atomix.primitive.service.Commit;
import io.atomix.primitive.service.ServiceExecutor; import io.atomix.primitive.service.ServiceExecutor;
import io.atomix.primitive.session.Session; import io.atomix.primitive.session.PrimitiveSession;
import io.atomix.utils.ArraySizeHashPrinter; import io.atomix.utils.ArraySizeHashPrinter;
import io.atomix.utils.serializer.KryoNamespace; import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.Serializer; import io.atomix.utils.serializer.Serializer;
Expand Down Expand Up @@ -76,7 +76,7 @@ public class LeaderElectionService extends AbstractPrimitiveService {
private long termStartTime; private long termStartTime;
private List<Registration> registrations = new LinkedList<>(); private List<Registration> registrations = new LinkedList<>();
private AtomicLong termCounter = new AtomicLong(); private AtomicLong termCounter = new AtomicLong();
private Map<Long, Session> listeners = new LinkedHashMap<>(); private Map<Long, PrimitiveSession> listeners = new LinkedHashMap<>();


@Override @Override
protected Serializer serializer() { protected Serializer serializer() {
Expand Down Expand Up @@ -320,7 +320,7 @@ private Leadership<byte[]> leadership() {
return new Leadership<>(leader(), candidates()); return new Leadership<>(leader(), candidates());
} }


private void onSessionEnd(Session session) { private void onSessionEnd(PrimitiveSession session) {
listeners.remove(session.sessionId().id()); listeners.remove(session.sessionId().id());
Leadership<byte[]> oldLeadership = leadership(); Leadership<byte[]> oldLeadership = leadership();
cleanup(session); cleanup(session);
Expand Down Expand Up @@ -380,7 +380,7 @@ protected void cleanup(byte[] id) {
} }
} }


protected void cleanup(Session session) { protected void cleanup(PrimitiveSession session) {
Optional<Registration> registration = Optional<Registration> registration =
registrations.stream().filter(r -> r.sessionId() == session.sessionId().id()).findFirst(); registrations.stream().filter(r -> r.sessionId() == session.sessionId().id()).findFirst();
if (registration.isPresent()) { if (registration.isPresent()) {
Expand Down Expand Up @@ -432,12 +432,12 @@ protected void addRegistration(Registration registration) {
} }


@Override @Override
public void onExpire(Session session) { public void onExpire(PrimitiveSession session) {
onSessionEnd(session); onSessionEnd(session);
} }


@Override @Override
public void onClose(Session session) { public void onClose(PrimitiveSession session) {
onSessionEnd(session); onSessionEnd(session);
} }
} }
Expand Up @@ -39,7 +39,7 @@
import io.atomix.primitive.service.BackupOutput; import io.atomix.primitive.service.BackupOutput;
import io.atomix.primitive.service.Commit; import io.atomix.primitive.service.Commit;
import io.atomix.primitive.service.ServiceExecutor; import io.atomix.primitive.service.ServiceExecutor;
import io.atomix.primitive.session.Session; import io.atomix.primitive.session.PrimitiveSession;
import io.atomix.utils.ArraySizeHashPrinter; import io.atomix.utils.ArraySizeHashPrinter;
import io.atomix.utils.serializer.KryoNamespace; import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.Serializer; import io.atomix.utils.serializer.Serializer;
Expand Down Expand Up @@ -83,7 +83,7 @@ public class LeaderElectorService extends AbstractPrimitiveService {


private Map<String, AtomicLong> termCounters = new HashMap<>(); private Map<String, AtomicLong> termCounters = new HashMap<>();
private Map<String, ElectionState> elections = new HashMap<>(); private Map<String, ElectionState> elections = new HashMap<>();
private Map<Long, Session> listeners = new LinkedHashMap<>(); private Map<Long, PrimitiveSession> listeners = new LinkedHashMap<>();


@Override @Override
protected Serializer serializer() { protected Serializer serializer() {
Expand Down Expand Up @@ -355,7 +355,7 @@ private List<byte[]> candidates(String topic) {
return electionState == null ? new LinkedList<>() : electionState.candidates(); return electionState == null ? new LinkedList<>() : electionState.candidates();
} }


private void onSessionEnd(Session session) { private void onSessionEnd(PrimitiveSession session) {
listeners.remove(session.sessionId().id()); listeners.remove(session.sessionId().id());
Set<String> topics = elections.keySet(); Set<String> topics = elections.keySet();
List<LeadershipEvent<byte[]>> changes = Lists.newArrayList(); List<LeadershipEvent<byte[]>> changes = Lists.newArrayList();
Expand Down Expand Up @@ -457,7 +457,7 @@ private long countLeaders(String topic, Registration registration) {
.count(); .count();
} }


public ElectionState cleanup(String topic, Session session, Supplier<Long> termCounter) { public ElectionState cleanup(String topic, PrimitiveSession session, Supplier<Long> termCounter) {
Optional<Registration> registration = Optional<Registration> registration =
registrations.stream().filter(r -> r.sessionId() == session.sessionId().id()).findFirst(); registrations.stream().filter(r -> r.sessionId() == session.sessionId().id()).findFirst();
if (registration.isPresent()) { if (registration.isPresent()) {
Expand Down Expand Up @@ -586,12 +586,12 @@ public ElectionState promote(byte[] id) {
} }


@Override @Override
public void onExpire(Session session) { public void onExpire(PrimitiveSession session) {
onSessionEnd(session); onSessionEnd(session);
} }


@Override @Override
public void onClose(Session session) { public void onClose(PrimitiveSession session) {
onSessionEnd(session); onSessionEnd(session);
} }


Expand Down
Expand Up @@ -22,7 +22,7 @@
import io.atomix.primitive.service.BackupOutput; import io.atomix.primitive.service.BackupOutput;
import io.atomix.primitive.service.Commit; import io.atomix.primitive.service.Commit;
import io.atomix.primitive.service.ServiceExecutor; import io.atomix.primitive.service.ServiceExecutor;
import io.atomix.primitive.session.Session; import io.atomix.primitive.session.PrimitiveSession;
import io.atomix.utils.concurrent.Scheduled; import io.atomix.utils.concurrent.Scheduled;
import io.atomix.utils.serializer.KryoNamespace; import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.KryoNamespaces; import io.atomix.utils.serializer.KryoNamespaces;
Expand Down Expand Up @@ -86,7 +86,7 @@ public void restore(BackupInput input) {
timers.put(holder.index, getScheduler().schedule(Duration.ofMillis(holder.expire - getWallClock().getTime().unixTimestamp()), () -> { timers.put(holder.index, getScheduler().schedule(Duration.ofMillis(holder.expire - getWallClock().getTime().unixTimestamp()), () -> {
timers.remove(holder.index); timers.remove(holder.index);
queue.remove(holder); queue.remove(holder);
Session session = getSessions().getSession(holder.session); PrimitiveSession session = getSessions().getSession(holder.session);
if (session != null && session.getState().active()) { if (session != null && session.getState().active()) {
session.publish(FAILED, SERIALIZER::encode, new LockEvent(holder.id, holder.index)); session.publish(FAILED, SERIALIZER::encode, new LockEvent(holder.id, holder.index));
} }
Expand All @@ -96,12 +96,12 @@ public void restore(BackupInput input) {
} }


@Override @Override
public void onExpire(Session session) { public void onExpire(PrimitiveSession session) {
releaseSession(session); releaseSession(session);
} }


@Override @Override
public void onClose(Session session) { public void onClose(PrimitiveSession session) {
releaseSession(session); releaseSession(session);
} }


Expand Down Expand Up @@ -190,7 +190,7 @@ protected void unlock(Commit<Unlock> commit) {


// If the lock session is for some reason inactive, continue on to the next waiter. Otherwise, // If the lock session is for some reason inactive, continue on to the next waiter. Otherwise,
// publish a LOCKED event to the new lock holder's session. // publish a LOCKED event to the new lock holder's session.
Session session = getSessions().getSession(lock.session); PrimitiveSession session = getSessions().getSession(lock.session);
if (session == null || !session.getState().active()) { if (session == null || !session.getState().active()) {
lock = queue.poll(); lock = queue.poll();
} else { } else {
Expand All @@ -213,7 +213,7 @@ protected void unlock(Commit<Unlock> commit) {
* *
* @param session the closed session * @param session the closed session
*/ */
private void releaseSession(Session session) { private void releaseSession(PrimitiveSession session) {
// Remove all instances of the session from the lock queue. // Remove all instances of the session from the lock queue.
queue.removeIf(lock -> lock.session == session.sessionId().id()); queue.removeIf(lock -> lock.session == session.sessionId().id());


Expand All @@ -230,7 +230,7 @@ private void releaseSession(Session session) {


// If the lock session is inactive, continue on to the next waiter. Otherwise, // If the lock session is inactive, continue on to the next waiter. Otherwise,
// publish a LOCKED event to the new lock holder's session. // publish a LOCKED event to the new lock holder's session.
Session lockSession = getSessions().getSession(lock.session); PrimitiveSession lockSession = getSessions().getSession(lock.session);
if (lockSession == null || !lockSession.getState().active()) { if (lockSession == null || !lockSession.getState().active()) {
lock = queue.poll(); lock = queue.poll();
} else { } else {
Expand Down
Expand Up @@ -45,7 +45,7 @@
import io.atomix.primitive.service.BackupOutput; import io.atomix.primitive.service.BackupOutput;
import io.atomix.primitive.service.Commit; import io.atomix.primitive.service.Commit;
import io.atomix.primitive.service.ServiceExecutor; import io.atomix.primitive.service.ServiceExecutor;
import io.atomix.primitive.session.Session; import io.atomix.primitive.session.PrimitiveSession;
import io.atomix.utils.concurrent.Scheduled; import io.atomix.utils.concurrent.Scheduled;
import io.atomix.utils.serializer.KryoNamespace; import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.KryoNamespaces; import io.atomix.utils.serializer.KryoNamespaces;
Expand Down Expand Up @@ -112,7 +112,7 @@ public class ConsistentMapService extends AbstractPrimitiveService {
.register(new HashMap().keySet().getClass()) .register(new HashMap().keySet().getClass())
.build()); .build());


protected Map<Long, Session> listeners = new LinkedHashMap<>(); protected Map<Long, PrimitiveSession> listeners = new LinkedHashMap<>();
private Map<String, MapEntryValue> map; private Map<String, MapEntryValue> map;
protected Set<String> preparedKeys = Sets.newHashSet(); protected Set<String> preparedKeys = Sets.newHashSet();
protected Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap(); protected Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
Expand Down Expand Up @@ -701,7 +701,7 @@ protected MapEntryUpdateResult.Status clear() {
* *
* @param session listen session * @param session listen session
*/ */
protected void listen(Session session) { protected void listen(PrimitiveSession session) {
listeners.put(session.sessionId().id(), session); listeners.put(session.sessionId().id(), session);
} }


Expand All @@ -710,7 +710,7 @@ protected void listen(Session session) {
* *
* @param session unlisten session * @param session unlisten session
*/ */
protected void unlisten(Session session) { protected void unlisten(PrimitiveSession session) {
listeners.remove(session.sessionId().id()); listeners.remove(session.sessionId().id());
} }


Expand Down Expand Up @@ -1009,12 +1009,12 @@ private void publish(List<MapEvent<String, byte[]>> events) {
} }


@Override @Override
public void onExpire(Session session) { public void onExpire(PrimitiveSession session) {
closeListener(session.sessionId().id()); closeListener(session.sessionId().id());
} }


@Override @Override
public void onClose(Session session) { public void onClose(PrimitiveSession session) {
closeListener(session.sessionId().id()); closeListener(session.sessionId().id());
} }


Expand Down
Expand Up @@ -30,7 +30,7 @@
import io.atomix.core.transaction.TransactionLog; import io.atomix.core.transaction.TransactionLog;
import io.atomix.primitive.service.Commit; import io.atomix.primitive.service.Commit;
import io.atomix.primitive.service.ServiceExecutor; import io.atomix.primitive.service.ServiceExecutor;
import io.atomix.primitive.session.Session; import io.atomix.primitive.session.PrimitiveSession;
import io.atomix.utils.serializer.KryoNamespace; import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.KryoNamespaces; import io.atomix.utils.serializer.KryoNamespaces;
import io.atomix.utils.serializer.Serializer; import io.atomix.utils.serializer.Serializer;
Expand Down Expand Up @@ -185,12 +185,12 @@ private Map.Entry<String, Versioned<byte[]>> toVersionedEntry(
} }


@Override @Override
public void onExpire(Session session) { public void onExpire(PrimitiveSession session) {
closeListener(session.sessionId().id()); closeListener(session.sessionId().id());
} }


@Override @Override
public void onClose(Session session) { public void onClose(PrimitiveSession session) {
closeListener(session.sessionId().id()); closeListener(session.sessionId().id());
} }


Expand Down
Expand Up @@ -41,7 +41,7 @@
import io.atomix.primitive.service.BackupOutput; import io.atomix.primitive.service.BackupOutput;
import io.atomix.primitive.service.Commit; import io.atomix.primitive.service.Commit;
import io.atomix.primitive.service.ServiceExecutor; import io.atomix.primitive.service.ServiceExecutor;
import io.atomix.primitive.session.Session; import io.atomix.primitive.session.PrimitiveSession;
import io.atomix.utils.Match; import io.atomix.utils.Match;
import io.atomix.utils.serializer.KryoNamespace; import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.KryoNamespaces; import io.atomix.utils.serializer.KryoNamespaces;
Expand Down Expand Up @@ -115,7 +115,7 @@ public NonTransactionalCommit read(Kryo kryo, Input input, Class<NonTransactiona
.build()); .build());


private AtomicLong globalVersion = new AtomicLong(1); private AtomicLong globalVersion = new AtomicLong(1);
private Map<Long, Session> listeners = new LinkedHashMap<>(); private Map<Long, PrimitiveSession> listeners = new LinkedHashMap<>();
private Map<String, MapEntryValue> backingMap = Maps.newHashMap(); private Map<String, MapEntryValue> backingMap = Maps.newHashMap();


@Override @Override
Expand Down Expand Up @@ -164,12 +164,12 @@ protected void configure(ServiceExecutor executor) {
} }


@Override @Override
public void onExpire(Session session) { public void onExpire(PrimitiveSession session) {
listeners.remove(session.sessionId().id()); listeners.remove(session.sessionId().id());
} }


@Override @Override
public void onClose(Session session) { public void onClose(PrimitiveSession session) {
listeners.remove(session.sessionId().id()); listeners.remove(session.sessionId().id());
} }


Expand Down
Expand Up @@ -31,7 +31,7 @@
import io.atomix.primitive.service.BackupOutput; import io.atomix.primitive.service.BackupOutput;
import io.atomix.primitive.service.Commit; import io.atomix.primitive.service.Commit;
import io.atomix.primitive.service.ServiceExecutor; import io.atomix.primitive.service.ServiceExecutor;
import io.atomix.primitive.session.Session; import io.atomix.primitive.session.PrimitiveSession;
import io.atomix.utils.serializer.KryoNamespace; import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.KryoNamespaces; import io.atomix.utils.serializer.KryoNamespaces;
import io.atomix.utils.serializer.Serializer; import io.atomix.utils.serializer.Serializer;
Expand Down Expand Up @@ -76,7 +76,7 @@ public class WorkQueueService extends AbstractPrimitiveService {


private Queue<Task<byte[]>> unassignedTasks = Queues.newArrayDeque(); private Queue<Task<byte[]>> unassignedTasks = Queues.newArrayDeque();
private Map<String, TaskAssignment> assignments = Maps.newHashMap(); private Map<String, TaskAssignment> assignments = Maps.newHashMap();
private Map<Long, Session> registeredWorkers = Maps.newHashMap(); private Map<Long, PrimitiveSession> registeredWorkers = Maps.newHashMap();


@Override @Override
protected Serializer serializer() { protected Serializer serializer() {
Expand Down Expand Up @@ -196,12 +196,12 @@ protected void complete(Commit<? extends Complete> commit) {
} }


@Override @Override
public void onExpire(Session session) { public void onExpire(PrimitiveSession session) {
evictWorker(session.sessionId().id()); evictWorker(session.sessionId().id());
} }


@Override @Override
public void onClose(Session session) { public void onClose(PrimitiveSession session) {
evictWorker(session.sessionId().id()); evictWorker(session.sessionId().id());
} }


Expand Down
Expand Up @@ -42,7 +42,7 @@
import io.atomix.primitive.service.BackupOutput; import io.atomix.primitive.service.BackupOutput;
import io.atomix.primitive.service.Commit; import io.atomix.primitive.service.Commit;
import io.atomix.primitive.service.ServiceExecutor; import io.atomix.primitive.service.ServiceExecutor;
import io.atomix.primitive.session.Session; import io.atomix.primitive.session.PrimitiveSession;
import io.atomix.utils.Match; import io.atomix.utils.Match;
import io.atomix.utils.serializer.KryoNamespace; import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.KryoNamespaces; import io.atomix.utils.serializer.KryoNamespaces;
Expand Down Expand Up @@ -290,12 +290,12 @@ private void notifyListeners(DocumentTreeEvent<byte[]> event) {
} }


@Override @Override
public void onExpire(Session session) { public void onExpire(PrimitiveSession session) {
closeListener(session.sessionId().id()); closeListener(session.sessionId().id());
} }


@Override @Override
public void onClose(Session session) { public void onClose(PrimitiveSession session) {
closeListener(session.sessionId().id()); closeListener(session.sessionId().id());
} }


Expand Down Expand Up @@ -341,10 +341,10 @@ private void recomputeLeastCommonAncestor() {
} }


private static class Listener { private static class Listener {
private final Session session; private final PrimitiveSession session;
private final DocumentPath path; private final DocumentPath path;


public Listener(Session session, DocumentPath path) { public Listener(PrimitiveSession session, DocumentPath path) {
this.session = session; this.session = session;
this.path = path; this.path = path;
} }
Expand All @@ -353,7 +353,7 @@ public DocumentPath path() {
return path; return path;
} }


public Session session() { public PrimitiveSession session() {
return session; return session;
} }
} }
Expand Down
Expand Up @@ -25,7 +25,7 @@
import io.atomix.primitive.service.BackupOutput; import io.atomix.primitive.service.BackupOutput;
import io.atomix.primitive.service.Commit; import io.atomix.primitive.service.Commit;
import io.atomix.primitive.service.ServiceExecutor; import io.atomix.primitive.service.ServiceExecutor;
import io.atomix.primitive.session.Session; import io.atomix.primitive.session.PrimitiveSession;
import io.atomix.utils.serializer.KryoNamespace; import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.KryoNamespaces; import io.atomix.utils.serializer.KryoNamespaces;
import io.atomix.utils.serializer.Serializer; import io.atomix.utils.serializer.Serializer;
Expand All @@ -52,7 +52,7 @@ public class AtomicValueService extends AbstractPrimitiveService {
.build()); .build());


private byte[] value; private byte[] value;
private java.util.Set<Session> listeners = Sets.newHashSet(); private java.util.Set<PrimitiveSession> listeners = Sets.newHashSet();


@Override @Override
protected Serializer serializer() { protected Serializer serializer() {
Expand All @@ -73,7 +73,7 @@ protected void configure(ServiceExecutor executor) {
public void backup(BackupOutput writer) { public void backup(BackupOutput writer) {
writer.writeInt(value.length).writeBytes(value); writer.writeInt(value.length).writeBytes(value);
java.util.Set<Long> sessionIds = new HashSet<>(); java.util.Set<Long> sessionIds = new HashSet<>();
for (Session session : listeners) { for (PrimitiveSession session : listeners) {
sessionIds.add(session.sessionId().id()); sessionIds.add(session.sessionId().id());
} }
writer.writeObject(sessionIds); writer.writeObject(sessionIds);
Expand Down

0 comments on commit eecb343

Please sign in to comment.