Skip to content

Commit

Permalink
Handle expiring and ephemeral values in AsyncReference.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 16, 2015
1 parent 0f6468b commit 8ed7c51
Showing 1 changed file with 51 additions and 13 deletions.
64 changes: 51 additions & 13 deletions atomic/src/main/java/net/kuujo/copycat/atomic/AsyncReference.java
Expand Up @@ -24,6 +24,7 @@
import net.kuujo.copycat.raft.*;
import net.kuujo.copycat.raft.log.Compaction;

import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -721,24 +722,54 @@ public Builder<T> withValue(Object value) {
* Async reference state machine.
*/
public static class StateMachine extends net.kuujo.copycat.raft.StateMachine {
private final java.util.Set<Long> sessions = new HashSet<>();
private final AtomicReference<Object> value = new AtomicReference<>();
private Commit<? extends ReferenceCommand> command;
private long version;
private long timestamp;
private long time;

/**
* Updates the state machine timestamp.
*/
private void updateTimestamp(Commit commit) {
this.timestamp = commit.timestamp();
private void updateTime(Commit commit) {
this.time = Math.max(time, commit.timestamp());
}

@Override
public void register(Session session) {
sessions.add(session.id());
}

@Override
public void expire(Session session) {
sessions.remove(session.id());
}

@Override
public void close(Session session) {
sessions.remove(session.id());
}

/**
* Returns a boolean value indicating whether the given commit is active.
*/
private boolean isActive(Commit<? extends ReferenceCommand> commit) {
if (commit == null) {
return false;
} else if (commit.operation().mode() == Mode.EPHEMERAL && !sessions.contains(commit.session().id())) {
return false;
} else if (commit.operation().ttl() != 0 && commit.operation().ttl() < time - commit.timestamp()) {
return false;
}
return true;
}

/**
* Handles a get commit.
*/
@Apply(Get.class)
protected Object get(Commit<Get> commit) {
updateTimestamp(commit);
updateTime(commit);
return value.get();
}

Expand All @@ -747,7 +778,7 @@ protected Object get(Commit<Get> commit) {
*/
@Apply(Set.class)
protected void set(Commit<Set> commit) {
updateTimestamp(commit);
updateTime(commit);
value.set(commit.operation().value());
command = commit;
version = commit.index();
Expand All @@ -758,8 +789,8 @@ protected void set(Commit<Set> commit) {
*/
@Apply(CompareAndSet.class)
protected boolean compareAndSet(Commit<CompareAndSet> commit) {
updateTimestamp(commit);
if (command != null && (command.operation().ttl() == 0 || command.timestamp() + command.operation().ttl() < timestamp)) {
updateTime(commit);
if (isActive(command)) {
if (value.compareAndSet(commit.operation().expect(), commit.operation().update())) {
command = commit;
return true;
Expand All @@ -780,19 +811,26 @@ protected boolean compareAndSet(Commit<CompareAndSet> commit) {
*/
@Apply(GetAndSet.class)
protected Object getAndSet(Commit<GetAndSet> commit) {
updateTimestamp(commit);
Object result = value.getAndSet(commit.operation().value());
command = commit;
version = commit.index();
return result;
updateTime(commit);
if (isActive(command)) {
Object result = value.getAndSet(commit.operation().value());
command = commit;
version = commit.index();
return result;
} else {
value.set(commit.operation().value());
command = commit;
version = commit.index();
return null;
}
}

/**
* Filters all entries.
*/
@Filter(Filter.All.class)
protected boolean filterAll(Commit<? extends ReferenceCommand<?>> commit, Compaction compaction) {
return commit.index() >= version && (commit.operation().ttl() == 0 || commit.operation().ttl() + commit.timestamp() > timestamp);
return commit.index() >= version && isActive(commit);
}
}

Expand Down

0 comments on commit 8ed7c51

Please sign in to comment.