Skip to content

Commit

Permalink
Rename internal PrimitiveService methods to adhere to naming conventi…
Browse files Browse the repository at this point in the history
…ons.
  • Loading branch information
kuujo committed Jan 11, 2018
1 parent 76c9238 commit 6f88c80
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 58 deletions.
Expand Up @@ -86,7 +86,7 @@ public void backup(BufferOutput<?> writer) {
writer.writeLong(termStartTime);
writer.writeObject(registrations, SERIALIZER::encode);
writer.writeObject(Sets.newHashSet(listeners.keySet()), SERIALIZER::encode);
logger().debug("Took state machine snapshot");
getLogger().debug("Took state machine snapshot");
}

@Override
Expand All @@ -98,9 +98,9 @@ public void restore(BufferInput<?> reader) {
registrations = reader.readObject(SERIALIZER::decode);
listeners = new LinkedHashMap<>();
for (Long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
listeners.put(sessionId, sessions().getSession(sessionId));
listeners.put(sessionId, getSessions().getSession(sessionId));
}
logger().debug("Reinstated state machine from snapshot");
getLogger().debug("Reinstated state machine from snapshot");
}

@Override
Expand Down Expand Up @@ -165,7 +165,7 @@ protected Leadership<byte[]> run(Commit<? extends Run> commit) {
}
return newLeadership;
} catch (Exception e) {
logger().error("State machine operation failed", e);
getLogger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
Expand All @@ -182,7 +182,7 @@ protected void withdraw(Commit<? extends Withdraw> commit) {
notifyLeadershipChange(oldLeadership, newLeadership);
}
} catch (Exception e) {
logger().error("State machine operation failed", e);
getLogger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
Expand All @@ -204,15 +204,15 @@ protected boolean anoint(Commit<? extends Anoint> commit) {
if (newLeader != null) {
this.leader = newLeader;
this.term = termCounter.incrementAndGet();
this.termStartTime = context().wallClock().getTime().unixTimestamp();
this.termStartTime = getContext().wallClock().getTime().unixTimestamp();
}
Leadership<byte[]> newLeadership = leadership();
if (!Objects.equal(oldLeadership, newLeadership)) {
notifyLeadershipChange(oldLeadership, newLeadership);
}
return leader != null && Arrays.equals(commit.value().id(), leader.id());
} catch (Exception e) {
logger().error("State machine operation failed", e);
getLogger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
Expand Down Expand Up @@ -252,7 +252,7 @@ protected boolean promote(Commit<? extends Promote> commit) {
}
return true;
} catch (Exception e) {
logger().error("State machine operation failed", e);
getLogger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
Expand All @@ -278,7 +278,7 @@ protected void evict(Commit<? extends Evict> commit) {
this.registrations = updatedRegistrations;
this.leader = updatedRegistrations.get(0);
this.term = termCounter.incrementAndGet();
this.termStartTime = context().wallClock().getTime().unixTimestamp();
this.termStartTime = getContext().wallClock().getTime().unixTimestamp();
} else {
this.registrations = updatedRegistrations;
this.leader = null;
Expand All @@ -292,7 +292,7 @@ protected void evict(Commit<? extends Evict> commit) {
notifyLeadershipChange(oldLeadership, newLeadership);
}
} catch (Exception e) {
logger().error("State machine operation failed", e);
getLogger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
Expand All @@ -306,7 +306,7 @@ protected Leadership<byte[]> getLeadership() {
try {
return leadership();
} catch (Exception e) {
logger().error("State machine operation failed", e);
getLogger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
Expand Down Expand Up @@ -364,7 +364,7 @@ protected void cleanup(byte[] id) {
this.registrations = updatedRegistrations;
this.leader = updatedRegistrations.get(0);
this.term = termCounter.incrementAndGet();
this.termStartTime = context().wallClock().getTime().unixTimestamp();
this.termStartTime = getContext().wallClock().getTime().unixTimestamp();
} else {
this.registrations = updatedRegistrations;
this.leader = null;
Expand All @@ -388,7 +388,7 @@ protected void cleanup(Session session) {
this.registrations = updatedRegistrations;
this.leader = updatedRegistrations.get(0);
this.term = termCounter.incrementAndGet();
this.termStartTime = context().wallClock().getTime().unixTimestamp();
this.termStartTime = getContext().wallClock().getTime().unixTimestamp();
} else {
this.registrations = updatedRegistrations;
this.leader = null;
Expand Down Expand Up @@ -421,7 +421,7 @@ protected void addRegistration(Registration registration) {
if (newLeader) {
this.leader = registration;
this.term = termCounter.incrementAndGet();
this.termStartTime = context().wallClock().getTime().unixTimestamp();
this.termStartTime = getContext().wallClock().getTime().unixTimestamp();
}
}
}
Expand Down
Expand Up @@ -91,19 +91,19 @@ public void backup(BufferOutput<?> writer) {
writer.writeObject(Sets.newHashSet(listeners.keySet()), SERIALIZER::encode);
writer.writeObject(termCounters, SERIALIZER::encode);
writer.writeObject(elections, SERIALIZER::encode);
logger().debug("Took state machine snapshot");
getLogger().debug("Took state machine snapshot");
}

@Override
public void restore(BufferInput<?> reader) {
listeners = new LinkedHashMap<>();
for (Long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
listeners.put(sessionId, sessions().getSession(sessionId));
listeners.put(sessionId, getSessions().getSession(sessionId));
}
termCounters = reader.readObject(SERIALIZER::decode);
elections = reader.readObject(SERIALIZER::decode);
elections.values().forEach(e -> e.elections = elections);
logger().debug("Reinstated state machine from snapshot");
getLogger().debug("Reinstated state machine from snapshot");
}

@Override
Expand Down Expand Up @@ -182,7 +182,7 @@ public Leadership run(Commit<? extends Run> commit) {
}
return newLeadership;
} catch (Exception e) {
logger().error("State machine operation failed", e);
getLogger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
Expand All @@ -203,7 +203,7 @@ public void withdraw(Commit<? extends Withdraw> commit) {
notifyLeadershipChange(topic, oldLeadership, newLeadership);
}
} catch (Exception e) {
logger().error("State machine operation failed", e);
getLogger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
Expand All @@ -229,7 +229,7 @@ public boolean anoint(Commit<? extends Anoint> commit) {
electionState.leader() != null &&
Arrays.equals(commit.value().id(), electionState.leader().id()));
} catch (Exception e) {
logger().error("State machine operation failed", e);
getLogger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
Expand All @@ -255,7 +255,7 @@ public boolean promote(Commit<? extends Promote> commit) {
}
return true;
} catch (Exception e) {
logger().error("State machine operation failed", e);
getLogger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
Expand All @@ -280,7 +280,7 @@ public void evict(Commit<? extends Evict> commit) {
});
notifyLeadershipChanges(changes);
} catch (Exception e) {
logger().error("State machine operation failed", e);
getLogger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
Expand All @@ -296,7 +296,7 @@ public Leadership getLeadership(Commit<? extends GetLeadership> commit) {
try {
return leadership(topic);
} catch (Exception e) {
logger().error("State machine operation failed", e);
getLogger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
Expand All @@ -315,7 +315,7 @@ public Set<String> electedTopics(Commit<? extends GetElectedTopics> commit) {
return leader != null && Arrays.equals(leader.id(), id);
}).keySet());
} catch (Exception e) {
logger().error("State machine operation failed", e);
getLogger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
Expand All @@ -332,7 +332,7 @@ public Map<String, Leadership> allLeaderships(Commit<Void> commit) {
result.putAll(Maps.transformEntries(elections, (k, v) -> leadership(k)));
return result;
} catch (Exception e) {
logger().error("State machine operation failed", e);
getLogger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
Expand Down
Expand Up @@ -73,10 +73,10 @@ public void restore(BufferInput<?> reader) {
timers.clear();
for (LockHolder holder : queue) {
if (holder.expire > 0) {
timers.put(holder.index, scheduler().schedule(Duration.ofMillis(holder.expire - context().wallClock().getTime().unixTimestamp()), () -> {
timers.put(holder.index, getScheduler().schedule(Duration.ofMillis(holder.expire - getContext().wallClock().getTime().unixTimestamp()), () -> {
timers.remove(holder.index);
queue.remove(holder);
Session session = sessions().getSession(holder.session);
Session session = getSessions().getSession(holder.session);
if (session != null && session.getState().active()) {
session.publish(DistributedLockEvents.FAIL, SERIALIZER::encode, new LockEvent(holder.id, holder.index));
}
Expand Down Expand Up @@ -113,9 +113,9 @@ protected void lock(Commit<Lock> commit) {
commit.value().id(),
commit.index(),
commit.session().sessionId().id(),
context().wallClock().getTime().unixTimestamp() + commit.value().timeout());
getContext().wallClock().getTime().unixTimestamp() + commit.value().timeout());
queue.add(holder);
timers.put(commit.index(), scheduler().schedule(Duration.ofMillis(commit.value().timeout()), () -> {
timers.put(commit.index(), getScheduler().schedule(Duration.ofMillis(commit.value().timeout()), () -> {
timers.remove(commit.index());
queue.remove(holder);
if (commit.session().getState().active()) {
Expand Down Expand Up @@ -148,7 +148,7 @@ protected void unlock(Commit<Unlock> commit) {
timer.cancel();
}

Session session = sessions().getSession(lock.session);
Session session = getSessions().getSession(lock.session);
if (session == null || session.getState() == Session.State.EXPIRED || session.getState() == Session.State.CLOSED) {
lock = queue.poll();
} else {
Expand All @@ -171,7 +171,7 @@ private void releaseSession(Session session) {
timer.cancel();
}

Session lockSession = sessions().getSession(lock.session);
Session lockSession = getSessions().getSession(lock.session);
if (lockSession == null || lockSession.getState() == Session.State.EXPIRED || lockSession.getState() == Session.State.CLOSED) {
lock = queue.poll();
} else {
Expand Down
Expand Up @@ -147,15 +147,15 @@ public void backup(BufferOutput<?> writer) {
public void restore(BufferInput<?> reader) {
listeners = new LinkedHashMap<>();
for (Long sessionId : reader.<Set<Long>>readObject(serializer()::decode)) {
listeners.put(sessionId, sessions().getSession(sessionId));
listeners.put(sessionId, getSessions().getSession(sessionId));
}
preparedKeys = reader.readObject(serializer()::decode);
map = reader.readObject(serializer()::decode);
activeTransactions = reader.readObject(serializer()::decode);
currentVersion = reader.readLong();
map.forEach((key, value) -> {
if (value.ttl() > 0) {
value.timer = scheduler().schedule(Duration.ofMillis(value.ttl() - (wallClock().getTime().unixTimestamp() - value.created())), () -> {
value.timer = getScheduler().schedule(Duration.ofMillis(value.ttl() - (getWallClock().getTime().unixTimestamp() - value.created())), () -> {
entries().remove(key, value);
publish(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, toVersioned(value)));
});
Expand Down Expand Up @@ -370,7 +370,7 @@ protected void putValue(String key, MapEntryValue value) {
*/
protected void scheduleTtl(String key, MapEntryValue value) {
if (value.ttl() > 0) {
value.timer = scheduler().schedule(Duration.ofMillis(value.ttl()), () -> {
value.timer = getScheduler().schedule(Duration.ofMillis(value.ttl()), () -> {
entries().remove(key, value);
publish(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, toVersioned(value)));
});
Expand Down Expand Up @@ -815,7 +815,7 @@ protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
return PrepareResult.OK;
}
} catch (Exception e) {
logger().warn("Failure applying {}", commit, e);
getLogger().warn("Failure applying {}", commit, e);
throw Throwables.propagate(e);
}
}
Expand All @@ -837,7 +837,7 @@ protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
this.currentVersion = commit.index();
return commitTransaction(transactionScope);
} catch (Exception e) {
logger().warn("Failure applying {}", commit, e);
getLogger().warn("Failure applying {}", commit, e);
throw Throwables.propagate(e);
} finally {
discardTombstones();
Expand Down
Expand Up @@ -131,7 +131,7 @@ public void restore(BufferInput<?> reader) {

listeners = new LinkedHashMap<>();
for (Long sessionId : reader.<Set<Long>>readObject(serializer::decode)) {
listeners.put(sessionId, sessions().getSession(sessionId));
listeners.put(sessionId, getSessions().getSession(sessionId));
}

backingMap = reader.readObject(serializer::decode);
Expand Down
Expand Up @@ -91,7 +91,7 @@ public void backup(BufferOutput<?> writer) {
public void restore(BufferInput<?> reader) {
registeredWorkers = Maps.newHashMap();
for (Long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
registeredWorkers.put(sessionId, sessions().getSession(sessionId));
registeredWorkers.put(sessionId, getSessions().getSession(sessionId));
}
assignments = reader.readObject(SERIALIZER::decode);
unassignedTasks = reader.readObject(SERIALIZER::decode);
Expand Down Expand Up @@ -169,7 +169,7 @@ protected Collection<Task<byte[]>> take(Commit<? extends Take> commit) {
})
.collect(Collectors.toCollection(ArrayList::new));
} catch (Exception e) {
logger().warn("State machine update failed", e);
getLogger().warn("State machine update failed", e);
throw Throwables.propagate(e);
}
}
Expand All @@ -186,7 +186,7 @@ protected void complete(Commit<? extends Complete> commit) {
}
});
} catch (Exception e) {
logger().warn("State machine update failed", e);
getLogger().warn("State machine update failed", e);
throw Throwables.propagate(e);
}
}
Expand Down
Expand Up @@ -88,7 +88,7 @@ public void write(Kryo kryo, Output output, Listener listener) {

@Override
public Listener read(Kryo kryo, Input input, Class<Listener> type) {
return new Listener(sessions().getSession(input.readLong()),
return new Listener(getSessions().getSession(input.readLong()),
kryo.readObjectOrNull(input, DocumentPath.class));
}
}, Listener.class)
Expand Down Expand Up @@ -247,7 +247,7 @@ protected DocumentTreeResult<Versioned<byte[]>> update(Commit<? extends Update>
} catch (NoSuchDocumentPathException e) {
result = DocumentTreeResult.invalidPath();
} catch (Exception e) {
logger().error("Failed to apply {} to state machine", commit.value(), e);
getLogger().error("Failed to apply {} to state machine", commit.value(), e);
throw Throwables.propagate(e);
}
return result;
Expand Down
Expand Up @@ -80,7 +80,7 @@ public void restore(BufferInput<?> reader) {
value = reader.readBytes(reader.readInt());
listeners = new HashSet<>();
for (Long sessionId : reader.<java.util.Set<Long>>readObject(SERIALIZER::decode)) {
listeners.add(sessions().getSession(sessionId));
listeners.add(getSessions().getSession(sessionId));
}
}

Expand Down
Expand Up @@ -72,7 +72,7 @@ public void testSnapshot() throws Exception {

private static class TestConsistentMapService extends ConsistentMapService {
@Override
protected Scheduler scheduler() {
protected Scheduler getScheduler() {
return new Scheduler() {
@Override
public Scheduled schedule(Duration delay, Runnable callback) {
Expand All @@ -87,7 +87,7 @@ public Scheduled schedule(Duration initialDelay, Duration interval, Runnable cal
}

@Override
protected WallClock wallClock() {
protected WallClock getWallClock() {
return new WallClock();
}
}
Expand Down

0 comments on commit 6f88c80

Please sign in to comment.