Skip to content

Commit

Permalink
Update state machines for Catalog updates.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Sep 23, 2015
1 parent d68f554 commit 59e9c1c
Show file tree
Hide file tree
Showing 26 changed files with 693 additions and 674 deletions.
Expand Up @@ -15,12 +15,13 @@
*/
package io.atomix.copycat.atomic;

import io.atomix.copycat.atomic.state.AtomicValueCommands;
import io.atomix.catalog.client.ConsistencyLevel;
import io.atomix.catalog.client.Command;
import io.atomix.catalog.client.Query;
import io.atomix.catalog.server.StateMachine;
import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.Listener;
import io.atomix.copycat.PersistenceMode;
import io.atomix.copycat.Resource;
import io.atomix.copycat.atomic.state.AtomicValueCommands;
import io.atomix.copycat.atomic.state.AtomicValueState;
import io.atomix.copycat.resource.ResourceContext;

Expand All @@ -36,7 +37,8 @@
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public class DistributedAtomicValue<T> extends Resource {
private ConsistencyLevel defaultConsistency = ConsistencyLevel.LINEARIZABLE_LEASE;
private Command.ConsistencyLevel commandConsistency = Command.ConsistencyLevel.LINEARIZABLE;
private Query.ConsistencyLevel queryConsistency = Query.ConsistencyLevel.LINEARIZABLE;
private final java.util.Set<Consumer<T>> changeListeners = Collections.newSetFromMap(new ConcurrentHashMap<>());

@Override
Expand All @@ -47,44 +49,73 @@ protected Class<? extends StateMachine> stateMachine() {
@Override
protected void open(ResourceContext context) {
super.open(context);
context.session().<T>onEvent(event -> {
context.session().<T>onEvent("change", event -> {
for (Consumer<T> listener : changeListeners) {
listener.accept(event);
}
});
}

/**
* Sets the default write consistency level.
*
* @param consistency The default write consistency level.
* @throws java.lang.NullPointerException If the consistency level is {@code null}
*/
public void setDefaultCommandConsistency(Command.ConsistencyLevel consistency) {
this.commandConsistency = Assert.notNull(consistency, "consistency");
}

/**
* Sets the default write consistency level, returning the resource for method chaining.
*
* @param consistency The default write consistency level.
* @return The reference.
* @throws java.lang.NullPointerException If the consistency level is {@code null}
*/
public DistributedAtomicValue<T> withDefaultCommandConsistency(Command.ConsistencyLevel consistency) {
setDefaultCommandConsistency(consistency);
return this;
}

/**
* Returns the default write consistency level.
*
* @return The default write consistency level.
*/
public Command.ConsistencyLevel getDefaultCommandConsistency() {
return commandConsistency;
}

/**
* Sets the default read consistency level.
*
* @param consistency The default read consistency level.
* @throws java.lang.NullPointerException If the consistency level is {@code null}
*/
public void setDefaultConsistencyLevel(ConsistencyLevel consistency) {
if (consistency == null)
throw new NullPointerException("consistency cannot be null");
this.defaultConsistency = consistency;
public void setDefaultQueryConsistency(Query.ConsistencyLevel consistency) {
this.queryConsistency = Assert.notNull(consistency, "consistency");
}

/**
* Sets the default consistency level, returning the resource for method chaining.
* Sets the default read consistency level, returning the resource for method chaining.
*
* @param consistency The default read consistency level.
* @return The reference.
* @throws java.lang.NullPointerException If the consistency level is {@code null}
*/
public DistributedAtomicValue<T> withDefaultConsistencyLevel(ConsistencyLevel consistency) {
setDefaultConsistencyLevel(consistency);
public DistributedAtomicValue<T> withDefaultQueryConsistency(Query.ConsistencyLevel consistency) {
setDefaultQueryConsistency(consistency);
return this;
}

/**
* Returns the default consistency level.
* Returns the default read consistency level.
*
* @return The default consistency level.
* @return The default read consistency level.
*/
public ConsistencyLevel getDefaultConsistencyLevel() {
return defaultConsistency;
public Query.ConsistencyLevel getDefaultQueryConsistency() {
return queryConsistency;
}

/**
Expand All @@ -93,7 +124,7 @@ public ConsistencyLevel getDefaultConsistencyLevel() {
* @return A completable future to be completed with the current value.
*/
public CompletableFuture<T> get() {
return get(defaultConsistency);
return get(queryConsistency);
}

/**
Expand All @@ -102,7 +133,7 @@ public CompletableFuture<T> get() {
* @param consistency The read consistency level.
* @return A completable future to be completed with the current value.
*/
public CompletableFuture<T> get(ConsistencyLevel consistency) {
public CompletableFuture<T> get(Query.ConsistencyLevel consistency) {
return submit(AtomicValueCommands.Get.<T>builder()
.withConsistency(consistency)
.build());
Expand All @@ -115,9 +146,7 @@ public CompletableFuture<T> get(ConsistencyLevel consistency) {
* @return A completable future to be completed once the value has been set.
*/
public CompletableFuture<Void> set(T value) {
return submit(AtomicValueCommands.Set.builder()
.withValue(value)
.build());
return set(value, commandConsistency);
}

/**
Expand All @@ -128,23 +157,20 @@ public CompletableFuture<Void> set(T value) {
* @return A completable future to be completed once the value has been set.
*/
public CompletableFuture<Void> set(T value, Duration ttl) {
return submit(AtomicValueCommands.Set.builder()
.withValue(value)
.withTtl(ttl.toMillis())
.build());
return set(value, ttl, commandConsistency);
}

/**
* Sets the value with a write persistence.
* Sets the current value.
*
* @param value The value to set.
* @param persistence The write persistence.
* @param value The current value.
* @param consistency The write consistency level.
* @return A completable future to be completed once the value has been set.
*/
public CompletableFuture<Void> set(T value, PersistenceMode persistence) {
public CompletableFuture<Void> set(T value, Command.ConsistencyLevel consistency) {
return submit(AtomicValueCommands.Set.builder()
.withValue(value)
.withPersistence(persistence)
.withConsistency(consistency)
.build());
}

Expand All @@ -153,14 +179,14 @@ public CompletableFuture<Void> set(T value, PersistenceMode persistence) {
*
* @param value The value to set.
* @param ttl The time after which to expire the value.
* @param persistence The write persistence.
* @param consistency The write consistency level.
* @return A completable future to be completed once the value has been set.
*/
public CompletableFuture<Void> set(T value, Duration ttl, PersistenceMode persistence) {
public CompletableFuture<Void> set(T value, Duration ttl, Command.ConsistencyLevel consistency) {
return submit(AtomicValueCommands.Set.builder()
.withValue(value)
.withTtl(ttl.toMillis())
.withPersistence(persistence)
.withConsistency(consistency)
.build());
}

Expand All @@ -171,9 +197,7 @@ public CompletableFuture<Void> set(T value, Duration ttl, PersistenceMode persis
* @return A completable future to be completed with the previous value.
*/
public CompletableFuture<T> getAndSet(T value) {
return submit(AtomicValueCommands.GetAndSet.<T>builder()
.withValue(value)
.build());
return getAndSet(value, commandConsistency);
}

/**
Expand All @@ -184,23 +208,20 @@ public CompletableFuture<T> getAndSet(T value) {
* @return A completable future to be completed with the previous value.
*/
public CompletableFuture<T> getAndSet(T value, Duration ttl) {
return submit(AtomicValueCommands.GetAndSet.<T>builder()
.withValue(value)
.withTtl(ttl.toMillis())
.build());
return getAndSet(value, ttl, commandConsistency);
}

/**
* Gets the current value and updates it.
*
* @param value The updated value.
* @param persistence The write persistence.
* @param consistency The write consistency level.
* @return A completable future to be completed with the previous value.
*/
public CompletableFuture<T> getAndSet(T value, PersistenceMode persistence) {
public CompletableFuture<T> getAndSet(T value, Command.ConsistencyLevel consistency) {
return submit(AtomicValueCommands.GetAndSet.<T>builder()
.withValue(value)
.withPersistence(persistence)
.withConsistency(consistency)
.build());
}

Expand All @@ -209,14 +230,14 @@ public CompletableFuture<T> getAndSet(T value, PersistenceMode persistence) {
*
* @param value The updated value.
* @param ttl The time after which to expire the value.
* @param persistence The write persistence.
* @param consistency The write consistency level.
* @return A completable future to be completed with the previous value.
*/
public CompletableFuture<T> getAndSet(T value, Duration ttl, PersistenceMode persistence) {
public CompletableFuture<T> getAndSet(T value, Duration ttl, Command.ConsistencyLevel consistency) {
return submit(AtomicValueCommands.GetAndSet.<T>builder()
.withValue(value)
.withTtl(ttl.toMillis())
.withPersistence(persistence)
.withConsistency(consistency)
.build());
}

Expand All @@ -228,10 +249,7 @@ public CompletableFuture<T> getAndSet(T value, Duration ttl, PersistenceMode per
* @return A completable future to be completed with a boolean value indicating whether the value was updated.
*/
public CompletableFuture<Boolean> compareAndSet(T expect, T update) {
return submit(AtomicValueCommands.CompareAndSet.builder()
.withExpect(expect)
.withUpdate(update)
.build());
return compareAndSet(expect, update, commandConsistency);
}

/**
Expand All @@ -243,26 +261,22 @@ public CompletableFuture<Boolean> compareAndSet(T expect, T update) {
* @return A completable future to be completed with a boolean value indicating whether the value was updated.
*/
public CompletableFuture<Boolean> compareAndSet(T expect, T update, Duration ttl) {
return submit(AtomicValueCommands.CompareAndSet.builder()
.withExpect(expect)
.withUpdate(update)
.withTtl(ttl.toMillis())
.build());
return compareAndSet(expect, update, ttl, commandConsistency);
}

/**
* Compares the current value and updated it if expected value == the current value.
*
* @param expect The expected value.
* @param update The updated value.
* @param persistence The write persistence.
* @param consistency The write consistency level.
* @return A completable future to be completed with a boolean value indicating whether the value was updated.
*/
public CompletableFuture<Boolean> compareAndSet(T expect, T update, PersistenceMode persistence) {
public CompletableFuture<Boolean> compareAndSet(T expect, T update, Command.ConsistencyLevel consistency) {
return submit(AtomicValueCommands.CompareAndSet.builder()
.withExpect(expect)
.withUpdate(update)
.withPersistence(persistence)
.withConsistency(consistency)
.build());
}

Expand All @@ -272,15 +286,15 @@ public CompletableFuture<Boolean> compareAndSet(T expect, T update, PersistenceM
* @param expect The expected value.
* @param update The updated value.
* @param ttl The time after which to expire the value.
* @param persistence The write persistence.
* @param consistency The write consistency level.
* @return A completable future to be completed with a boolean value indicating whether the value was updated.
*/
public CompletableFuture<Boolean> compareAndSet(T expect, T update, Duration ttl, PersistenceMode persistence) {
public CompletableFuture<Boolean> compareAndSet(T expect, T update, Duration ttl, Command.ConsistencyLevel consistency) {
return submit(AtomicValueCommands.CompareAndSet.builder()
.withExpect(expect)
.withUpdate(update)
.withTtl(ttl.toMillis())
.withPersistence(persistence)
.withConsistency(consistency)
.build());
}

Expand Down

0 comments on commit 59e9c1c

Please sign in to comment.