diff --git a/hazelcast/proxy/map.py b/hazelcast/proxy/map.py index 3ac3560d43..011f7608d1 100644 --- a/hazelcast/proxy/map.py +++ b/hazelcast/proxy/map.py @@ -17,7 +17,8 @@ map_add_interceptor_codec, map_execute_on_all_keys_codec, map_execute_on_key_codec, map_execute_on_keys_codec, \ map_execute_with_predicate_codec, map_add_near_cache_invalidation_listener_codec, map_add_index_codec, \ map_set_ttl_codec, map_entries_with_paging_predicate_codec, map_key_set_with_paging_predicate_codec, \ - map_values_with_paging_predicate_codec + map_values_with_paging_predicate_codec, map_put_with_max_idle_codec, map_put_if_absent_with_max_idle_codec, \ + map_put_transient_with_max_idle_codec, map_set_with_max_idle_codec from hazelcast.proxy.base import Proxy, EntryEvent, EntryEventType, get_entry_listener_flags, MAX_SIZE from hazelcast.predicate import PagingPredicate from hazelcast.util import check_not_none, thread_id, to_millis, ImmutableLazyDataList, IterationType @@ -56,6 +57,7 @@ class Map(Proxy): This class does not allow ``None`` to be used as a key or value. """ + def __init__(self, service_name, name, context): super(Map, self).__init__(service_name, name, context) self._reference_id_generator = context.lock_reference_id_generator @@ -629,7 +631,7 @@ def load_all(self, keys=None, replace_existing_values=True): request = map_load_all_codec.encode_request(self.name, replace_existing_values) return self._invoke(request) - def lock(self, key, ttl=-1): + def lock(self, key, lease_time=None): """Acquires the lock for the specified key infinitely or for the specified lease time if provided. If the lock is not available, the current thread becomes disabled for thread scheduling purposes and lies @@ -651,7 +653,7 @@ def lock(self, key, ttl=-1): Args: key: The key to lock. - ttl (int): Time in seconds to wait before releasing the lock. + lease_time (int): Time in seconds to wait before releasing the lock. Returns: hazelcast.future.Future[None]: @@ -659,14 +661,14 @@ def lock(self, key, ttl=-1): check_not_none(key, "key can't be None") key_data = self._to_data(key) - request = map_lock_codec.encode_request(self.name, key_data, thread_id(), to_millis(ttl), + request = map_lock_codec.encode_request(self.name, key_data, thread_id(), to_millis(lease_time), self._reference_id_generator.get_and_increment()) partition_id = self._context.partition_service.get_partition_id(key_data) invocation = Invocation(request, partition_id=partition_id, timeout=MAX_SIZE) self._invocation_service.invoke(invocation) return invocation.future - def put(self, key, value, ttl=-1): + def put(self, key, value, ttl=None, max_idle=None): """Associates the specified value with the specified key in this map. If the map previously contained a mapping for the key, the old value is replaced by the specified value. @@ -683,8 +685,14 @@ def put(self, key, value, ttl=-1): Args: key: The specified key. value: The value to associate with the key. - ttl (int): Maximum time in seconds for this entry to stay, if not provided, - the value configured on server side configuration will be used. + ttl (int): Maximum time in seconds for this entry to stay in the map. + If not provided, the value configured on the server side + configuration will be used. Setting this to ``0`` means infinite + time-to-live. + max_idle (int): Maximum time in seconds for this entry to stay idle + in the map. If not provided, the value configured on the server + side configuration will be used. Setting this to ``0`` means + infinite max idle time. Returns: hazelcast.future.Future[any]: Previous value associated with key @@ -694,7 +702,7 @@ def put(self, key, value, ttl=-1): check_not_none(value, "value can't be None") key_data = self._to_data(key) value_data = self._to_data(value) - return self._put_internal(key_data, value_data, ttl) + return self._put_internal(key_data, value_data, ttl, max_idle) def put_all(self, map): """Copies all of the mappings from the specified map to this map. @@ -733,7 +741,7 @@ def put_all(self, map): return combine_futures(futures) - def put_if_absent(self, key, value, ttl=-1): + def put_if_absent(self, key, value, ttl=None, max_idle=None): """Associates the specified key with the given value if it is not already associated. If ttl is provided, entry will expire and get evicted after the ttl. @@ -756,8 +764,14 @@ def put_if_absent(self, key, value, ttl=-1): Args: key: Key of the entry. value: Value of the entry. - ttl (int): Maximum time in seconds for this entry to stay in the map, - if not provided, the value configured on server side configuration will be used. + ttl (int): Maximum time in seconds for this entry to stay in the map. + If not provided, the value configured on the server side + configuration will be used. Setting this to ``0`` means infinite + time-to-live. + max_idle (int): Maximum time in seconds for this entry to stay idle + in the map. If not provided, the value configured on the server + side configuration will be used. Setting this to ``0`` means + infinite max idle time. Returns: hazelcast.future.Future[any]: Old value of the entry. @@ -767,9 +781,9 @@ def put_if_absent(self, key, value, ttl=-1): key_data = self._to_data(key) value_data = self._to_data(value) - return self._put_if_absent_internal(key_data, value_data, ttl) + return self._put_if_absent_internal(key_data, value_data, ttl, max_idle) - def put_transient(self, key, value, ttl=-1): + def put_transient(self, key, value, ttl=None, max_idle=None): """Same as ``put``, but MapStore defined at the server side will not be called. Warning: @@ -779,8 +793,14 @@ def put_transient(self, key, value, ttl=-1): Args: key: Key of the entry. value: Value of the entry. - ttl (int): Maximum time in seconds for this entry to stay in the map, if not provided, - the value configured on server side configuration will be used. + ttl (int): Maximum time in seconds for this entry to stay in the map. + If not provided, the value configured on the server side + configuration will be used. Setting this to ``0`` means infinite + time-to-live. + max_idle (int): Maximum time in seconds for this entry to stay idle + in the map. If not provided, the value configured on the server + side configuration will be used. Setting this to ``0`` means + infinite max idle time. Returns: hazelcast.future.Future[None]: @@ -790,7 +810,7 @@ def put_transient(self, key, value, ttl=-1): key_data = self._to_data(key) value_data = self._to_data(value) - return self._put_transient_internal(key_data, value_data, ttl) + return self._put_transient_internal(key_data, value_data, ttl, max_idle) def remove(self, key): """Removes the mapping for a key from this map if it is present. @@ -921,7 +941,7 @@ def replace_if_same(self, key, old_value, new_value): return self._replace_if_same_internal(key_data, old_value_data, new_value_data) - def set(self, key, value, ttl=-1): + def set(self, key, value, ttl=None, max_idle=None): """Puts an entry into this map. Similar to the put operation except that set doesn't return the old value, which is more efficient. @@ -934,8 +954,14 @@ def set(self, key, value, ttl=-1): Args: key: Key of the entry. value: Value of the entry. - ttl (int): Maximum time in seconds for this entry to stay in the map, 0 means infinite. - If ttl is not provided, the value configured on server side configuration will be used. + ttl (int): Maximum time in seconds for this entry to stay in the map. + If not provided, the value configured on the server side + configuration will be used. Setting this to ``0`` means infinite + time-to-live. + max_idle (int): Maximum time in seconds for this entry to stay idle + in the map. If not provided, the value configured on the server + side configuration will be used. Setting this to ``0`` means + infinite max idle time. Returns: hazelcast.future.Future[None]: @@ -944,7 +970,7 @@ def set(self, key, value, ttl=-1): check_not_none(value, "value can't be None") key_data = self._to_data(key) value_data = self._to_data(value) - return self._set_internal(key_data, value_data, ttl) + return self._set_internal(key_data, value_data, ttl, max_idle) def set_ttl(self, key, ttl): """Updates the TTL (time to live) value of the entry specified by the given key with a new TTL value. @@ -955,8 +981,8 @@ def set_ttl(self, key, ttl): Args: key: The key of the map entry. - ttl (int): Maximum time for this entry to stay in the map (0 means infinite, - negative means map config default) + ttl (int): Maximum time in seconds for this entry to stay in the map. + Setting this to ``0`` means infinite time-to-live. Returns: hazelcast.future.Future[None]: @@ -975,23 +1001,23 @@ def size(self): request = map_size_codec.encode_request(self.name) return self._invoke(request, map_size_codec.decode_response) - def try_lock(self, key, ttl=-1, timeout=0): + def try_lock(self, key, lease_time=None, timeout=0): """Tries to acquire the lock for the specified key. When the lock is not available: - - If timeout is not provided, the current thread doesn't wait and returns ``false`` immediately. - - If a timeout is provided, the current thread becomes disabled for thread scheduling purposes and lies + - If the timeout is not provided, the current thread doesn't wait and returns ``False`` immediately. + - If the timeout is provided, the current thread becomes disabled for thread scheduling purposes and lies dormant until one of the followings happens: - - the lock is acquired by the current thread, or - - the specified waiting time elapses. + - The lock is acquired by the current thread, or + - The specified waiting time elapses. - If ttl is provided, lock will be released after this time elapses. + If the lease time is provided, lock will be released after this time elapses. Args: key: Key to lock in this map. - ttl (int): Time in seconds to wait before releasing the lock. + lease_time (int): Time in seconds to wait before releasing the lock. timeout (int): Maximum time in seconds to wait for the lock. Returns: @@ -1001,7 +1027,7 @@ def try_lock(self, key, ttl=-1, timeout=0): key_data = self._to_data(key) request = map_try_lock_codec.encode_request(self.name, key_data, thread_id(), - to_millis(ttl), to_millis(timeout), + to_millis(lease_time), to_millis(timeout), self._reference_id_generator.get_and_increment()) partition_id = self._context.partition_service.get_partition_id(key_data) invocation = Invocation(request, partition_id=partition_id, timeout=MAX_SIZE, @@ -1017,7 +1043,7 @@ def try_put(self, key, value, timeout=0): Args: key: Key of the entry. value: Value of the entry. - timeout (int): Operation timeout in seconds. + timeout (int): Maximum time in seconds to wait. Returns: hazelcast.future.Future[bool] ``True`` if the put is successful, ``False`` otherwise. @@ -1037,7 +1063,7 @@ def try_remove(self, key, timeout=0): Args: key: Key of the entry to be deleted. - timeout (int): Operation timeout in seconds. + timeout (int): Maximum time in seconds to wait. Returns: hazelcast.future.Future[bool]: ``True`` if the remove is successful, ``False`` otherwise. @@ -1154,15 +1180,23 @@ def _delete_internal(self, key_data): request = map_delete_codec.encode_request(self.name, key_data, thread_id()) return self._invoke_on_key(request, key_data) - def _put_internal(self, key_data, value_data, ttl): + def _put_internal(self, key_data, value_data, ttl, max_idle): def handler(message): return self._to_object(map_put_codec.decode_response(message)) - request = map_put_codec.encode_request(self.name, key_data, value_data, thread_id(), to_millis(ttl)) + if max_idle is not None: + request = map_put_with_max_idle_codec.encode_request(self.name, key_data, value_data, thread_id(), + to_millis(ttl), to_millis(max_idle)) + else: + request = map_put_codec.encode_request(self.name, key_data, value_data, thread_id(), to_millis(ttl)) return self._invoke_on_key(request, key_data, handler) - def _set_internal(self, key_data, value_data, ttl): - request = map_set_codec.encode_request(self.name, key_data, value_data, thread_id(), to_millis(ttl)) + def _set_internal(self, key_data, value_data, ttl, max_idle): + if max_idle is not None: + request = map_set_with_max_idle_codec.encode_request(self.name, key_data, value_data, thread_id(), + to_millis(ttl), to_millis(max_idle)) + else: + request = map_set_codec.encode_request(self.name, key_data, value_data, thread_id(), to_millis(ttl)) return self._invoke_on_key(request, key_data) def _set_ttl_internal(self, key_data, ttl): @@ -1177,15 +1211,25 @@ def _try_put_internal(self, key_data, value_data, timeout): request = map_try_put_codec.encode_request(self.name, key_data, value_data, thread_id(), to_millis(timeout)) return self._invoke_on_key(request, key_data, map_try_put_codec.decode_response) - def _put_transient_internal(self, key_data, value_data, ttl): - request = map_put_transient_codec.encode_request(self.name, key_data, value_data, thread_id(), to_millis(ttl)) + def _put_transient_internal(self, key_data, value_data, ttl, max_idle): + if max_idle is not None: + request = map_put_transient_with_max_idle_codec.encode_request(self.name, key_data, value_data, thread_id(), + to_millis(ttl), to_millis(max_idle)) + else: + request = map_put_transient_codec.encode_request(self.name, key_data, value_data, thread_id(), + to_millis(ttl)) return self._invoke_on_key(request, key_data) - def _put_if_absent_internal(self, key_data, value_data, ttl): + def _put_if_absent_internal(self, key_data, value_data, ttl, max_idle): def handler(message): return self._to_object(map_put_if_absent_codec.decode_response(message)) - request = map_put_if_absent_codec.encode_request(self.name, key_data, value_data, thread_id(), to_millis(ttl)) + if max_idle is not None: + request = map_put_if_absent_with_max_idle_codec.encode_request(self.name, key_data, value_data, thread_id(), + to_millis(ttl), to_millis(max_idle)) + else: + request = map_put_if_absent_codec.encode_request(self.name, key_data, value_data, thread_id(), + to_millis(ttl)) return self._invoke_on_key(request, key_data, handler) def _replace_if_same_internal(self, key_data, old_value_data, new_value_data): @@ -1320,9 +1364,9 @@ def _try_put_internal(self, key_data, value_data, timeout): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._try_put_internal(key_data, value_data, timeout) - def _set_internal(self, key_data, value_data, ttl): + def _set_internal(self, key_data, value_data, ttl, max_idle): self._invalidate_cache(key_data) - return super(MapFeatNearCache, self)._set_internal(key_data, value_data, ttl) + return super(MapFeatNearCache, self)._set_internal(key_data, value_data, ttl, max_idle) def _set_ttl_internal(self, key_data, ttl): self._invalidate_cache(key_data) @@ -1344,17 +1388,17 @@ def _remove_if_same_internal_(self, key_data, value_data): self._invalidate_cache(key_data) return super(MapFeatNearCache, self)._remove_if_same_internal_(key_data, value_data) - def _put_transient_internal(self, key_data, value_data, ttl): + def _put_transient_internal(self, key_data, value_data, ttl, max_idle): self._invalidate_cache(key_data) - return super(MapFeatNearCache, self)._put_transient_internal(key_data, value_data, ttl) + return super(MapFeatNearCache, self)._put_transient_internal(key_data, value_data, ttl, max_idle) - def _put_internal(self, key_data, value_data, ttl): + def _put_internal(self, key_data, value_data, ttl, max_idle): self._invalidate_cache(key_data) - return super(MapFeatNearCache, self)._put_internal(key_data, value_data, ttl) + return super(MapFeatNearCache, self)._put_internal(key_data, value_data, ttl, max_idle) - def _put_if_absent_internal(self, key_data, value_data, ttl): + def _put_if_absent_internal(self, key_data, value_data, ttl, max_idle): self._invalidate_cache(key_data) - return super(MapFeatNearCache, self)._put_if_absent_internal(key_data, value_data, ttl) + return super(MapFeatNearCache, self)._put_if_absent_internal(key_data, value_data, ttl, max_idle) def _load_all_internal(self, key_data_list, replace_existing_values): self._invalidate_cache_batch(key_data_list) diff --git a/hazelcast/proxy/multi_map.py b/hazelcast/proxy/multi_map.py index d473ff3364..f6d3503159 100644 --- a/hazelcast/proxy/multi_map.py +++ b/hazelcast/proxy/multi_map.py @@ -211,7 +211,7 @@ def handler(message): request = multi_map_key_set_codec.encode_request(self.name) return self._invoke(request, handler) - def lock(self, key, lease_time=-1): + def lock(self, key, lease_time=None): """Acquires the lock for the specified key infinitely or for the specified lease time if provided. If the lock is not available, the current thread becomes disabled for thread scheduling purposes and lies @@ -365,19 +365,19 @@ def handler(message): request = multi_map_values_codec.encode_request(self.name) return self._invoke(request, handler) - def try_lock(self, key, lease_time=-1, timeout=-1): + def try_lock(self, key, lease_time=None, timeout=0): """Tries to acquire the lock for the specified key. When the lock is not available: - - If timeout is not provided, the current thread doesn't wait and returns ``false`` immediately. - - If a timeout is provided, the current thread becomes disabled for thread scheduling purposes and lies + - If the timeout is not provided, the current thread doesn't wait and returns ``False`` immediately. + - If the timeout is provided, the current thread becomes disabled for thread scheduling purposes and lies dormant until one of the followings happens: - - the lock is acquired by the current thread, or - - the specified waiting time elapses. + - The lock is acquired by the current thread, or + - The specified waiting time elapses. - If lease_time is provided, lock will be released after this time elapses. + If the lease time is provided, lock will be released after this time elapses. Args: key: Key to lock in this map. diff --git a/hazelcast/proxy/queue.py b/hazelcast/proxy/queue.py index 868c1632c8..546b2fdc91 100644 --- a/hazelcast/proxy/queue.py +++ b/hazelcast/proxy/queue.py @@ -1,3 +1,4 @@ +from hazelcast.errors import IllegalStateError from hazelcast.protocol.codec import \ queue_add_all_codec, \ queue_add_listener_codec, \ @@ -22,14 +23,6 @@ from hazelcast.util import check_not_none, to_millis, ImmutableLazyDataList -class Empty(Exception): - pass - - -class Full(Exception): - pass - - class Queue(PartitionSpecificProxy): """Concurrent, blocking, distributed, observable queue. @@ -48,7 +41,7 @@ def add(self, item): def result_fnc(f): if f.result(): return True - raise Full("Queue is full!") + raise IllegalStateError("Queue is full!") return self.offer(item).continue_with(result_fnc) @@ -192,8 +185,8 @@ def offer(self, item, timeout=0): If there is no space currently available: - - If a timeout is provided, it waits until this timeout elapses and returns the result. - - If a timeout is not provided, returns ``False`` immediately. + - If the timeout is provided, it waits until this timeout elapses and returns the result. + - If the timeout is not provided, returns ``False`` immediately. Args: item: The item to be added. @@ -224,8 +217,8 @@ def poll(self, timeout=0): If this queue is empty: - - If a timeout is provided, it waits until this timeout elapses and returns the result. - - If a timeout is not provided, returns ``None``. + - If the timeout is provided, it waits until this timeout elapses and returns the result. + - If the timeout is not provided, returns ``None``. Args: timeout (int): Maximum time in seconds to wait for addition. diff --git a/hazelcast/proxy/transactional_map.py b/hazelcast/proxy/transactional_map.py index a7cd7cb6e6..c1aa1a1934 100644 --- a/hazelcast/proxy/transactional_map.py +++ b/hazelcast/proxy/transactional_map.py @@ -87,7 +87,7 @@ def is_empty(self): request = transactional_map_is_empty_codec.encode_request(self.name, self.transaction.id, thread_id()) return self._invoke(request, transactional_map_is_empty_codec.decode_response) - def put(self, key, value, ttl=-1): + def put(self, key, value, ttl=None): """Transactional implementation of :func:`Map.put(key, value, ttl) ` The object to be put will be accessible only in the current transaction context till the transaction is diff --git a/hazelcast/util.py b/hazelcast/util.py index 7d925198fa..5abf30c5e5 100644 --- a/hazelcast/util.py +++ b/hazelcast/util.py @@ -55,6 +55,8 @@ def thread_id(): def to_millis(seconds): + if seconds is None: + return -1 return int(seconds * MILLISECONDS_IN_SECONDS) diff --git a/tests/proxy/map_test.py b/tests/proxy/map_test.py index f3755a4241..4ff05ea49b 100644 --- a/tests/proxy/map_test.py +++ b/tests/proxy/map_test.py @@ -591,3 +591,105 @@ def assert_event(): self.assertEntryEvent(event, key='key', value='value', event_type=EntryEventType.LOADED) self.assertTrueEventually(assert_event, 10) + + +class MapTTLTest(SingleMemberTestCase): + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + return config + + def setUp(self): + self.map = self.client.get_map(random_string()).blocking() + + def tearDown(self): + self.map.destroy() + + def test_put_default_ttl(self): + self.map.put("key", "value") + time.sleep(1.0) + self.assertTrue(self.map.contains_key("key")) + + def test_put(self): + self.map.put("key", "value", 0.1) + self.assertTrueEventually(lambda: self.assertFalse(self.map.contains_key("key"))) + + def test_put_transient_default_ttl(self): + self.map.put_transient("key", "value") + time.sleep(1.0) + self.assertTrue(self.map.contains_key("key")) + + def test_put_transient(self): + self.map.put_transient("key", "value", 0.1) + self.assertTrueEventually(lambda: self.assertFalse(self.map.contains_key("key"))) + + def test_put_if_absent_ttl(self): + self.map.put_if_absent("key", "value") + time.sleep(1.0) + self.assertTrue(self.map.contains_key("key")) + + def test_put_if_absent(self): + self.map.put_if_absent("key", "value", 0.1) + self.assertTrueEventually(lambda: self.assertFalse(self.map.contains_key("key"))) + + def test_set_default_ttl(self): + self.map.set("key", "value") + time.sleep(1.0) + self.assertTrue(self.map.contains_key("key")) + + def test_set(self): + self.map.set("key", "value", 0.1) + self.assertTrueEventually(lambda: self.assertFalse(self.map.contains_key("key"))) + + +class MapMaxIdleTest(SingleMemberTestCase): + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + return config + + def setUp(self): + self.map = self.client.get_map(random_string()).blocking() + + def tearDown(self): + self.map.destroy() + + def test_put_default_max_idle(self): + self.map.put("key", "value") + time.sleep(1.0) + self.assertTrue(self.map.contains_key("key")) + + def test_put(self): + self.map.put("key", "value", max_idle=0.1) + time.sleep(1.0) + self.assertFalse(self.map.contains_key("key")) + + def test_put_transient_default_max_idle(self): + self.map.put_transient("key", "value") + time.sleep(1.0) + self.assertTrue(self.map.contains_key("key")) + + def test_put_transient(self): + self.map.put_transient("key", "value", max_idle=0.1) + time.sleep(1.0) + self.assertFalse(self.map.contains_key("key")) + + def test_put_if_absent_max_idle(self): + self.map.put_if_absent("key", "value") + time.sleep(1.0) + self.assertTrue(self.map.contains_key("key")) + + def test_put_if_absent(self): + self.map.put_if_absent("key", "value", max_idle=0.1) + time.sleep(1.0) + self.assertFalse(self.map.contains_key("key")) + + def test_set_default_ttl(self): + self.map.set("key", "value") + time.sleep(1.0) + self.assertTrue(self.map.contains_key("key")) + + def test_set(self): + self.map.set("key", "value", max_idle=0.1) + time.sleep(1.0) + self.assertFalse(self.map.contains_key("key")) diff --git a/tests/proxy/queue_test.py b/tests/proxy/queue_test.py index d8da2115f1..e33bb4e63a 100644 --- a/tests/proxy/queue_test.py +++ b/tests/proxy/queue_test.py @@ -1,7 +1,7 @@ import os +from hazelcast.errors import IllegalStateError from hazelcast.proxy.base import ItemEventType -from hazelcast.proxy.queue import Full from tests.base import SingleMemberTestCase from tests.util import random_string, event_collector from hazelcast import six @@ -97,7 +97,7 @@ def test_add(self): def test_add_full(self): _all = ["1", "2", "3", "4", "5", "6"] self.queue.add_all(_all) - with self.assertRaises(Full): + with self.assertRaises(IllegalStateError): self.queue.add("cannot add this one") def test_add_null_element(self):