Skip to content

Commit

Permalink
Update collection state machines to handle cleaning commits.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Aug 5, 2015
1 parent 24d426c commit 8e7b9f1
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 81 deletions.
Expand Up @@ -32,16 +32,8 @@
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public class MapState extends StateMachine { public class MapState extends StateMachine {
private final Map<Object, Commit<? extends MapCommands.TtlCommand>> map = new HashMap<>(); private Map<Object, Commit<? extends MapCommands.TtlCommand>> map;
private final Set<Long> sessions = new HashSet<>(); private final Set<Long> sessions = new HashSet<>();
private long time;

/**
* Updates the wall clock time.
*/
private void updateTime(Commit<?> commit) {
time = Math.max(time, commit.timestamp());
}


@Override @Override
public void register(Session session) { public void register(Session session) {
Expand All @@ -61,7 +53,7 @@ public void close(Session session) {
/** /**
* Returns a boolean value indicating whether the given commit is active. * Returns a boolean value indicating whether the given commit is active.
*/ */
private boolean isActive(Commit<? extends MapCommands.TtlCommand> commit) { private boolean isActive(Commit<? extends MapCommands.TtlCommand> commit, long time) {
if (commit == null) { if (commit == null) {
return false; return false;
} else if (commit.operation().mode() == PersistenceLevel.EPHEMERAL && !sessions.contains(commit.session().id())) { } else if (commit.operation().mode() == PersistenceLevel.EPHEMERAL && !sessions.contains(commit.session().id())) {
Expand All @@ -77,91 +69,155 @@ private boolean isActive(Commit<? extends MapCommands.TtlCommand> commit) {
*/ */
@Apply(MapCommands.ContainsKey.class) @Apply(MapCommands.ContainsKey.class)
protected boolean containsKey(Commit<MapCommands.ContainsKey> commit) { protected boolean containsKey(Commit<MapCommands.ContainsKey> commit) {
updateTime(commit); try {
Commit<? extends MapCommands.TtlCommand> command = map.get(commit.operation().key()); if (map == null) {
if (!isActive(command)) { return false;
map.remove(commit.operation().key()); }
return false;
Commit<? extends MapCommands.TtlCommand> command = map.get(commit.operation().key());
if (!isActive(command, getTime())) {
map.remove(commit.operation().key());
return false;
}
return true;
} finally {
commit.close();
} }
return true;
} }


/** /**
* Handles a get commit. * Handles a get commit.
*/ */
@Apply(MapCommands.Get.class) @Apply(MapCommands.Get.class)
protected Object get(Commit<MapCommands.Get> commit) { protected Object get(Commit<MapCommands.Get> commit) {
updateTime(commit); if (map == null) {
Commit<? extends MapCommands.TtlCommand> command = map.get(commit.operation().key()); return null;
if (command != null) { }
if (!isActive(command)) {
map.remove(commit.operation().key()); try {
} else { Commit<? extends MapCommands.TtlCommand> command = map.get(commit.operation().key());
return command.operation().value(); if (command != null) {
if (!isActive(command, getTime())) {
map.remove(commit.operation().key());
} else {
return command.operation().value();
}
} }
return null;
} finally {
commit.close();
} }
return null;
} }


/** /**
* Handles a get or default commit. * Handles a get or default commit.
*/ */
@Apply(MapCommands.GetOrDefault.class) @Apply(MapCommands.GetOrDefault.class)
protected Object getOrDefault(Commit<MapCommands.GetOrDefault> commit) { protected Object getOrDefault(Commit<MapCommands.GetOrDefault> commit) {
updateTime(commit); if (map == null) {
Commit<? extends MapCommands.TtlCommand> command = map.get(commit.operation().key());
if (command == null) {
return commit.operation().defaultValue(); return commit.operation().defaultValue();
} else if (!isActive(command)) {
map.remove(commit.operation().key());
} else {
return command.operation().value();
} }
return commit.operation().defaultValue();
try {
Commit<? extends MapCommands.TtlCommand> previous = map.get(commit.operation().key());
if (previous == null) {
return commit.operation().defaultValue();
} else if (isActive(previous, getTime())) {
return previous.operation().value();
}
return commit.operation().defaultValue();
} finally {
commit.close();
}
} }


/** /**
* Handles a put commit. * Handles a put commit.
*/ */
@Apply(MapCommands.Put.class) @Apply(MapCommands.Put.class)
protected Object put(Commit<MapCommands.Put> commit) { protected Object put(Commit<MapCommands.Put> commit) {
updateTime(commit); if (map == null) {
Commit<? extends MapCommands.TtlCommand> command = map.put(commit.operation().key(), commit); map = new HashMap<>();
return isActive(command) ? command.operation().value : null; }

Commit<? extends MapCommands.TtlCommand> previous = map.get(commit.operation().key());
if (previous == null) {
if (!isActive(commit, getTime())) {
commit.clean();
} else {
map.put(commit.operation().key(), commit);
}
return null;
} else {
map.put(commit.operation().key(), commit);
previous.clean();
return isActive(previous, commit.timestamp()) ? previous.operation().value() : null;
}
} }


/** /**
* Handles a put if absent commit. * Handles a put if absent commit.
*/ */
@Apply(MapCommands.PutIfAbsent.class) @Apply(MapCommands.PutIfAbsent.class)
protected Object putIfAbsent(Commit<MapCommands.PutIfAbsent> commit) { protected Object putIfAbsent(Commit<MapCommands.PutIfAbsent> commit) {
updateTime(commit); if (map == null) {
Commit<? extends MapCommands.TtlCommand> command = map.putIfAbsent(commit.operation().key(), commit); map = new HashMap<>();
return isActive(command) ? command.operation().value : null; }

Commit<? extends MapCommands.TtlCommand> previous = map.get(commit.operation().key());
if (previous == null) {
if (!isActive(commit, getTime())) {
commit.clean();
} else {
map.put(commit.operation().key(), commit);
}
return null;
} else {
if (!isActive(previous, commit.timestamp())) {
map.put(commit.operation().key(), commit);
previous.clean();
return null;
} else {
return previous.operation().value();
}
}
} }


/** /**
* Handles a remove commit. * Handles a remove commit.
*/ */
@Apply(MapCommands.Remove.class) @Apply(MapCommands.Remove.class)
protected Object remove(Commit<MapCommands.Remove> commit) { protected Object remove(Commit<MapCommands.Remove> commit) {
updateTime(commit); if (map == null) {
if (commit.operation().value() != null) { commit.clean();
Commit<? extends MapCommands.TtlCommand> command = map.get(commit.operation().key()); return null;
if (!isActive(command)) { } else if (commit.operation().value() != null) {
Commit<? extends MapCommands.TtlCommand> previous = map.get(commit.operation().key());
if (previous == null) {
commit.clean();
return true;
} else if (!isActive(previous, commit.timestamp())) {
map.remove(commit.operation().key()); map.remove(commit.operation().key());
previous.clean();
} else { } else {
Object value = command.operation().value(); Object value = previous.operation().value();
if ((value == null && commit.operation().value() == null) || (value != null && commit.operation().value() != null && value.equals(commit.operation().value()))) { if ((value == null && commit.operation().value() == null) || (value != null && commit.operation().value() != null && value.equals(commit.operation().value()))) {
map.remove(commit.operation().key()); map.remove(commit.operation().key());
previous.clean();
return true; return true;
} }
return false; return false;
} }
return false; return false;
} else { } else {
Commit<? extends MapCommands.TtlCommand> command = map.remove(commit.operation().key()); Commit<? extends MapCommands.TtlCommand> previous = map.remove(commit.operation().key());
return isActive(command) ? command.operation().value() : null; if (previous == null) {
commit.clean();
return true;
} else {
previous.clean();
return isActive(previous, commit.timestamp()) ? previous.operation().value() : null;
}
} }
} }


Expand All @@ -170,26 +226,35 @@ protected Object remove(Commit<MapCommands.Remove> commit) {
*/ */
@Apply(MapCommands.Size.class) @Apply(MapCommands.Size.class)
protected int size(Commit<MapCommands.Size> commit) { protected int size(Commit<MapCommands.Size> commit) {
updateTime(commit); try {
return map.size(); return map != null ? map.size() : 0;
} finally {
commit.close();
}
} }


/** /**
* Handles an is empty commit. * Handles an is empty commit.
*/ */
@Apply(MapCommands.IsEmpty.class) @Apply(MapCommands.IsEmpty.class)
protected boolean isEmpty(Commit<MapCommands.IsEmpty> commit) { protected boolean isEmpty(Commit<MapCommands.IsEmpty> commit) {
updateTime(commit); try {
return map.isEmpty(); return map == null || map.isEmpty();
} finally {
commit.close();
}
} }


/** /**
* Handles a clear commit. * Handles a clear commit.
*/ */
@Apply(MapCommands.Clear.class) @Apply(MapCommands.Clear.class)
protected void clear(Commit<MapCommands.Clear> commit) { protected void clear(Commit<MapCommands.Clear> commit) {
updateTime(commit); if (map == null) {
map.clear(); commit.clean();
} else {
map.clear();
}
} }


} }

0 comments on commit 8e7b9f1

Please sign in to comment.