Permalink
Browse files

Added tryPut with both ttl and timeout, added putAsync with ttl.

Already existing methods use only timeout field of the packet sending both ttl and timeout in it depending on method. Method tryPut with both ttl and timeout sends timeout as timeout and ttl as ttl in resulting packet, but parameters naming in methods are complete mess. I was trying to make less changes in the code and in packet data sent, to not break compatibility with 2.0.2 client versions. Master branch has more clean implementation

Set US locale when creating SimpleDateFormat for Date.toString parsing in Predicates. Else it will fail when there is non US default locale
  • Loading branch information...
Shohou committed Apr 3, 2012
1 parent d892195 commit a14543da392a7c838d8a4f4bb286a05ab3964638
@@ -277,6 +277,12 @@ public LocalMapStats getLocalMapStats() {
return proxyHelper.doAsync(ClusterOperation.CONCURRENT_MAP_PUT, key, value);
}
+ public Future<V> putAsync(K key, V value, long ttl, TimeUnit timeunit) {
+ check(key);
+ check(value);
+ return proxyHelper.doAsync(ClusterOperation.CONCURRENT_MAP_PUT, key, value, ttl, timeunit);
+ }
+
public Future<V> removeAsync(K key) {
check(key);
return proxyHelper.doAsync(ClusterOperation.CONCURRENT_MAP_REMOVE, key, null);
@@ -312,6 +318,13 @@ public boolean tryPut(K key, V value, long timeout, TimeUnit timeunit) {
return (Boolean) proxyHelper.doOp(ClusterOperation.CONCURRENT_MAP_TRY_PUT, key, value, timeout, timeunit);
}
+ public boolean tryPut(K key, V value, long ttl, TimeUnit ttlTimeunit, long timeout, TimeUnit timeunit) {
+ check(key);
+ check(value);
+ //FIXME: sending ttl as timeout and timeout as ttl for backward compatibility
+ return (Boolean) proxyHelper.doOpTtlTimeout(ClusterOperation.CONCURRENT_MAP_TRY_PUT, key, value, timeout, timeunit, ttl, ttlTimeunit);
+ }
+
public void putAll(final Map<? extends K, ? extends V> map) {
Pairs pairs = new Pairs(map.size());
for (final K key : map.keySet()) {
@@ -115,16 +115,20 @@ public int getCurrentThreadId() {
}
public Packet createRequestPacket(ClusterOperation operation, byte[] key, byte[] value) {
- return createRequestPacket(operation, key, value, 0, null);
+ return createRequestPacket(operation, key, value, 0, null, 0, null);
}
- private Packet createRequestPacket(ClusterOperation operation, byte[] key, byte[] value, long ttl, TimeUnit timeunit) {
+ private Packet createRequestPacket(ClusterOperation operation, byte[] key, byte[] value, long ttl, TimeUnit ttlTimeunit, long timeout, TimeUnit timeunit) {
Packet request = createRequestPacket();
request.setOperation(operation);
request.setKey(key);
request.setValue(value);
- if (ttl >= 0 && timeunit != null) {
- request.setTimeout(timeunit.toMillis(ttl));
+ //FIXME: sending timeout as ttl and ttl as timeout for backward compatibility
+ if (ttl >= 0 && ttlTimeunit != null) {
+ request.setTimeout(ttlTimeunit.toMillis(ttl));
+ }
+ if (timeout >= 0 && timeunit != null) {
+ request.setTtl(timeunit.toMillis(timeout));
}
return request;
}
@@ -136,6 +140,13 @@ private Packet createRequestPacket(ClusterOperation operation, byte[] key, byte[
return new AsyncClientCall<V>(remoteCall);
}
+ <V> Future<V> doAsync(final ClusterOperation operation, final Object key, final Object value, long ttl, TimeUnit timeunit) {
+ Packet request = prepareRequest(operation, key, value, ttl, timeunit);
+ Call remoteCall = createCall(request);
+ sendCall(remoteCall);
+ return new AsyncClientCall<V>(remoteCall);
+ }
+
protected Object doOp(ClusterOperation operation, Object key, Object value) {
return doOp(operation, key, value, 0, null);
}
@@ -146,14 +157,20 @@ public Object doOp(ClusterOperation operation, Object key, Object value, long tt
return getValue(response);
}
+ public Object doOpTtlTimeout(ClusterOperation operation, Object key, Object value, long ttl, TimeUnit ttlTimeunit, long timeout, TimeUnit timeunit) {
+ Packet request = prepareRequest(operation, key, value, ttl, ttlTimeunit, timeout, timeunit);
+ Packet response = callAndGetResult(request);
+ return getValue(response);
+ }
+
public void doFireAndForget(ClusterOperation operation, Object key, Object value) {
Packet request = prepareRequest(operation, key, value, 0, null);
Call fireNForgetCall = createCall(request);
fireNForgetCall.setFireNforget(true);
sendCall(fireNForgetCall);
}
- Packet prepareRequest(ClusterOperation operation, Object key, Object value, long ttl, TimeUnit timeunit) {
+ Packet prepareRequest(ClusterOperation operation, Object key, Object value, long ttl, TimeUnit ttlTimeunit, long timeout, TimeUnit timeunit) {
byte[] k = null;
byte[] v = null;
if (key != null) {
@@ -162,7 +179,7 @@ Packet prepareRequest(ClusterOperation operation, Object key, Object value, long
if (value != null) {
v = toByte(value);
}
- Packet packet = createRequestPacket(operation, k, v, ttl, timeunit);
+ Packet packet = createRequestPacket(operation, k, v, ttl, ttlTimeunit, timeout, timeunit);
if (key instanceof PartitionAware) {
Object partitionKey = ((PartitionAware) key).getPartitionKey();
if (partitionKey == null) throw new IllegalArgumentException("PartitionKey cannot be null!");
@@ -176,6 +193,10 @@ Packet prepareRequest(ClusterOperation operation, Object key, Object value, long
return packet;
}
+ Packet prepareRequest(ClusterOperation operation, Object key, Object value, long ttl, TimeUnit timeunit) {
+ return prepareRequest(operation, key, value, ttl, timeunit, -1, null);
+ }
+
Packet prepareRequest(ClusterOperation operation, Object key,
Object value) {
return prepareRequest(operation, key, value, 0, null);
@@ -292,6 +292,18 @@ public void tryPut() throws InterruptedException {
assertEquals(1, map.size());
}
+ @Test
+ public void tryPutWithTTL() throws InterruptedException {
+ HazelcastClient hClient = getHazelcastClient();
+ IMap<String, String> map = hClient.getMap("tryPut");
+ assertEquals(0, map.size());
+ Boolean result = map.tryPut("1", "CBDEF", 100, TimeUnit.MILLISECONDS, 1, TimeUnit.SECONDS);
+ assertTrue(result);
+ assertEquals(1, map.size());
+ Thread.sleep(300);
+ assertEquals(0, map.size());
+ }
+
@Test
public void putAndGetEmployeeObjects() {
HazelcastClient hClient = getHazelcastClient();
@@ -120,6 +120,38 @@
*/
Future<V> putAsync(K key, V value);
+ /**
+ * Asynchronously puts the given key and value with a given ttl (time to live) value.
+ * Entry will expire and get evicted after the ttl. If ttl is 0, then
+ * the entry lives forever.
+ * <code>
+ * Future future = map.putAsync(key, value);
+ * // do some other stuff, when ready get the result
+ * Object oldValue = future.get();
+ * </code>
+ * Future.get() will block until the actual map.get() completes.
+ * If the application requires timely response,
+ * then Future.get(timeout, timeunit) can be used.
+ * <code>
+ * try{
+ * Future future = map.putAsync(key, newValue);
+ * Object oldValue = future.get(40, TimeUnit.MILLISECOND);
+ * }catch (TimeoutException t) {
+ * // time wasn't enough
+ * }
+ * </code>
+ * ExecutionException is never thrown.
+ *
+ * @param key the key of the map entry
+ * @param value the new value of the map entry
+ * @param ttl maximum time for this entry to stay in the map
+ * 0 means infinite.
+ * @param timeunit time unit for the ttl
+ * @return Future from which the old value of the key can be retrieved.
+ * @see java.util.concurrent.Future
+ */
+ Future<V> putAsync(K key, V value, long ttl, TimeUnit timeunit);
+
/**
* Asynchronously removes the given key.
*
@@ -160,6 +192,26 @@
*/
boolean tryPut(K key, V value, long timeout, TimeUnit timeunit);
+ /**
+ * Tries to put the given key, value into this map within specified
+ * timeout value with a given ttl (time to live) value.
+ * Entry will expire and get evicted after the ttl. If ttl is 0, then
+ * the entry lives forever. If this method returns false, it means that
+ * the caller thread couldn't acquire the lock for the key within
+ * timeout duration, thus put operation is not successful.
+ *
+ * @param key key of the entry
+ * @param value value of the entry
+ * @param ttl maximum time for this entry to stay in the map
+ * 0 means infinite.
+ * @param ttlTimeunit time unit for the ttl
+ * @param timeout maximum time to wait
+ * @param timeunit time unit for the timeout
+ * @return <tt>true</tt> if the put is successful, <tt>false</tt>
+ * otherwise.
+ */
+ boolean tryPut(K key, V value, long ttl, TimeUnit ttlTimeunit, long timeout, TimeUnit timeunit);
+
/**
* Puts an entry into this map with a given ttl (time to live) value.
* Entry will expire and get evicted after the ttl. If ttl is 0, then
@@ -62,7 +62,6 @@ public ClientHandlerService(Node node) {
this.logger = node.getLogger(this.getClass().getName());
node.getClusterImpl().addMembershipListener(new ClientServiceMembershipListener());
registerHandler(CONCURRENT_MAP_PUT.getValue(), new MapPutHandler());
- registerHandler(CONCURRENT_MAP_PUT.getValue(), new MapPutHandler());
registerHandler(CONCURRENT_MAP_PUT_AND_UNLOCK.getValue(), new MapPutAndUnlockHandler());
registerHandler(CONCURRENT_MAP_PUT_ALL.getValue(), new MapPutAllHandler());
registerHandler(CONCURRENT_MAP_PUT_MULTI.getValue(), new MapPutMultiHandler());
@@ -958,15 +957,15 @@ protected Data processMapOp(IMap<Object, Object> map, Data key, Data value, long
}
}
- private class MapTryPutHandler extends ClientMapOperationHandlerWithTTL {
+ private class MapTryPutHandler extends ClientMapOperationHandlerWithTTLAndTimeout {
@Override
- protected Data processMapOp(IMap<Object, Object> map, Data key, Data value, long ttl) {
+ protected Data processMapOp(IMap<Object, Object> map, Data key, Data value, long ttl, long timeout) {
MProxy mproxy = (MProxy) map;
Object v = value;
if (node.concurrentMapManager.isMapIndexed(mproxy.getLongName())) {
v = toObject(value);
}
- return toData(map.tryPut(key, v, ttl, TimeUnit.MILLISECONDS));
+ return toData(map.tryPut(key, v, ttl, TimeUnit.MILLISECONDS, timeout, TimeUnit.MILLISECONDS));
}
}
@@ -1609,6 +1608,17 @@ public void processCall(Node node, final Packet packet) {
protected abstract Data processMapOp(IMap<Object, Object> map, Data keyData, Data valueData, long ttl);
}
+ private abstract class ClientMapOperationHandlerWithTTLAndTimeout extends ClientOperationHandler {
+ public void processCall(Node node, final Packet packet) {
+ final IMap<Object, Object> map = (IMap) factory.getOrCreateProxyByName(packet.name);
+ Data value = processMapOp(map, packet.getKeyData(), packet.getValueData(), packet.ttl, packet.timeout);
+ packet.clearForResponse();
+ packet.setValue(value);
+ }
+
+ protected abstract Data processMapOp(IMap<Object, Object> map, Data keyData, Data valueData, long ttl, long timeout);
+ }
+
private abstract class ClientQueueOperationHandler extends ClientOperationHandler {
public abstract Data processQueueOp(IQueue<Object> queue, Data key, Data value);
@@ -152,6 +152,21 @@ protected void call() {
return call;
}
+ public Future putAsync(Object key, Object value, final long ttl, final TimeUnit timeunit) {
+ beforeCall();
+ final MProxyImpl mProxy = MProxyImpl.this;
+ final Data dataKey = toData(key);
+ final Data dataValue = toData(value);
+ AsyncCall call = new AsyncCall() {
+ @Override
+ protected void call() {
+ setResult(mProxy.put(dataKey, dataValue, ttl, timeunit));
+ }
+ };
+ factory.node.executorManager.executeAsync(call);
+ return call;
+ }
+
public Future removeAsync(Object key) {
beforeCall();
final MProxyImpl mProxy = MProxyImpl.this;
@@ -243,6 +258,10 @@ public boolean tryPut(Object key, Object value, long time, TimeUnit timeunit) {
return dynamicProxy.tryPut(key, value, time, timeunit);
}
+ public boolean tryPut(Object key, Object value, long ttl, TimeUnit ttlTimeunit, long time, TimeUnit timeunit) {
+ return dynamicProxy.tryPut(key, value, ttl, ttlTimeunit, time, timeunit);
+ }
+
public void set(Object key, Object value, long time, TimeUnit timeunit) {
dynamicProxy.set(key, value, time, timeunit);
}
@@ -585,6 +604,10 @@ public Future putAsync(Object key, Object value) {
throw new UnsupportedOperationException();
}
+ public Future putAsync(Object key, Object value, long ttl, TimeUnit timeunit) {
+ throw new UnsupportedOperationException();
+ }
+
public Future removeAsync(Object key) {
throw new UnsupportedOperationException();
}
@@ -658,6 +681,26 @@ public boolean tryPut(Object key, Object value, long timeout, TimeUnit timeunit)
return result;
}
+ public boolean tryPut(Object key, Object value, long ttl, TimeUnit ttlTimeunit, long timeout, TimeUnit timeunit) {
+ long begin = System.currentTimeMillis();
+ if (timeout < 0) {
+ throw new IllegalArgumentException("timeout value cannot be negative. " + timeout);
+ }
+ timeout = toMillis(timeout, timeunit);
+ if (ttl == 0) {
+ ttl = -1;
+ } else {
+ ttl = toMillis(ttl, ttlTimeunit);
+ }
+ check(key);
+ check(value);
+ MPut mput = ThreadContext.get().getCallCache(factory).getMPut();
+ Boolean result = mput.tryPut(name, key, value, timeout, ttl);
+ mput.clearRequest();
+ mapOperationCounter.incrementPuts(System.currentTimeMillis() - begin);
+ return result;
+ }
+
public Object tryLockAndGet(Object key, long timeout, TimeUnit timeunit) throws TimeoutException {
long begin = System.currentTimeMillis();
if (timeout < 0) {
@@ -1087,6 +1087,7 @@ private static DateFormat getSqlDateFormat() {
}
private static DateFormat getUtilDateFormat() {
- return new SimpleDateFormat(dateFormat);
+ //PredicatesTest.testEqual throws exceptions if not US locale
+ return new SimpleDateFormat(dateFormat, Locale.US);
}
}
@@ -61,6 +61,22 @@ public void testPutAsync() throws Exception {
TestCase.assertEquals(value1, f2Val);
}
+ @Test
+ public void testPutAsyncWithTTL() throws Exception {
+ IMap<String, String> map = Hazelcast.getMap("map:test:putAsyncTTL");
+ Future<String> f1 = map.putAsync(key, value1);
+ String f1Val = f1.get();
+ TestCase.assertNull(f1Val);
+ Future<String> f2 = map.putAsync(key, value2, 100, TimeUnit.MILLISECONDS);
+ String f2Val = f2.get();
+ TestCase.assertEquals(value1, f2Val);
+ String val = map.get(key);
+ TestCase.assertEquals(value2, val);
+ Thread.sleep(200);
+ val = map.get(key);
+ TestCase.assertNull(val);
+ }
+
@Test
public void testRemoveAsync() throws Exception {
IMap<String, String> map = Hazelcast.getMap("map:test:removeAsync");

0 comments on commit a14543d

Please sign in to comment.