Skip to content

Commit

Permalink
Ensure event threads can be blocked by futures.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed May 1, 2016
1 parent 559f25a commit 379efe3
Show file tree
Hide file tree
Showing 31 changed files with 558 additions and 275 deletions.
61 changes: 26 additions & 35 deletions collections/src/main/java/io/atomix/collections/DistributedMap.java
Expand Up @@ -109,7 +109,7 @@ public DistributedMap(CopycatClient client, Properties options) {
* @return A completable future to be completed with a boolean value indicating whether the map is empty. * @return A completable future to be completed with a boolean value indicating whether the map is empty.
*/ */
public CompletableFuture<Boolean> isEmpty() { public CompletableFuture<Boolean> isEmpty() {
return submit(new MapCommands.IsEmpty()); return client.submit(new MapCommands.IsEmpty());
} }


/** /**
Expand Down Expand Up @@ -151,7 +151,7 @@ public CompletableFuture<Boolean> isEmpty() {
* @return A completable future to be completed with a boolean value indicating whether the map is empty. * @return A completable future to be completed with a boolean value indicating whether the map is empty.
*/ */
public CompletableFuture<Boolean> isEmpty(ReadConsistency consistency) { public CompletableFuture<Boolean> isEmpty(ReadConsistency consistency) {
return submit(new MapCommands.IsEmpty(), consistency); return client.submit(new MapCommands.IsEmpty(consistency.level()));
} }


/** /**
Expand Down Expand Up @@ -191,7 +191,7 @@ public CompletableFuture<Boolean> isEmpty(ReadConsistency consistency) {
* @return A completable future to be completed with the number of entries in the map. * @return A completable future to be completed with the number of entries in the map.
*/ */
public CompletableFuture<Integer> size() { public CompletableFuture<Integer> size() {
return submit(new MapCommands.Size()); return client.submit(new MapCommands.Size());
} }


/** /**
Expand Down Expand Up @@ -231,7 +231,7 @@ public CompletableFuture<Integer> size() {
* @return A completable future to be completed with the number of entries in the map. * @return A completable future to be completed with the number of entries in the map.
*/ */
public CompletableFuture<Integer> size(ReadConsistency consistency) { public CompletableFuture<Integer> size(ReadConsistency consistency) {
return submit(new MapCommands.Size(), consistency); return client.submit(new MapCommands.Size(consistency.level()));
} }


/** /**
Expand Down Expand Up @@ -275,7 +275,7 @@ public CompletableFuture<Integer> size(ReadConsistency consistency) {
* @throws NullPointerException if {@code key} is {@code null} * @throws NullPointerException if {@code key} is {@code null}
*/ */
public CompletableFuture<Boolean> containsKey(Object key) { public CompletableFuture<Boolean> containsKey(Object key) {
return submit(new MapCommands.ContainsKey(key)); return client.submit(new MapCommands.ContainsKey(key));
} }


/** /**
Expand Down Expand Up @@ -319,7 +319,7 @@ public CompletableFuture<Boolean> containsKey(Object key) {
* @throws NullPointerException if {@code key} is {@code null} * @throws NullPointerException if {@code key} is {@code null}
*/ */
public CompletableFuture<Boolean> containsKey(Object key, ReadConsistency consistency) { public CompletableFuture<Boolean> containsKey(Object key, ReadConsistency consistency) {
return submit(new MapCommands.ContainsKey(key), consistency); return client.submit(new MapCommands.ContainsKey(key, consistency.level()));
} }


/** /**
Expand Down Expand Up @@ -363,7 +363,7 @@ public CompletableFuture<Boolean> containsKey(Object key, ReadConsistency consis
* @throws NullPointerException if {@code key} is {@code null} * @throws NullPointerException if {@code key} is {@code null}
*/ */
public CompletableFuture<Boolean> containsValue(Object value) { public CompletableFuture<Boolean> containsValue(Object value) {
return submit(new MapCommands.ContainsValue(value)); return client.submit(new MapCommands.ContainsValue(value));
} }


/** /**
Expand Down Expand Up @@ -407,7 +407,7 @@ public CompletableFuture<Boolean> containsValue(Object value) {
* @throws NullPointerException if {@code key} is {@code null} * @throws NullPointerException if {@code key} is {@code null}
*/ */
public CompletableFuture<Boolean> containsValue(Object value, ReadConsistency consistency) { public CompletableFuture<Boolean> containsValue(Object value, ReadConsistency consistency) {
return submit(new MapCommands.ContainsValue(value), consistency); return client.submit(new MapCommands.ContainsValue(value, consistency.level()));
} }


/** /**
Expand Down Expand Up @@ -450,8 +450,7 @@ public CompletableFuture<Boolean> containsValue(Object value, ReadConsistency co
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<V> get(Object key) { public CompletableFuture<V> get(Object key) {
return submit(new MapCommands.Get(key)) return client.submit(new MapCommands.Get(key)).thenApply(result -> (V) result);
.thenApply(result -> (V) result);
} }


/** /**
Expand Down Expand Up @@ -494,8 +493,7 @@ public CompletableFuture<V> get(Object key) {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<V> get(Object key, ReadConsistency consistency) { public CompletableFuture<V> get(Object key, ReadConsistency consistency) {
return submit(new MapCommands.Get(key), consistency) return client.submit(new MapCommands.Get(key, consistency.level())).thenApply(result -> (V) result);
.thenApply(result -> (V) result);
} }


/** /**
Expand Down Expand Up @@ -542,8 +540,7 @@ public CompletableFuture<V> get(Object key, ReadConsistency consistency) {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<V> getOrDefault(Object key, V defaultValue) { public CompletableFuture<V> getOrDefault(Object key, V defaultValue) {
return submit(new MapCommands.GetOrDefault(key, defaultValue)) return client.submit(new MapCommands.GetOrDefault(key, defaultValue)).thenApply(result -> (V) result);
.thenApply(result -> (V) result);
} }


/** /**
Expand Down Expand Up @@ -590,8 +587,7 @@ public CompletableFuture<V> getOrDefault(Object key, V defaultValue) {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<V> getOrDefault(Object key, V defaultValue, ReadConsistency consistency) { public CompletableFuture<V> getOrDefault(Object key, V defaultValue, ReadConsistency consistency) {
return submit(new MapCommands.GetOrDefault(key, defaultValue), consistency) return client.submit(new MapCommands.GetOrDefault(key, defaultValue, consistency.level())).thenApply(result -> (V) result);
.thenApply(result -> (V) result);
} }


/** /**
Expand Down Expand Up @@ -625,8 +621,7 @@ public CompletableFuture<V> getOrDefault(Object key, V defaultValue, ReadConsist
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<V> put(K key, V value) { public CompletableFuture<V> put(K key, V value) {
return submit(new MapCommands.Put(key, value)) return client.submit(new MapCommands.Put(key, value)).thenApply(result -> (V) result);
.thenApply(result -> (V) result);
} }


/** /**
Expand Down Expand Up @@ -665,8 +660,7 @@ public CompletableFuture<V> put(K key, V value) {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<V> put(K key, V value, Duration ttl) { public CompletableFuture<V> put(K key, V value, Duration ttl) {
return submit(new MapCommands.Put(key, value, ttl.toMillis())) return client.submit(new MapCommands.Put(key, value, ttl.toMillis())).thenApply(result -> (V) result);
.thenApply(result -> (V) result);
} }


/** /**
Expand Down Expand Up @@ -699,8 +693,7 @@ public CompletableFuture<V> put(K key, V value, Duration ttl) {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<V> putIfAbsent(K key, V value) { public CompletableFuture<V> putIfAbsent(K key, V value) {
return submit(new MapCommands.PutIfAbsent(key, value)) return client.submit(new MapCommands.PutIfAbsent(key, value)).thenApply(result -> (V) result);
.thenApply(result -> (V) result);
} }


/** /**
Expand Down Expand Up @@ -738,7 +731,7 @@ public CompletableFuture<V> putIfAbsent(K key, V value) {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<V> putIfAbsent(K key, V value, Duration ttl) { public CompletableFuture<V> putIfAbsent(K key, V value, Duration ttl) {
return submit(new MapCommands.PutIfAbsent(key, value, ttl.toMillis())).thenApply(result -> (V) result); return client.submit(new MapCommands.PutIfAbsent(key, value, ttl.toMillis())).thenApply(result -> (V) result);
} }


/** /**
Expand Down Expand Up @@ -773,8 +766,7 @@ public CompletableFuture<V> putIfAbsent(K key, V value, Duration ttl) {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<V> remove(Object key) { public CompletableFuture<V> remove(Object key) {
return submit(new MapCommands.Remove(key)) return client.submit(new MapCommands.Remove(key)).thenApply(result -> (V) result);
.thenApply(result -> (V) result);
} }


/** /**
Expand Down Expand Up @@ -811,7 +803,7 @@ public CompletableFuture<V> remove(Object key) {
* @throws NullPointerException if {@code key} is {@code null} * @throws NullPointerException if {@code key} is {@code null}
*/ */
public CompletableFuture<Boolean> remove(K key, V value) { public CompletableFuture<Boolean> remove(K key, V value) {
return submit(new MapCommands.RemoveIfPresent(key, value)); return client.submit(new MapCommands.RemoveIfPresent(key, value));
} }


/** /**
Expand Down Expand Up @@ -844,8 +836,7 @@ public CompletableFuture<Boolean> remove(K key, V value) {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<V> replace(K key, V value) { public CompletableFuture<V> replace(K key, V value) {
return submit(new MapCommands.Replace(key, value)) return client.submit(new MapCommands.Replace(key, value)).thenApply(result -> (V) result);
.thenApply(result -> (V) result);
} }


/** /**
Expand Down Expand Up @@ -884,7 +875,7 @@ public CompletableFuture<V> replace(K key, V value) {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<V> replace(K key, V value, Duration ttl) { public CompletableFuture<V> replace(K key, V value, Duration ttl) {
return submit(new MapCommands.Replace(key, value, ttl.toMillis())).thenApply(result -> (V) result); return client.submit(new MapCommands.Replace(key, value, ttl.toMillis())).thenApply(result -> (V) result);
} }


/** /**
Expand Down Expand Up @@ -920,7 +911,7 @@ public CompletableFuture<V> replace(K key, V value, Duration ttl) {
* @throws NullPointerException if {@code key} is {@code null} * @throws NullPointerException if {@code key} is {@code null}
*/ */
public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) { public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
return submit(new MapCommands.ReplaceIfPresent(key, oldValue, newValue)); return client.submit(new MapCommands.ReplaceIfPresent(key, oldValue, newValue));
} }


/** /**
Expand Down Expand Up @@ -963,7 +954,7 @@ public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue, Duration ttl) { public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue, Duration ttl) {
return submit(new MapCommands.ReplaceIfPresent(key, oldValue, newValue, ttl.toMillis())); return client.submit(new MapCommands.ReplaceIfPresent(key, oldValue, newValue, ttl.toMillis()));
} }


/** /**
Expand Down Expand Up @@ -994,7 +985,7 @@ public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue, Duratio
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<Set<K>> keySet() { public CompletableFuture<Set<K>> keySet() {
return submit(new MapCommands.KeySet()).thenApply(keys -> (Set<K>) keys); return client.submit(new MapCommands.KeySet()).thenApply(keys -> (Set<K>) keys);
} }


/** /**
Expand Down Expand Up @@ -1025,7 +1016,7 @@ public CompletableFuture<Set<K>> keySet() {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<Collection<V>> values() { public CompletableFuture<Collection<V>> values() {
return submit(new MapCommands.Values()).thenApply(values -> (Collection<V>) values); return client.submit(new MapCommands.Values()).thenApply(values -> (Collection<V>) values);
} }


/** /**
Expand Down Expand Up @@ -1056,7 +1047,7 @@ public CompletableFuture<Collection<V>> values() {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<Set<Map.Entry<K, V>>> entrySet() { public CompletableFuture<Set<Map.Entry<K, V>>> entrySet() {
return submit(new MapCommands.EntrySet()).thenApply(entries -> (Set<Map.Entry<K, V>>) entries); return client.submit(new MapCommands.EntrySet()).thenApply(entries -> (Set<Map.Entry<K, V>>) entries);
} }


/** /**
Expand All @@ -1083,7 +1074,7 @@ public CompletableFuture<Set<Map.Entry<K, V>>> entrySet() {
* @return A completable future to be completed once the operation is complete. * @return A completable future to be completed once the operation is complete.
*/ */
public CompletableFuture<Void> clear() { public CompletableFuture<Void> clear() {
return submit(new MapCommands.Clear()); return client.submit(new MapCommands.Clear());
} }


} }

0 comments on commit 379efe3

Please sign in to comment.