Skip to content

Commit

Permalink
Add missing distributed map replace methods.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Oct 4, 2015
1 parent b48c5a7 commit e63f10b
Show file tree
Hide file tree
Showing 3 changed files with 275 additions and 23 deletions.
Expand Up @@ -186,14 +186,80 @@ public CompletableFuture<V> putIfAbsent(K key, V value, Duration ttl) {
* @param value The value to remove. * @param value The value to remove.
* @return A completable future to be completed with the result once complete. * @return A completable future to be completed with the result once complete.
*/ */
public CompletableFuture<Boolean> remove(Object key, Object value) { public CompletableFuture<Boolean> remove(K key, V value) {
return submit(MapCommands.Remove.builder() return submit(MapCommands.RemoveIfPresent.builder()
.withKey(key) .withKey(key)
.withValue(value) .withValue(value)
.build()) .build())
.thenApply(result -> (boolean) result); .thenApply(result -> (boolean) result);
} }


/**
* Replaces a value in the map.
*
* @param key The key to replace.
* @param value The value with which to replace the key if it exists.
* @return A completable future to be completed with the result once complete.
*/
public CompletableFuture<Object> replace(K key, V value) {
return submit(MapCommands.Replace.builder()
.withKey(key)
.withValue(value)
.build());
}

/**
* Replaces a value in the map.
*
* @param key The key to replace.
* @param value The value with which to replace the key if it exists.
* @param ttl The duration after which to expire the key/value.
* @return A completable future to be completed with the result once complete.
*/
@SuppressWarnings("unchecked")
public CompletableFuture<Object> replace(K key, V value, Duration ttl) {
return submit(MapCommands.Replace.builder()
.withKey(key)
.withValue(value)
.withTtl(ttl.toMillis())
.build());
}

/**
* Replaces a value in the map.
*
* @param key The key to replace.
* @param oldValue The value to check.
* @param newValue The value to replace.
* @return A completable future to be completed with the result once complete.
*/
public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
return submit(MapCommands.ReplaceIfPresent.builder()
.withKey(key)
.withValue(oldValue)
.withReplace(newValue)
.build());
}

/**
* Replaces a value in the map.
*
* @param key The key to replace.
* @param oldValue The value to check.
* @param newValue The value to replace.
* @param ttl The duration after which to expire the key/value.
* @return A completable future to be completed with the result once complete.
*/
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue, Duration ttl) {
return submit(MapCommands.ReplaceIfPresent.builder()
.withKey(key)
.withValue(oldValue)
.withReplace(newValue)
.withTtl(ttl.toMillis())
.build());
}

/** /**
* Removes all entries from the map. * Removes all entries from the map.
* *
Expand Down
136 changes: 134 additions & 2 deletions collections/src/main/java/io/atomix/collections/state/MapCommands.java
Expand Up @@ -465,7 +465,7 @@ public Builder withDefaultValue(Object defaultValue) {
* Remove command. * Remove command.
*/ */
@SerializeWith(id=445) @SerializeWith(id=445)
public static class Remove extends KeyValueCommand<Object> { public static class Remove extends KeyCommand<Object> {


/** /**
* Returns a builder for this command. * Returns a builder for this command.
Expand All @@ -482,7 +482,7 @@ public PersistenceLevel persistence() {
/** /**
* Get command builder. * Get command builder.
*/ */
public static class Builder extends KeyValueCommand.Builder<Builder, Remove, Object> { public static class Builder extends KeyCommand.Builder<Builder, Remove, Object> {
public Builder(BuilderPool<Builder, Remove> pool) { public Builder(BuilderPool<Builder, Remove> pool) {
super(pool); super(pool);
} }
Expand All @@ -494,6 +494,138 @@ protected Remove create() {
} }
} }


/**
* Remove if absent command.
*/
@SerializeWith(id=449)
public static class RemoveIfPresent extends KeyValueCommand<Boolean> {

/**
* Returns a builder for this command.
*/
public static Builder builder() {
return Operation.builder(Builder.class, Builder::new);
}

@Override
public PersistenceLevel persistence() {
return PersistenceLevel.PERSISTENT;
}

/**
* Remove if absent command builder.
*/
public static class Builder extends KeyValueCommand.Builder<Builder, RemoveIfPresent, Boolean> {
public Builder(BuilderPool<Builder, RemoveIfPresent> pool) {
super(pool);
}

@Override
protected RemoveIfPresent create() {
return new RemoveIfPresent();
}
}
}

/**
* Remove command.
*/
@SerializeWith(id=450)
public static class Replace extends TtlCommand<Object> {

/**
* Returns a builder for this command.
*/
public static Builder builder() {
return Operation.builder(Builder.class, Builder::new);
}

@Override
public PersistenceLevel persistence() {
return PersistenceLevel.PERSISTENT;
}

/**
* Get command builder.
*/
public static class Builder extends TtlCommand.Builder<Builder, Replace, Object> {
public Builder(BuilderPool<Builder, Replace> pool) {
super(pool);
}

@Override
protected Replace create() {
return new Replace();
}
}
}

/**
* Remove if absent command.
*/
@SerializeWith(id=451)
public static class ReplaceIfPresent extends TtlCommand<Boolean> {
private Object replace;

/**
* Returns a builder for this command.
*/
public static Builder builder() {
return Operation.builder(Builder.class, Builder::new);
}

@Override
public PersistenceLevel persistence() {
return PersistenceLevel.PERSISTENT;
}

/**
* Returns the replace value.
*
* @return The replace value.
*/
public Object replace() {
return replace;
}

@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
super.writeObject(buffer, serializer);
serializer.writeObject(replace, buffer);
}

@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
super.readObject(buffer, serializer);
replace = serializer.readObject(buffer);
}

/**
* Get command builder.
*/
public static class Builder extends TtlCommand.Builder<Builder, ReplaceIfPresent, Boolean> {
public Builder(BuilderPool<Builder, ReplaceIfPresent> pool) {
super(pool);
}

/**
* Sets the map replace value.
*
* @param replace The map replace value.
* @return The builder.
*/
public Builder withReplace(Object replace) {
command.replace = replace;
return this;
}

@Override
protected ReplaceIfPresent create() {
return new ReplaceIfPresent();
}
}
}

/** /**
* Is empty query. * Is empty query.
*/ */
Expand Down
92 changes: 73 additions & 19 deletions collections/src/main/java/io/atomix/collections/state/MapState.java
Expand Up @@ -41,6 +41,9 @@ protected void configure(StateMachineExecutor executor) {
executor.register(MapCommands.Put.class, this::put); executor.register(MapCommands.Put.class, this::put);
executor.register(MapCommands.PutIfAbsent.class, this::putIfAbsent); executor.register(MapCommands.PutIfAbsent.class, this::putIfAbsent);
executor.register(MapCommands.Remove.class, this::remove); executor.register(MapCommands.Remove.class, this::remove);
executor.register(MapCommands.RemoveIfPresent.class, this::removeIfPresent);
executor.register(MapCommands.Replace.class, this::replace);
executor.register(MapCommands.ReplaceIfPresent.class, this::replaceIfPresent);
executor.register(MapCommands.Size.class, this::size); executor.register(MapCommands.Size.class, this::size);
executor.register(MapCommands.IsEmpty.class, this::isEmpty); executor.register(MapCommands.IsEmpty.class, this::isEmpty);
executor.register(MapCommands.Clear.class, this::clear); executor.register(MapCommands.Clear.class, this::clear);
Expand Down Expand Up @@ -134,41 +137,92 @@ protected Object putIfAbsent(Commit<MapCommands.PutIfAbsent> commit) {
* Handles a remove commit. * Handles a remove commit.
*/ */
protected Object remove(Commit<MapCommands.Remove> commit) { protected Object remove(Commit<MapCommands.Remove> commit) {
if (commit.operation().value() != null) { try {
Value value = map.get(commit.operation().key()); Value value = map.remove(commit.operation().key());
if (value == null || !value.equals(commit.operation().value())) { if (value != null) {
commit.clean(false);
return null;
} else {
try { try {
map.remove(commit.operation().key());
if (value.timer != null) if (value.timer != null)
value.timer.cancel(); value.timer.cancel();
return value.commit.operation().value(); return value.commit.operation().value();
} finally { } finally {
value.commit.clean(); value.commit.clean();
commit.clean();
} }
} }
return null;
} finally {
commit.clean();
}
}

/**
* Handles a remove if present commit.
*/
protected boolean removeIfPresent(Commit<MapCommands.RemoveIfPresent> commit) {
Value value = map.get(commit.operation().key());
if (value == null || ((value.commit.operation().value() == null && commit.operation().value() != null)
|| (value.commit.operation().value() != null && !value.commit.operation().value().equals(commit.operation().value())))) {
commit.clean(false);
return false;
} else { } else {
try { try {
Value value = map.remove(commit.operation().key()); map.remove(commit.operation().key());
if (value != null) { if (value.timer != null)
try { value.timer.cancel();
if (value.timer != null) return true;
value.timer.cancel();
return value.commit.operation().value();
} finally {
value.commit.clean();
}
}
return null;
} finally { } finally {
value.commit.clean();
commit.clean(); commit.clean();
} }
} }
} }


/**
* Handles a replace commit.
*/
protected Object replace(Commit<MapCommands.Replace> commit) {
Value value = map.get(commit.operation().key());
if (value != null) {
try {
if (value.timer != null)
value.timer.cancel();
Scheduled timer = commit.operation().ttl() > 0 ? executor().schedule(Duration.ofMillis(commit.operation().ttl()), () -> {
map.remove(commit.operation().key()).commit.clean();
}) : null;
map.put(commit.operation().key(), new Value(commit, timer));
return value.commit.operation().value();
} finally {
value.commit.clean();
}
}
return null;
}

/**
* Handles a replace if present commit.
*/
protected boolean replaceIfPresent(Commit<MapCommands.ReplaceIfPresent> commit) {
Value value = map.get(commit.operation().key());
if (value == null) {
commit.clean();
return false;
}

if ((value.commit.operation().value() == null && commit.operation().value() == null)
|| (value.commit.operation().value() != null && value.commit.operation().value().equals(commit.operation().value()))) {
if (value.timer != null)
value.timer.cancel();
Scheduled timer = commit.operation().ttl() > 0 ? executor().schedule(Duration.ofMillis(commit.operation().ttl()), () -> {
map.remove(commit.operation().key()).commit.clean();
}) : null;
map.put(commit.operation().key(), new Value(commit, timer));
value.commit.clean();
return true;
} else {
commit.clean();
}
return false;
}

/** /**
* Handles a count commit. * Handles a count commit.
*/ */
Expand Down

0 comments on commit e63f10b

Please sign in to comment.