Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 92 additions & 48 deletions hazelcast/proxy/map.py

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions hazelcast/proxy/multi_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def handler(message):
request = multi_map_key_set_codec.encode_request(self.name)
return self._invoke(request, handler)

def lock(self, key, lease_time=-1):
def lock(self, key, lease_time=None):
"""Acquires the lock for the specified key infinitely or for the specified lease time if provided.

If the lock is not available, the current thread becomes disabled for thread scheduling purposes and lies
Expand Down Expand Up @@ -365,19 +365,19 @@ def handler(message):
request = multi_map_values_codec.encode_request(self.name)
return self._invoke(request, handler)

def try_lock(self, key, lease_time=-1, timeout=-1):
def try_lock(self, key, lease_time=None, timeout=0):
"""Tries to acquire the lock for the specified key.

When the lock is not available:

- If timeout is not provided, the current thread doesn't wait and returns ``false`` immediately.
- If a timeout is provided, the current thread becomes disabled for thread scheduling purposes and lies
- If the timeout is not provided, the current thread doesn't wait and returns ``False`` immediately.
- If the timeout is provided, the current thread becomes disabled for thread scheduling purposes and lies
dormant until one of the followings happens:

- the lock is acquired by the current thread, or
- the specified waiting time elapses.
- The lock is acquired by the current thread, or
- The specified waiting time elapses.

If lease_time is provided, lock will be released after this time elapses.
If the lease time is provided, lock will be released after this time elapses.

Args:
key: Key to lock in this map.
Expand Down
19 changes: 6 additions & 13 deletions hazelcast/proxy/queue.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from hazelcast.errors import IllegalStateError
from hazelcast.protocol.codec import \
queue_add_all_codec, \
queue_add_listener_codec, \
Expand All @@ -22,14 +23,6 @@
from hazelcast.util import check_not_none, to_millis, ImmutableLazyDataList


class Empty(Exception):
pass


class Full(Exception):
pass


class Queue(PartitionSpecificProxy):
"""Concurrent, blocking, distributed, observable queue.

Expand All @@ -48,7 +41,7 @@ def add(self, item):
def result_fnc(f):
if f.result():
return True
raise Full("Queue is full!")
raise IllegalStateError("Queue is full!")

return self.offer(item).continue_with(result_fnc)

Expand Down Expand Up @@ -192,8 +185,8 @@ def offer(self, item, timeout=0):

If there is no space currently available:

- If a timeout is provided, it waits until this timeout elapses and returns the result.
- If a timeout is not provided, returns ``False`` immediately.
- If the timeout is provided, it waits until this timeout elapses and returns the result.
- If the timeout is not provided, returns ``False`` immediately.

Args:
item: The item to be added.
Expand Down Expand Up @@ -224,8 +217,8 @@ def poll(self, timeout=0):

If this queue is empty:

- If a timeout is provided, it waits until this timeout elapses and returns the result.
- If a timeout is not provided, returns ``None``.
- If the timeout is provided, it waits until this timeout elapses and returns the result.
- If the timeout is not provided, returns ``None``.

Args:
timeout (int): Maximum time in seconds to wait for addition.
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/proxy/transactional_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def is_empty(self):
request = transactional_map_is_empty_codec.encode_request(self.name, self.transaction.id, thread_id())
return self._invoke(request, transactional_map_is_empty_codec.decode_response)

def put(self, key, value, ttl=-1):
def put(self, key, value, ttl=None):
"""Transactional implementation of :func:`Map.put(key, value, ttl) <hazelcast.proxy.map.Map.put>`

The object to be put will be accessible only in the current transaction context till the transaction is
Expand Down
2 changes: 2 additions & 0 deletions hazelcast/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def thread_id():


def to_millis(seconds):
if seconds is None:
return -1
return int(seconds * MILLISECONDS_IN_SECONDS)


Expand Down
102 changes: 102 additions & 0 deletions tests/proxy/map_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,3 +591,105 @@ def assert_event():
self.assertEntryEvent(event, key='key', value='value', event_type=EntryEventType.LOADED)

self.assertTrueEventually(assert_event, 10)


class MapTTLTest(SingleMemberTestCase):
@classmethod
def configure_client(cls, config):
config["cluster_name"] = cls.cluster.id
return config

def setUp(self):
self.map = self.client.get_map(random_string()).blocking()

def tearDown(self):
self.map.destroy()

def test_put_default_ttl(self):
self.map.put("key", "value")
time.sleep(1.0)
self.assertTrue(self.map.contains_key("key"))

def test_put(self):
self.map.put("key", "value", 0.1)
self.assertTrueEventually(lambda: self.assertFalse(self.map.contains_key("key")))

def test_put_transient_default_ttl(self):
self.map.put_transient("key", "value")
time.sleep(1.0)
self.assertTrue(self.map.contains_key("key"))

def test_put_transient(self):
self.map.put_transient("key", "value", 0.1)
self.assertTrueEventually(lambda: self.assertFalse(self.map.contains_key("key")))

def test_put_if_absent_ttl(self):
self.map.put_if_absent("key", "value")
time.sleep(1.0)
self.assertTrue(self.map.contains_key("key"))

def test_put_if_absent(self):
self.map.put_if_absent("key", "value", 0.1)
self.assertTrueEventually(lambda: self.assertFalse(self.map.contains_key("key")))

def test_set_default_ttl(self):
self.map.set("key", "value")
time.sleep(1.0)
self.assertTrue(self.map.contains_key("key"))

def test_set(self):
self.map.set("key", "value", 0.1)
self.assertTrueEventually(lambda: self.assertFalse(self.map.contains_key("key")))


class MapMaxIdleTest(SingleMemberTestCase):
@classmethod
def configure_client(cls, config):
config["cluster_name"] = cls.cluster.id
return config

def setUp(self):
self.map = self.client.get_map(random_string()).blocking()

def tearDown(self):
self.map.destroy()

def test_put_default_max_idle(self):
self.map.put("key", "value")
time.sleep(1.0)
self.assertTrue(self.map.contains_key("key"))

def test_put(self):
self.map.put("key", "value", max_idle=0.1)
time.sleep(1.0)
self.assertFalse(self.map.contains_key("key"))

def test_put_transient_default_max_idle(self):
self.map.put_transient("key", "value")
time.sleep(1.0)
self.assertTrue(self.map.contains_key("key"))

def test_put_transient(self):
self.map.put_transient("key", "value", max_idle=0.1)
time.sleep(1.0)
self.assertFalse(self.map.contains_key("key"))

def test_put_if_absent_max_idle(self):
self.map.put_if_absent("key", "value")
time.sleep(1.0)
self.assertTrue(self.map.contains_key("key"))

def test_put_if_absent(self):
self.map.put_if_absent("key", "value", max_idle=0.1)
time.sleep(1.0)
self.assertFalse(self.map.contains_key("key"))

def test_set_default_ttl(self):
self.map.set("key", "value")
time.sleep(1.0)
self.assertTrue(self.map.contains_key("key"))

def test_set(self):
self.map.set("key", "value", max_idle=0.1)
time.sleep(1.0)
self.assertFalse(self.map.contains_key("key"))
4 changes: 2 additions & 2 deletions tests/proxy/queue_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os

from hazelcast.errors import IllegalStateError
from hazelcast.proxy.base import ItemEventType
from hazelcast.proxy.queue import Full
from tests.base import SingleMemberTestCase
from tests.util import random_string, event_collector
from hazelcast import six
Expand Down Expand Up @@ -97,7 +97,7 @@ def test_add(self):
def test_add_full(self):
_all = ["1", "2", "3", "4", "5", "6"]
self.queue.add_all(_all)
with self.assertRaises(Full):
with self.assertRaises(IllegalStateError):
self.queue.add("cannot add this one")

def test_add_null_element(self):
Expand Down