Skip to content

Commit

Permalink
Improve time/scheduling in state machine executor.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Sep 2, 2015
1 parent e3e9293 commit 60cf261
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 236 deletions.
Expand Up @@ -16,10 +16,10 @@
package net.kuujo.copycat.atomic.state; package net.kuujo.copycat.atomic.state;


import net.kuujo.copycat.PersistenceMode; import net.kuujo.copycat.PersistenceMode;
import net.kuujo.copycat.raft.session.Session;
import net.kuujo.copycat.raft.Commit; import net.kuujo.copycat.raft.Commit;
import net.kuujo.copycat.raft.StateMachine; import net.kuujo.copycat.raft.StateMachine;
import net.kuujo.copycat.raft.StateMachineExecutor; import net.kuujo.copycat.raft.StateMachineExecutor;
import net.kuujo.copycat.raft.session.Session;


import java.time.Instant; import java.time.Instant;
import java.util.HashMap; import java.util.HashMap;
Expand Down Expand Up @@ -132,7 +132,7 @@ protected Object get(Commit<AtomicValueCommands.Get> commit) {
* Applies a set commit. * Applies a set commit.
*/ */
protected void set(Commit<AtomicValueCommands.Set> commit) { protected void set(Commit<AtomicValueCommands.Set> commit) {
if (!isActive(commit, context().time().instant())) { if (!isActive(commit, now())) {
commit.clean(); commit.clean();
} else { } else {
if (current != null) { if (current != null) {
Expand All @@ -148,7 +148,7 @@ protected void set(Commit<AtomicValueCommands.Set> commit) {
* Handles a compare and set commit. * Handles a compare and set commit.
*/ */
protected boolean compareAndSet(Commit<AtomicValueCommands.CompareAndSet> commit) { protected boolean compareAndSet(Commit<AtomicValueCommands.CompareAndSet> commit) {
if (!isActive(commit, context().time().instant())) { if (!isActive(commit, now())) {
commit.clean(); commit.clean();
return false; return false;
} else if (isActive(current, commit.time())) { } else if (isActive(current, commit.time())) {
Expand Down Expand Up @@ -178,9 +178,8 @@ protected boolean compareAndSet(Commit<AtomicValueCommands.CompareAndSet> commit
* Handles a get and set commit. * Handles a get and set commit.
*/ */
protected Object getAndSet(Commit<AtomicValueCommands.GetAndSet> commit) { protected Object getAndSet(Commit<AtomicValueCommands.GetAndSet> commit) {
if (!isActive(commit, context().time().instant())) { if (!isActive(commit, now())) {
commit.clean(); commit.clean();

} }


if (isActive(current, commit.time())) { if (isActive(current, commit.time())) {
Expand Down
5 changes: 0 additions & 5 deletions collections/pom.xml
Expand Up @@ -32,11 +32,6 @@
<artifactId>copycat</artifactId> <artifactId>copycat</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>net.kuujo.copycat</groupId>
<artifactId>copycat-netty</artifactId>
<version>${project.version}</version>
</dependency>
<dependency> <dependency>
<groupId>net.kuujo.copycat</groupId> <groupId>net.kuujo.copycat</groupId>
<artifactId>copycat-test</artifactId> <artifactId>copycat-test</artifactId>
Expand Down
Expand Up @@ -16,28 +16,32 @@
package net.kuujo.copycat.collections.state; package net.kuujo.copycat.collections.state;


import net.kuujo.copycat.PersistenceMode; import net.kuujo.copycat.PersistenceMode;
import net.kuujo.copycat.raft.session.Session;
import net.kuujo.copycat.raft.Commit; import net.kuujo.copycat.raft.Commit;
import net.kuujo.copycat.raft.StateMachine; import net.kuujo.copycat.raft.StateMachine;
import net.kuujo.copycat.raft.StateMachineExecutor; import net.kuujo.copycat.raft.StateMachineExecutor;
import net.kuujo.copycat.raft.session.Session;
import net.kuujo.copycat.util.Listener;
import net.kuujo.copycat.util.concurrent.Scheduled;


import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set;


/** /**
* Map state machine. * Map state machine.
* *
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public class MapState extends StateMachine { public class MapState extends StateMachine {
private StateMachineExecutor executor;
private Map<Object, Commit<? extends MapCommands.TtlCommand>> map; private Map<Object, Commit<? extends MapCommands.TtlCommand>> map;
private final Set<Long> sessions = new HashSet<>(); private final Map<Object, Scheduled> timers = new HashMap<>();
private final Map<Object, Listener<Session>> listeners = new HashMap<>();


@Override @Override
public void configure(StateMachineExecutor executor) { public void configure(StateMachineExecutor executor) {
this.executor = executor;
executor.register(MapCommands.ContainsKey.class, this::containsKey); executor.register(MapCommands.ContainsKey.class, this::containsKey);
executor.register(MapCommands.Get.class, this::get); executor.register(MapCommands.Get.class, this::get);
executor.register(MapCommands.GetOrDefault.class, this::getOrDefault); executor.register(MapCommands.GetOrDefault.class, this::getOrDefault);
Expand All @@ -49,28 +53,13 @@ public void configure(StateMachineExecutor executor) {
executor.register(MapCommands.Clear.class, this::clear); executor.register(MapCommands.Clear.class, this::clear);
} }


@Override
public void register(Session session) {
sessions.add(session.id());
}

@Override
public void expire(Session session) {
sessions.remove(session.id());
}

@Override
public void close(Session session) {
sessions.remove(session.id());
}

/** /**
* Returns a boolean value indicating whether the given commit is active. * Returns a boolean value indicating whether the given commit is active.
*/ */
private boolean isActive(Commit<? extends MapCommands.TtlCommand> commit, Instant instant) { private boolean isActive(Commit<? extends MapCommands.TtlCommand> commit, Instant instant) {
if (commit == null) { if (commit == null) {
return false; return false;
} else if (commit.operation().mode() == PersistenceMode.EPHEMERAL && !sessions.contains(commit.session().id())) { } else if (commit.operation().mode() == PersistenceMode.EPHEMERAL && sessions().session(commit.session().id()) == null) {
return false; return false;
} else if (commit.operation().ttl() != 0 && commit.operation().ttl() < instant.toEpochMilli() - commit.time().toEpochMilli()) { } else if (commit.operation().ttl() != 0 && commit.operation().ttl() < instant.toEpochMilli() - commit.time().toEpochMilli()) {
return false; return false;
Expand All @@ -88,11 +77,7 @@ protected boolean containsKey(Commit<MapCommands.ContainsKey> commit) {
} }


Commit<? extends MapCommands.TtlCommand> command = map.get(commit.operation().key()); Commit<? extends MapCommands.TtlCommand> command = map.get(commit.operation().key());
if (!isActive(command, context().time().instant())) { return isActive(command, commit.time());
map.remove(commit.operation().key());
return false;
}
return true;
} finally { } finally {
commit.close(); commit.close();
} }
Expand All @@ -108,14 +93,7 @@ protected Object get(Commit<MapCommands.Get> commit) {


try { try {
Commit<? extends MapCommands.TtlCommand> command = map.get(commit.operation().key()); Commit<? extends MapCommands.TtlCommand> command = map.get(commit.operation().key());
if (command != null) { return isActive(command, commit.time()) ? command.operation().value() : null;
if (!isActive(command, context().time().instant())) {
map.remove(commit.operation().key());
} else {
return command.operation().value();
}
}
return null;
} finally { } finally {
commit.close(); commit.close();
} }
Expand All @@ -131,12 +109,7 @@ protected Object getOrDefault(Commit<MapCommands.GetOrDefault> commit) {


try { try {
Commit<? extends MapCommands.TtlCommand> previous = map.get(commit.operation().key()); Commit<? extends MapCommands.TtlCommand> previous = map.get(commit.operation().key());
if (previous == null) { return isActive(previous, commit.time()) ? previous.operation().value() : commit.operation().defaultValue();
return commit.operation().defaultValue();
} else if (isActive(previous, context().time().instant())) {
return previous.operation().value();
}
return commit.operation().defaultValue();
} finally { } finally {
commit.close(); commit.close();
} }
Expand All @@ -152,17 +125,51 @@ protected Object put(Commit<MapCommands.Put> commit) {


Commit<? extends MapCommands.TtlCommand> previous = map.get(commit.operation().key()); Commit<? extends MapCommands.TtlCommand> previous = map.get(commit.operation().key());
if (previous == null) { if (previous == null) {
if (!isActive(commit, context().time().instant())) { if (commit.operation().ttl() > 0) {
commit.clean(); timers.put(commit.operation().key(), executor.schedule(() -> {
} else { map.remove(commit.operation().key());
map.put(commit.operation().key(), commit); Listener<Session> listener = listeners.remove(commit.operation().key());
if (listener != null)
listener.close();
commit.clean();
}, Duration.ofMillis(commit.operation().ttl())));
}

if (commit.operation().mode() == PersistenceMode.EPHEMERAL) {
listeners.put(commit.operation().key(), commit.session().onClose(s -> {
map.remove(commit.operation().key());
Scheduled scheduled = timers.remove(commit.operation().key());
if (scheduled != null)
scheduled.cancel();
commit.clean();
}));
} }
return null;
} else { } else {
map.put(commit.operation().key(), commit);
previous.clean(); previous.clean();
return isActive(previous, commit.time()) ? previous.operation().value() : null;
if (commit.operation().ttl() > 0) {
timers.put(commit.operation().key(), executor.schedule(() -> {
map.remove(commit.operation().key());
Listener<Session> listener = listeners.remove(commit.operation().key());
if (listener != null)
listener.close();
commit.close();
}, Duration.ofMillis(commit.operation().ttl())));
}

if (commit.operation().mode() == PersistenceMode.EPHEMERAL) {
listeners.put(commit.operation().key(), commit.session().onClose(s -> {
map.remove(commit.operation().key());
Scheduled scheduled = timers.remove(commit.operation().key());
if (scheduled != null)
scheduled.cancel();
commit.close();
}));
}
} }

map.put(commit.operation().key(), commit);
return isActive(previous, commit.time()) ? previous.operation().value() : null;
} }


/** /**
Expand All @@ -174,22 +181,33 @@ protected Object putIfAbsent(Commit<MapCommands.PutIfAbsent> commit) {
} }


Commit<? extends MapCommands.TtlCommand> previous = map.get(commit.operation().key()); Commit<? extends MapCommands.TtlCommand> previous = map.get(commit.operation().key());
if (previous == null) { if (!isActive(previous, commit.time())) {
if (!isActive(commit, context().time().instant())) { if (previous == null) {
commit.clean(); if (commit.operation().ttl() > 0) {
} else { timers.put(commit.operation().key(), executor.schedule(() -> {
map.put(commit.operation().key(), commit); map.remove(commit.operation().key());
Listener<Session> listener = listeners.remove(commit.operation().key());
if (listener != null)
listener.close();
commit.clean();
}, Duration.ofMillis(commit.operation().ttl())));
}

if (commit.operation().mode() == PersistenceMode.EPHEMERAL) {
listeners.put(commit.operation().key(), commit.session().onClose(s -> {
map.remove(commit.operation().key());
Scheduled scheduled = timers.remove(commit.operation().key());
if (scheduled != null)
scheduled.cancel();
commit.clean();
}));
}
} }

map.put(commit.operation().key(), commit);
return null; return null;
} else {
if (!isActive(previous, commit.time())) {
map.put(commit.operation().key(), commit);
previous.clean();
return null;
} else {
return previous.operation().value();
}
} }
return previous.operation().value();
} }


/** /**
Expand All @@ -207,13 +225,16 @@ protected Object remove(Commit<MapCommands.Remove> commit) {
} else if (!isActive(previous, commit.time())) { } else if (!isActive(previous, commit.time())) {
map.remove(commit.operation().key()); map.remove(commit.operation().key());
previous.clean(); previous.clean();
commit.close();
} else { } else {
Object value = previous.operation().value(); Object value = previous.operation().value();
if ((value == null && commit.operation().value() == null) || (value != null && commit.operation().value() != null && value.equals(commit.operation().value()))) { if ((value == null && commit.operation().value() == null) || (value != null && commit.operation().value() != null && value.equals(commit.operation().value()))) {
map.remove(commit.operation().key()); map.remove(commit.operation().key());
previous.clean(); previous.clean();
commit.close();
return true; return true;
} }
commit.clean();
return false; return false;
} }
return false; return false;
Expand All @@ -224,6 +245,7 @@ protected Object remove(Commit<MapCommands.Remove> commit) {
return true; return true;
} else { } else {
previous.clean(); previous.clean();
commit.close();
return isActive(previous, commit.time()) ? previous.operation().value() : null; return isActive(previous, commit.time()) ? previous.operation().value() : null;
} }
} }
Expand Down
Expand Up @@ -16,10 +16,10 @@
package net.kuujo.copycat.collections.state; package net.kuujo.copycat.collections.state;


import net.kuujo.copycat.PersistenceMode; import net.kuujo.copycat.PersistenceMode;
import net.kuujo.copycat.raft.session.Session;
import net.kuujo.copycat.raft.Commit; import net.kuujo.copycat.raft.Commit;
import net.kuujo.copycat.raft.StateMachine; import net.kuujo.copycat.raft.StateMachine;
import net.kuujo.copycat.raft.StateMachineExecutor; import net.kuujo.copycat.raft.StateMachineExecutor;
import net.kuujo.copycat.raft.session.Session;


import java.time.Instant; import java.time.Instant;
import java.util.HashMap; import java.util.HashMap;
Expand Down Expand Up @@ -104,7 +104,7 @@ protected boolean add(Commit<SetCommands.Add> commit) {
} }


Commit<? extends SetCommands.TtlCommand> previous = map.get(commit.operation().value()); Commit<? extends SetCommands.TtlCommand> previous = map.get(commit.operation().value());
if (!isActive(commit, context().time().instant())) { if (!isActive(commit, now())) {
commit.clean(); commit.clean();
return false; return false;
} else if (!isActive(previous, commit.time())) { } else if (!isActive(previous, commit.time())) {
Expand Down

0 comments on commit 60cf261

Please sign in to comment.