diff --git a/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientAtomicLongProxy.java b/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientAtomicLongProxy.java
index aaae62a93a7f..860d8b4144af 100644
--- a/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientAtomicLongProxy.java
+++ b/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientAtomicLongProxy.java
@@ -16,6 +16,7 @@
package com.hazelcast.client.proxy;
+import com.hazelcast.client.impl.ClientMessageDecoder;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.AtomicLongAddAndGetCodec;
import com.hazelcast.client.impl.protocol.codec.AtomicLongAlterAndGetCodec;
@@ -31,6 +32,7 @@
import com.hazelcast.client.impl.protocol.codec.AtomicLongIncrementAndGetCodec;
import com.hazelcast.client.impl.protocol.codec.AtomicLongSetCodec;
import com.hazelcast.core.IAtomicLong;
+import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.IFunction;
import static com.hazelcast.util.Preconditions.isNotNull;
@@ -38,8 +40,100 @@
/**
* Proxy implementation of {@link IAtomicLong}.
*/
+@SuppressWarnings("checkstyle:methodcount")
public class ClientAtomicLongProxy extends PartitionSpecificClientProxy implements IAtomicLong {
+ private static final ClientMessageDecoder ADD_AND_GET_DECODER = new ClientMessageDecoder() {
+ @Override
+ public Long decodeClientMessage(ClientMessage clientMessage) {
+ return AtomicLongAddAndGetCodec.decodeResponse(clientMessage).response;
+ }
+ };
+
+ private static final ClientMessageDecoder COMPARE_AND_SET_DECODER = new ClientMessageDecoder() {
+ @Override
+ public Boolean decodeClientMessage(ClientMessage clientMessage) {
+ return AtomicLongCompareAndSetCodec.decodeResponse(clientMessage).response;
+ }
+ };
+
+ private static final ClientMessageDecoder DECREMENT_AND_GET_DECODER = new ClientMessageDecoder() {
+ @Override
+ public Long decodeClientMessage(ClientMessage clientMessage) {
+ return AtomicLongDecrementAndGetCodec.decodeResponse(clientMessage).response;
+ }
+ };
+
+ private static final ClientMessageDecoder GET_AND_ADD_DECODER = new ClientMessageDecoder() {
+ @Override
+ public Long decodeClientMessage(ClientMessage clientMessage) {
+ return AtomicLongGetAndAddCodec.decodeResponse(clientMessage).response;
+ }
+ };
+
+ private static final ClientMessageDecoder GET_AND_SET_DECODER = new ClientMessageDecoder() {
+ @Override
+ public Long decodeClientMessage(ClientMessage clientMessage) {
+ return AtomicLongGetAndSetCodec.decodeResponse(clientMessage).response;
+ }
+ };
+
+ private static final ClientMessageDecoder INCREMENT_AND_GET_DECODER = new ClientMessageDecoder() {
+ @Override
+ public Long decodeClientMessage(ClientMessage clientMessage) {
+ return AtomicLongIncrementAndGetCodec.decodeResponse(clientMessage).response;
+ }
+ };
+
+ private static final ClientMessageDecoder GET_AND_INCREMENT_DECODER = new ClientMessageDecoder() {
+ @Override
+ public Long decodeClientMessage(ClientMessage clientMessage) {
+ return AtomicLongGetAndIncrementCodec.decodeResponse(clientMessage).response;
+ }
+ };
+
+ private static final ClientMessageDecoder SET_ASYNC_DECODER = new ClientMessageDecoder() {
+ @Override
+ public Void decodeClientMessage(ClientMessage clientMessage) {
+ return null;
+ }
+ };
+
+ private static final ClientMessageDecoder ALTER_DECODER = new ClientMessageDecoder() {
+ @Override
+ public Void decodeClientMessage(ClientMessage clientMessage) {
+ return null;
+ }
+ };
+
+ private static final ClientMessageDecoder GET_AND_ALTER_DECODER = new ClientMessageDecoder() {
+ @Override
+ public Long decodeClientMessage(ClientMessage clientMessage) {
+ return AtomicLongGetAndAlterCodec.decodeResponse(clientMessage).response;
+ }
+ };
+
+ private static final ClientMessageDecoder ALTER_AND_GET_DECODER = new ClientMessageDecoder() {
+ @Override
+ public Long decodeClientMessage(ClientMessage clientMessage) {
+ return AtomicLongAlterAndGetCodec.decodeResponse(clientMessage).response;
+ }
+ };
+
+ private static final ClientMessageDecoder APPLY_DECODER = new ClientMessageDecoder() {
+ @Override
+ public V decodeClientMessage(ClientMessage clientMessage) {
+ return (V) AtomicLongApplyCodec.decodeResponse(clientMessage).response;
+ }
+ };
+
+ private static final ClientMessageDecoder GET_DECODER = new ClientMessageDecoder() {
+ @Override
+ public Long decodeClientMessage(ClientMessage clientMessage) {
+ return AtomicLongGetCodec.decodeResponse(clientMessage).response;
+ }
+ };
+
public ClientAtomicLongProxy(String serviceName, String objectId) {
super(serviceName, objectId);
}
@@ -148,6 +242,88 @@ public void set(long newValue) {
invokeOnPartition(request);
}
+ @Override
+ public ICompletableFuture addAndGetAsync(long delta) {
+ ClientMessage request = AtomicLongAddAndGetCodec.encodeRequest(name, delta);
+ return invokeOnPartitionAsync(request, ADD_AND_GET_DECODER);
+ }
+
+ @Override
+ public ICompletableFuture compareAndSetAsync(long expect, long update) {
+ ClientMessage request = AtomicLongCompareAndSetCodec.encodeRequest(name, expect, update);
+ return invokeOnPartitionAsync(request, COMPARE_AND_SET_DECODER);
+ }
+
+ @Override
+ public ICompletableFuture decrementAndGetAsync() {
+ ClientMessage request = AtomicLongDecrementAndGetCodec.encodeRequest(name);
+ return invokeOnPartitionAsync(request, DECREMENT_AND_GET_DECODER);
+ }
+
+ @Override
+ public ICompletableFuture getAsync() {
+ ClientMessage request = AtomicLongGetCodec.encodeRequest(name);
+ return invokeOnPartitionAsync(request, GET_DECODER);
+ }
+
+ @Override
+ public ICompletableFuture getAndAddAsync(long delta) {
+ ClientMessage request = AtomicLongGetAndAddCodec.encodeRequest(name, delta);
+ return invokeOnPartitionAsync(request, GET_AND_ADD_DECODER);
+ }
+
+ @Override
+ public ICompletableFuture getAndSetAsync(long newValue) {
+ ClientMessage request = AtomicLongGetAndSetCodec.encodeRequest(name, newValue);
+ return invokeOnPartitionAsync(request, GET_AND_SET_DECODER);
+ }
+
+ @Override
+ public ICompletableFuture incrementAndGetAsync() {
+ ClientMessage request = AtomicLongIncrementAndGetCodec.encodeRequest(name);
+ return invokeOnPartitionAsync(request, INCREMENT_AND_GET_DECODER);
+ }
+
+ @Override
+ public ICompletableFuture getAndIncrementAsync() {
+ ClientMessage request = AtomicLongGetAndIncrementCodec.encodeRequest(name);
+ return invokeOnPartitionAsync(request, GET_AND_INCREMENT_DECODER);
+ }
+
+ @Override
+ public ICompletableFuture setAsync(long newValue) {
+ ClientMessage request = AtomicLongSetCodec.encodeRequest(name, newValue);
+ return invokeOnPartitionAsync(request, SET_ASYNC_DECODER);
+ }
+
+ @Override
+ public ICompletableFuture alterAsync(IFunction function) {
+ isNotNull(function, "function");
+ ClientMessage request = AtomicLongAlterCodec.encodeRequest(name, toData(function));
+ return invokeOnPartitionAsync(request, ALTER_DECODER);
+ }
+
+ @Override
+ public ICompletableFuture alterAndGetAsync(IFunction function) {
+ isNotNull(function, "function");
+ ClientMessage request = AtomicLongAlterAndGetCodec.encodeRequest(name, toData(function));
+ return invokeOnPartitionAsync(request, ALTER_AND_GET_DECODER);
+ }
+
+ @Override
+ public ICompletableFuture getAndAlterAsync(IFunction function) {
+ isNotNull(function, "function");
+ ClientMessage request = AtomicLongGetAndAlterCodec.encodeRequest(name, toData(function));
+ return invokeOnPartitionAsync(request, GET_AND_ALTER_DECODER);
+ }
+
+ @Override
+ public ICompletableFuture applyAsync(IFunction function) {
+ isNotNull(function, "function");
+ ClientMessage request = AtomicLongApplyCodec.encodeRequest(name, toData(function));
+ return invokeOnPartitionAsync(request, APPLY_DECODER);
+ }
+
@Override
public String toString() {
return "IAtomicLong{" + "name='" + name + '\'' + '}';
diff --git a/hazelcast-client/src/main/java/com/hazelcast/client/proxy/PartitionSpecificClientProxy.java b/hazelcast-client/src/main/java/com/hazelcast/client/proxy/PartitionSpecificClientProxy.java
index b94e723b3f0e..529fa2154de3 100644
--- a/hazelcast-client/src/main/java/com/hazelcast/client/proxy/PartitionSpecificClientProxy.java
+++ b/hazelcast-client/src/main/java/com/hazelcast/client/proxy/PartitionSpecificClientProxy.java
@@ -16,8 +16,14 @@
package com.hazelcast.client.proxy;
+import com.hazelcast.client.impl.ClientMessageDecoder;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.spi.ClientProxy;
+import com.hazelcast.client.spi.impl.ClientInvocation;
+import com.hazelcast.client.spi.impl.ClientInvocationFuture;
+import com.hazelcast.client.util.ClientDelegatingFuture;
+import com.hazelcast.spi.serialization.SerializationService;
+import com.hazelcast.util.ExceptionUtil;
/**
* Base class for proxies of distributed objects that lives in on partition.
@@ -42,4 +48,16 @@ protected ClientMessage invokeOnPartition(ClientMessage req) {
protected T invokeOnPartitionInterruptibly(ClientMessage clientMessage) throws InterruptedException {
return invokeOnPartitionInterruptibly(clientMessage, partitionId);
}
+
+ protected ClientDelegatingFuture invokeOnPartitionAsync(ClientMessage clientMessage,
+ ClientMessageDecoder clientMessageDecoder) {
+ SerializationService serializationService = getContext().getSerializationService();
+
+ try {
+ final ClientInvocationFuture future = new ClientInvocation(getClient(), clientMessage, partitionId).invoke();
+ return new ClientDelegatingFuture(future, serializationService, clientMessageDecoder);
+ } catch (Exception e) {
+ throw ExceptionUtil.rethrow(e);
+ }
+ }
}
diff --git a/hazelcast-client/src/test/java/com/hazelcast/client/atomiclong/ClientAtomicLongTest.java b/hazelcast-client/src/test/java/com/hazelcast/client/atomiclong/ClientAtomicLongTest.java
index f2a35cc1dc60..df2973e8ebd3 100644
--- a/hazelcast-client/src/test/java/com/hazelcast/client/atomiclong/ClientAtomicLongTest.java
+++ b/hazelcast-client/src/test/java/com/hazelcast/client/atomiclong/ClientAtomicLongTest.java
@@ -18,8 +18,10 @@
import com.hazelcast.client.UndefinedErrorCodeException;
import com.hazelcast.client.test.TestHazelcastFactory;
+import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IAtomicLong;
+import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.IFunction;
import com.hazelcast.test.ExpectedRuntimeException;
import com.hazelcast.test.HazelcastParallelClassRunner;
@@ -32,6 +34,11 @@
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -77,6 +84,28 @@ public void test() throws Exception {
}
+ @Test
+ public void testAsync() throws Exception {
+ ICompletableFuture future = l.getAndAddAsync(10);
+ assertEquals(0, future.get().longValue());
+
+ ICompletableFuture booleanFuture = l.compareAndSetAsync(10, 42);
+ assertTrue(booleanFuture.get());
+
+ future = l.getAsync();
+ assertEquals(42, future.get().longValue());
+
+ future = l.incrementAndGetAsync();
+ assertEquals(43, future.get().longValue());
+
+ future = l.addAndGetAsync(-13);
+ assertEquals(30, future.get().longValue());
+
+ future = l.alterAndGetAsync(new AddOneFunction());
+ assertEquals(31, future.get().longValue());
+
+ }
+
@Test(expected = IllegalArgumentException.class)
public void apply_whenCalledWithNullFunction() {
IAtomicLong ref = client.getAtomicLong("apply_whenCalledWithNullFunction");
@@ -92,6 +121,50 @@ public void apply() {
assertEquals(0, ref.get());
}
+ @Test
+ public void applyAsync()
+ throws ExecutionException, InterruptedException {
+ IAtomicLong ref = client.getAtomicLong("apply");
+ ICompletableFuture future =ref.applyAsync(new AddOneFunction());
+ assertEquals(new Long(1), future.get());
+ assertEquals(0, ref.get());
+ }
+
+ @Test
+ public void applyBooleanAsync() throws ExecutionException, InterruptedException {
+ final CountDownLatch cdl = new CountDownLatch(1);
+ final IAtomicLong ref = client.getAtomicLong("apply");
+ ICompletableFuture incAndGetFuture = ref.setAsync(1);
+ final AtomicBoolean failed = new AtomicBoolean(true);
+ incAndGetFuture.andThen(new ExecutionCallback() {
+ @Override
+ public void onResponse(Void response) {
+ ICompletableFuture future = ref.applyAsync(new FilterOnesFunction());
+ try {
+ assertEquals(Boolean.TRUE, future.get());
+ failed.set(false);
+ cdl.countDown();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ t.printStackTrace();
+ }
+ });
+ if (cdl.await(15, TimeUnit.SECONDS)){
+ assertEquals(1, ref.get());
+ assertEquals(false, failed.get());
+ }
+ else {
+ fail("Timeout after 15 seconds");
+ }
+ }
+
@Test
public void apply_whenException() {
IAtomicLong ref = client.getAtomicLong("apply_whenException");
@@ -106,6 +179,24 @@ public void apply_whenException() {
assertEquals(1, ref.get());
}
+ @Test
+ public void applyAsync_whenException() {
+ IAtomicLong ref = client.getAtomicLong("applyAsync_whenException");
+ ref.set(1);
+ try {
+ ICompletableFuture future = ref.applyAsync(new FailingFunction());
+ future.get();
+ } catch (InterruptedException e) {
+ fail();
+ } catch (ExecutionException e) {
+ assertEquals(e.getCause().getClass(), UndefinedErrorCodeException.class);
+ assertEquals(((UndefinedErrorCodeException)e.getCause()).getOriginClassName(),
+ ExpectedRuntimeException.class.getName());
+ }
+
+ assertEquals(1, ref.get());
+ }
+
@Test(expected = IllegalArgumentException.class)
public void alter_whenCalledWithNullFunction() {
IAtomicLong ref = client.getAtomicLong("alter_whenCalledWithNullFunction");
@@ -128,6 +219,25 @@ public void alter_whenException() {
assertEquals(10, ref.get());
}
+ @Test
+ public void alterAsync_whenException() {
+ IAtomicLong ref = client.getAtomicLong("alterAsync_whenException");
+ ref.set(10);
+
+ try {
+ ICompletableFuture future = ref.alterAsync(new FailingFunction());
+ future.get();
+ } catch (InterruptedException e) {
+ fail();
+ } catch (ExecutionException e) {
+ assertEquals(e.getCause().getClass(), UndefinedErrorCodeException.class);
+ assertEquals(((UndefinedErrorCodeException)e.getCause()).getOriginClassName(),
+ ExpectedRuntimeException.class.getName());
+ }
+
+ assertEquals(10, ref.get());
+ }
+
@Test
public void alter() {
IAtomicLong ref = client.getAtomicLong("alter");
@@ -138,6 +248,18 @@ public void alter() {
}
+ @Test
+ public void alterAsync()
+ throws ExecutionException, InterruptedException {
+ IAtomicLong ref = client.getAtomicLong("alterAsync");
+
+ ref.set(10);
+ ICompletableFuture future = ref.alterAsync(new AddOneFunction());
+ future.get();
+ assertEquals(11, ref.get());
+
+ }
+
@Test(expected = IllegalArgumentException.class)
public void alterAndGet_whenCalledWithNullFunction() {
IAtomicLong ref = client.getAtomicLong("alterAndGet_whenCalledWithNullFunction");
@@ -160,6 +282,25 @@ public void alterAndGet_whenException() {
assertEquals(10, ref.get());
}
+ @Test
+ public void alterAndGetAsync_whenException() {
+ IAtomicLong ref = client.getAtomicLong("alterAndGetAsync_whenException");
+ ref.set(10);
+
+ try {
+ ICompletableFuture future = ref.alterAndGetAsync(new FailingFunction());
+ future.get();
+ } catch (InterruptedException e) {
+ fail();
+ } catch (ExecutionException e) {
+ assertEquals(e.getCause().getClass(), UndefinedErrorCodeException.class);
+ assertEquals(((UndefinedErrorCodeException)e.getCause()).getOriginClassName(),
+ ExpectedRuntimeException.class.getName());
+ }
+
+ assertEquals(10, ref.get());
+ }
+
@Test
public void alterAndGet() {
IAtomicLong ref = client.getAtomicLong("alterAndGet");
@@ -169,6 +310,16 @@ public void alterAndGet() {
assertEquals(11, ref.get());
}
+ @Test
+ public void alterAndGetAsync() throws ExecutionException, InterruptedException {
+ IAtomicLong ref = client.getAtomicLong("alterAndGetAsync");
+
+ ICompletableFuture future = ref.setAsync(10);
+ future.get();
+ assertEquals(11, ref.alterAndGetAsync(new AddOneFunction()).get().longValue());
+ assertEquals(11, ref.get());
+ }
+
@Test(expected = IllegalArgumentException.class)
public void getAndAlter_whenCalledWithNullFunction() {
IAtomicLong ref = client.getAtomicLong("getAndAlter_whenCalledWithNullFunction");
@@ -191,6 +342,26 @@ public void getAndAlter_whenException() {
assertEquals(10, ref.get());
}
+ @Test
+ public void getAndAlterAsync_whenException() {
+ IAtomicLong ref = client.getAtomicLong("getAndAlterAsync_whenException");
+ ref.set(10);
+
+ try {
+ ICompletableFuture future = ref.getAndAlterAsync(new FailingFunction());
+ future.get();
+ fail();
+ } catch (InterruptedException e) {
+ assertEquals(e.getCause().getClass().getName(), UndefinedErrorCodeException.class.getName());
+ assertEquals(((UndefinedErrorCodeException)e.getCause()).getOriginClassName(), ExpectedRuntimeException.class.getName());
+ } catch (ExecutionException e) {
+ assertEquals(e.getCause().getClass().getName(), UndefinedErrorCodeException.class.getName());
+ assertEquals(((UndefinedErrorCodeException)e.getCause()).getOriginClassName(), ExpectedRuntimeException.class.getName());
+ }
+
+ assertEquals(10, ref.get());
+ }
+
@Test
public void getAndAlter() {
IAtomicLong ref = client.getAtomicLong("getAndAlter");
@@ -200,6 +371,17 @@ public void getAndAlter() {
assertEquals(11, ref.get());
}
+ @Test
+ public void getAndAlterAsync() throws ExecutionException, InterruptedException {
+ IAtomicLong ref = client.getAtomicLong("getAndAlterAsync");
+
+ ref.set(10);
+
+ ICompletableFuture future = ref.getAndAlterAsync(new AddOneFunction());
+ assertEquals(10, future.get().longValue());
+ assertEquals(11, ref.get());
+ }
+
private static class AddOneFunction implements IFunction {
@Override
public Long apply(Long input) {
@@ -207,6 +389,13 @@ public Long apply(Long input) {
}
}
+ private static class FilterOnesFunction implements IFunction {
+ @Override
+ public Boolean apply(Long input) {
+ return input.equals(1L);
+ }
+ }
+
private static class FailingFunction implements IFunction {
@Override
diff --git a/hazelcast/src/main/java/com/hazelcast/concurrent/atomiclong/AtomicLongProxy.java b/hazelcast/src/main/java/com/hazelcast/concurrent/atomiclong/AtomicLongProxy.java
index c117c0d6e70c..348b6dcb7be0 100644
--- a/hazelcast/src/main/java/com/hazelcast/concurrent/atomiclong/AtomicLongProxy.java
+++ b/hazelcast/src/main/java/com/hazelcast/concurrent/atomiclong/AtomicLongProxy.java
@@ -62,113 +62,157 @@ public String getServiceName() {
@Override
public long addAndGet(long delta) {
- return asyncAddAndGet(delta).join();
+ return addAndGetAsync(delta).join();
}
@Override
- public InternalCompletableFuture asyncAddAndGet(long delta) {
- Operation operation = new AddAndGetOperation(name, delta)
- .setPartitionId(partitionId);
+ public InternalCompletableFuture addAndGetAsync(long delta) {
+ Operation operation = new AddAndGetOperation(name, delta).setPartitionId(partitionId);
return invokeOnPartition(operation);
}
+ @Override
+ public InternalCompletableFuture asyncAddAndGet(long delta) {
+ return addAndGetAsync(delta);
+ }
+
@Override
public boolean compareAndSet(long expect, long update) {
- return asyncCompareAndSet(expect, update).join();
+ return compareAndSetAsync(expect, update).join();
}
@Override
- public InternalCompletableFuture asyncCompareAndSet(long expect, long update) {
+ public InternalCompletableFuture compareAndSetAsync(long expect, long update) {
Operation operation = new CompareAndSetOperation(name, expect, update)
.setPartitionId(partitionId);
return invokeOnPartition(operation);
}
+ @Override
+ public InternalCompletableFuture asyncCompareAndSet(long expect, long update) {
+ return compareAndSetAsync(expect, update);
+ }
+
@Override
public void set(long newValue) {
- asyncSet(newValue).join();
+ setAsync(newValue).join();
}
@Override
- public InternalCompletableFuture asyncSet(long newValue) {
+ public InternalCompletableFuture setAsync(long newValue) {
Operation operation = new SetOperation(name, newValue)
.setPartitionId(partitionId);
return invokeOnPartition(operation);
}
+ @Override
+ public InternalCompletableFuture asyncSet(long newValue) {
+ return setAsync(newValue);
+ }
+
@Override
public long getAndSet(long newValue) {
- return asyncGetAndSet(newValue).join();
+ return getAndSetAsync(newValue).join();
}
@Override
- public InternalCompletableFuture asyncGetAndSet(long newValue) {
+ public InternalCompletableFuture getAndSetAsync(long newValue) {
Operation operation = new GetAndSetOperation(name, newValue)
.setPartitionId(partitionId);
return invokeOnPartition(operation);
}
+ @Override
+ public InternalCompletableFuture asyncGetAndSet(long newValue) {
+ return getAndSetAsync(newValue);
+ }
+
@Override
public long getAndAdd(long delta) {
- return asyncGetAndAdd(delta).join();
+ return getAndAddAsync(delta).join();
}
@Override
- public InternalCompletableFuture asyncGetAndAdd(long delta) {
+ public InternalCompletableFuture getAndAddAsync(long delta) {
Operation operation = new GetAndAddOperation(name, delta)
.setPartitionId(partitionId);
return invokeOnPartition(operation);
}
+ @Override
+ public InternalCompletableFuture asyncGetAndAdd(long delta) {
+ return getAndAddAsync(delta);
+ }
+
@Override
public long decrementAndGet() {
- return asyncDecrementAndGet().join();
+ return decrementAndGetAsync().join();
+ }
+
+ @Override
+ public InternalCompletableFuture decrementAndGetAsync() {
+ return addAndGetAsync(-1);
}
@Override
public InternalCompletableFuture asyncDecrementAndGet() {
- return asyncAddAndGet(-1);
+ return addAndGetAsync(-1);
}
@Override
public long get() {
- return asyncGet().join();
+ return getAsync().join();
}
@Override
- public InternalCompletableFuture asyncGet() {
+ public InternalCompletableFuture getAsync() {
Operation operation = new GetOperation(name)
.setPartitionId(partitionId);
return invokeOnPartition(operation);
}
+ @Override
+ public InternalCompletableFuture asyncGet() {
+ return getAsync();
+ }
+
@Override
public long incrementAndGet() {
- return asyncIncrementAndGet().join();
+ return incrementAndGetAsync().join();
+ }
+
+ @Override
+ public InternalCompletableFuture incrementAndGetAsync() {
+ return addAndGetAsync(1);
}
@Override
public InternalCompletableFuture asyncIncrementAndGet() {
- return asyncAddAndGet(1);
+ return addAndGetAsync(1);
}
@Override
public long getAndIncrement() {
- return asyncGetAndIncrement().join();
+ return getAndIncrementAsync().join();
+ }
+
+ @Override
+ public InternalCompletableFuture getAndIncrementAsync() {
+ return getAndAddAsync(1);
}
@Override
public InternalCompletableFuture asyncGetAndIncrement() {
- return asyncGetAndAdd(1);
+ return getAndAddAsync(1);
}
@Override
public void alter(IFunction function) {
- asyncAlter(function).join();
+ alterAsync(function).join();
}
@Override
- public InternalCompletableFuture asyncAlter(IFunction function) {
+ public InternalCompletableFuture alterAsync(IFunction function) {
isNotNull(function, "function");
Operation operation = new AlterOperation(name, function)
@@ -176,13 +220,18 @@ public InternalCompletableFuture asyncAlter(IFunction function
return invokeOnPartition(operation);
}
+ @Override
+ public InternalCompletableFuture asyncAlter(IFunction function) {
+ return alterAsync(function);
+ }
+
@Override
public long alterAndGet(IFunction function) {
- return asyncAlterAndGet(function).join();
+ return alterAndGetAsync(function).join();
}
@Override
- public InternalCompletableFuture asyncAlterAndGet(IFunction function) {
+ public InternalCompletableFuture alterAndGetAsync(IFunction function) {
isNotNull(function, "function");
Operation operation = new AlterAndGetOperation(name, function)
@@ -190,13 +239,18 @@ public InternalCompletableFuture asyncAlterAndGet(IFunction fu
return invokeOnPartition(operation);
}
+ @Override
+ public InternalCompletableFuture asyncAlterAndGet(IFunction function) {
+ return alterAndGetAsync(function);
+ }
+
@Override
public long getAndAlter(IFunction function) {
- return asyncGetAndAlter(function).join();
+ return getAndAlterAsync(function).join();
}
@Override
- public InternalCompletableFuture asyncGetAndAlter(IFunction function) {
+ public InternalCompletableFuture getAndAlterAsync(IFunction function) {
isNotNull(function, "function");
Operation operation = new GetAndAlterOperation(name, function)
@@ -204,13 +258,18 @@ public InternalCompletableFuture asyncGetAndAlter(IFunction fu
return invokeOnPartition(operation);
}
+ @Override
+ public InternalCompletableFuture asyncGetAndAlter(IFunction function) {
+ return getAndAlterAsync(function);
+ }
+
@Override
public R apply(IFunction function) {
- return asyncApply(function).join();
+ return applyAsync(function).join();
}
@Override
- public InternalCompletableFuture asyncApply(IFunction function) {
+ public InternalCompletableFuture applyAsync(IFunction function) {
isNotNull(function, "function");
Operation operation = new ApplyOperation(name, function)
@@ -218,6 +277,11 @@ public InternalCompletableFuture asyncApply(IFunction function)
return invokeOnPartition(operation);
}
+ @Override
+ public InternalCompletableFuture asyncApply(IFunction function) {
+ return applyAsync(function);
+ }
+
@Override
public String toString() {
return "IAtomicLong{" + "name='" + name + '\'' + '}';
diff --git a/hazelcast/src/main/java/com/hazelcast/core/AsyncAtomicLong.java b/hazelcast/src/main/java/com/hazelcast/core/AsyncAtomicLong.java
index 231ed2a77504..5572c2e5fad6 100644
--- a/hazelcast/src/main/java/com/hazelcast/core/AsyncAtomicLong.java
+++ b/hazelcast/src/main/java/com/hazelcast/core/AsyncAtomicLong.java
@@ -26,6 +26,7 @@
* @since 3.2
*/
@Beta
+@Deprecated
public interface AsyncAtomicLong extends IAtomicLong {
/**
diff --git a/hazelcast/src/main/java/com/hazelcast/core/IAtomicLong.java b/hazelcast/src/main/java/com/hazelcast/core/IAtomicLong.java
index 058df7915f30..9cd26c707d61 100644
--- a/hazelcast/src/main/java/com/hazelcast/core/IAtomicLong.java
+++ b/hazelcast/src/main/java/com/hazelcast/core/IAtomicLong.java
@@ -20,6 +20,26 @@
* IAtomicLong is a redundant and highly available distributed alternative to the
* {@link java.util.concurrent.atomic.AtomicLong java.util.concurrent.atomic.AtomicLong}.
*
+ * Asynchronous variants of all methods have been introduced in version 3.7.
+ * Async methods return immediately an {@link ICompletableFuture} from which the operation's result
+ * can be obtained either in a blocking manner or by registering a callback to be executed
+ * upon completion. For example:
+ *
+ *
+ *
+ * ICompletableFuture<Long> future = atomicLong.addAndGetAsync(13);
+ * future.andThen(new ExecutionCallback<Long>() {
+ * void onResponse(Long response) {
+ * // do something with the result
+ * }
+ *
+ * void onFailure(Throwable t) {
+ * // handle failure
+ * }
+ * });
+ *
+ *
+ *
* @see IAtomicReference
*/
public interface IAtomicLong extends DistributedObject {
@@ -139,4 +159,181 @@ public interface IAtomicLong extends DistributedObject {
* @since 3.2
*/
R apply(IFunction function);
+
+ /**
+ * Atomically adds the given value to the current value.
+ * This method will dispatch a request and return immediately an {@link ICompletableFuture}.
+ * The operations result can be obtained in a blocking way, or a
+ * callback can be provided for execution upon completion, as demonstrated in the following examples:
+ *
+ *
+ * ICompletableFuture<Long> future = atomicLong.addAndGetAsync(13);
+ * // do something else, then read the result
+ * Long result = future.get(); // this method will block until the result is available
+ *
+ *
+ *
+ *
+ * ICompletableFuture<Long> future = atomicLong.addAndGetAsync(13);
+ * future.andThen(new ExecutionCallback<Long>() {
+ * void onResponse(Long response) {
+ * // do something with the result
+ * }
+ *
+ * void onFailure(Throwable t) {
+ * // handle failure
+ * }
+ * });
+ *
+ *
+ *
+ * @param delta the value to add
+ * @return an {@link ICompletableFuture} bearing the response
+ * @since 3.7
+ */
+ ICompletableFuture addAndGetAsync(long delta);
+
+ /**
+ * Atomically sets the value to the given updated value
+ * only if the current value {@code ==} the expected value.
+ * This method will dispatch a request and return immediately an {@link ICompletableFuture}.
+ *
+ * @param expect the expected value
+ * @param update the new value
+ * @return an {@link ICompletableFuture} with value true if successful; or false if the actual value
+ * was not equal to the expected value.
+ * @since 3.7
+ */
+ ICompletableFuture compareAndSetAsync(long expect, long update);
+
+ /**
+ * Atomically decrements the current value by one.
+ * This method will dispatch a request and return immediately an {@link ICompletableFuture}.
+ *
+ * @return an {@link ICompletableFuture} with the updated value.
+ * @since 3.7
+ */
+ ICompletableFuture decrementAndGetAsync();
+
+ /**
+ * Gets the current value. This method will dispatch a request and return immediately an {@link ICompletableFuture}.
+ *
+ * @return an {@link ICompletableFuture} with the current value
+ * @since 3.7
+ */
+ ICompletableFuture getAsync();
+
+ /**
+ * Atomically adds the given value to the current value.
+ * This method will dispatch a request and return immediately an {@link ICompletableFuture}.
+ *
+ * @param delta the value to add
+ * @return an {@link ICompletableFuture} with the old value before the addition
+ * @since 3.7
+ */
+ ICompletableFuture getAndAddAsync(long delta);
+
+ /**
+ * Atomically sets the given value and returns the old value.
+ * This method will dispatch a request and return immediately an {@link ICompletableFuture}.
+ *
+ * @param newValue the new value
+ * @return an {@link ICompletableFuture} with the old value
+ * @since 3.7
+ */
+ ICompletableFuture getAndSetAsync(long newValue);
+
+ /**
+ * Atomically increments the current value by one.
+ * This method will dispatch a request and return immediately an {@link ICompletableFuture}.
+ *
+ * @return an {@link ICompletableFuture} with the updated value
+ * @since 3.7
+ */
+ ICompletableFuture incrementAndGetAsync();
+
+ /**
+ * Atomically increments the current value by one.
+ * This method will dispatch a request and return immediately an {@link ICompletableFuture}.
+ *
+ * @return an {@link ICompletableFuture} with the old value
+ * @since 3.7
+ */
+ ICompletableFuture getAndIncrementAsync();
+
+ /**
+ * Atomically sets the given value.
+ * This method will dispatch a request and return immediately an {@link ICompletableFuture}.
+ *
+ * @param newValue the new value
+ * @return an {@link ICompletableFuture} API consumers can use to track execution of this request
+ * @since 3.7
+ */
+ ICompletableFuture setAsync(long newValue);
+
+ /**
+ * Alters the currently stored value by applying a function on it.
+ * This method will dispatch a request and return immediately an {@link ICompletableFuture}.
+ *
+ * @param function the function
+ * @throws IllegalArgumentException if function is null.
+ * @return an {@link ICompletableFuture} API consumers can use to track execution of this request
+ * @since 3.7
+ */
+ ICompletableFuture alterAsync(IFunction function);
+
+ /**
+ * Alters the currently stored value by applying a function on it and gets the result.
+ * This method will dispatch a request and return immediately an {@link ICompletableFuture}.
+ *
+ * @param function the function
+ * @return an {@link ICompletableFuture} with the new value.
+ * @throws IllegalArgumentException if function is null.
+ * @since 3.7
+ */
+ ICompletableFuture alterAndGetAsync(IFunction function);
+
+ /**
+ * Alters the currently stored value by applying a function on it on and gets the old value.
+ * This method will dispatch a request and return immediately an {@link ICompletableFuture}.
+ *
+ * @param function the function
+ * @return an {@link ICompletableFuture} with the old value
+ * @throws IllegalArgumentException if function is null.
+ * @since 3.7
+ */
+ ICompletableFuture getAndAlterAsync(IFunction function);
+
+ /**
+ * Applies a function on the value, the actual stored value will not change.
+ * This method will dispatch a request and return immediately an {@link ICompletableFuture}.
+ * Example:
+ *
+ *
+ * class IsOneFunction implements IFunction<Long, Boolean> {
+ * @Override
+ * public Boolean apply(Long input) {
+ * return input.equals(1L);
+ * }
+ * }
+ *
+ * ICompletableFuture future = atomicLong.applyAsync(new IsOneFunction());
+ * future.andThen(new ExecutionCallback<Boolean>() {
+ * void onResponse(Boolean response) {
+ * // do something with the response
+ * }
+ *
+ * void onFailure(Throwable t) {
+ * // handle failure
+ * }
+ * });
+ *
+ *
+ *
+ * @param function the function
+ * @return an {@link ICompletableFuture} with the result of the function application
+ * @throws IllegalArgumentException if function is null.
+ * @since 3.7
+ */
+ ICompletableFuture applyAsync(IFunction function);
}
diff --git a/hazelcast/src/main/java/com/hazelcast/core/ICompletableFuture.java b/hazelcast/src/main/java/com/hazelcast/core/ICompletableFuture.java
index c0afe8733651..fa4477f22f15 100644
--- a/hazelcast/src/main/java/com/hazelcast/core/ICompletableFuture.java
+++ b/hazelcast/src/main/java/com/hazelcast/core/ICompletableFuture.java
@@ -16,8 +16,6 @@
package com.hazelcast.core;
-import com.hazelcast.spi.annotation.Beta;
-
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
@@ -34,7 +32,6 @@
* @param
* @since 3.2
*/
-@Beta
public interface ICompletableFuture extends Future {
/**