From 6d00bbf22adf170e4d7525e3027ff028037d6e17 Mon Sep 17 00:00:00 2001 From: Metin Dumandag Date: Wed, 11 Mar 2020 21:28:15 +0300 Subject: [PATCH 1/3] implement distributed object listener and get_distributed_objects method --- hazelcast/client.py | 30 ++++++++ hazelcast/proxy/__init__.py | 41 +++++++++-- hazelcast/proxy/base.py | 16 +++++ tests/base.py | 5 ++ tests/proxy/distributed_objects_test.py | 96 +++++++++++++++++++++++++ 5 files changed, 181 insertions(+), 7 deletions(-) create mode 100644 tests/proxy/distributed_objects_test.py diff --git a/hazelcast/client.py b/hazelcast/client.py index 4acea4bc2c..08292e25cf 100644 --- a/hazelcast/client.py +++ b/hazelcast/client.py @@ -10,6 +10,7 @@ from hazelcast.listener import ListenerService from hazelcast.lifecycle import LifecycleService, LIFECYCLE_STATE_SHUTTING_DOWN, LIFECYCLE_STATE_SHUTDOWN from hazelcast.partition import PartitionService +from hazelcast.protocol.codec import client_get_distributed_objects_codec from hazelcast.proxy import ProxyManager, MAP_SERVICE, QUEUE_SERVICE, LIST_SERVICE, SET_SERVICE, MULTI_MAP_SERVICE, \ REPLICATED_MAP_SERVICE, ATOMIC_LONG_SERVICE, ATOMIC_REFERENCE_SERVICE, RINGBUFFER_SERVICE, COUNT_DOWN_LATCH_SERVICE, \ TOPIC_SERVICE, RELIABLE_TOPIC_SERVICE, SEMAPHORE_SERVICE, LOCK_SERVICE, ID_GENERATOR_SERVICE, \ @@ -251,6 +252,35 @@ def new_transaction(self, timeout=120, durability=1, type=TWO_PHASE): """ return self.transaction_manager.new_transaction(timeout, durability, type) + def add_distributed_object_listener(self, listener_func): + """ + Adds a listener which will be notified when a + new distributed object is created or destroyed. + :param listener_func: Function to be called when a distributed object is created or destroyed. + :return: (str), a registration id which is used as a key to remove the listener. + """ + return self.proxy.add_distributed_object_listener(listener_func) + + def remove_distributed_object_listener(self, registration_id): + """ + Removes the specified distributed object listener. Returns silently if there is no such listener added before. + :param registration_id: (str), id of registered listener. + :return: (bool), ``true`` if registration is removed, ``false`` otherwise. + """ + return self.proxy.remove_distributed_object_listener(registration_id) + + def get_distributed_objects(self): + """ + Returns all distributed objects such as; queue, map, set, list, topic, lock, multimap. + :return:(Sequence), List of instances created by Hazelcast. + """ + request = client_get_distributed_objects_codec.encode_request() + to_object = self.serialization_service.to_object + future = self.invoker.invoke_on_random_target(request) + response = client_get_distributed_objects_codec.decode_response(future.result(), to_object)["response"] + + return [self.proxy.get_or_create(doi.service_name, doi.name, create_on_remote=False) for doi in response] + def shutdown(self): """ Shuts down this HazelcastClient. diff --git a/hazelcast/proxy/__init__.py b/hazelcast/proxy/__init__.py index d420fe909b..d75b965e4a 100644 --- a/hazelcast/proxy/__init__.py +++ b/hazelcast/proxy/__init__.py @@ -1,6 +1,8 @@ -from hazelcast.protocol.codec import client_create_proxy_codec, client_destroy_proxy_codec +from hazelcast.protocol.codec import client_create_proxy_codec, client_destroy_proxy_codec, \ + client_add_distributed_object_listener_codec, client_remove_distributed_object_listener_codec from hazelcast.proxy.atomic_long import AtomicLong from hazelcast.proxy.atomic_reference import AtomicReference +from hazelcast.proxy.base import DistributedObjectEvent from hazelcast.proxy.count_down_latch import CountDownLatch from hazelcast.proxy.executor import Executor from hazelcast.proxy.id_generator import IdGenerator @@ -66,19 +68,21 @@ def __init__(self, client): self._client = client self._proxies = {} - def get_or_create(self, service_name, name, **kwargs): + def get_or_create(self, service_name, name, create_on_remote=True, **kwargs): ns = (service_name, name) if ns in self._proxies: return self._proxies[ns] - proxy = self.create_proxy(service_name, name, **kwargs) + proxy = self.create_proxy(service_name, name, create_on_remote, **kwargs) self._proxies[ns] = proxy return proxy - def create_proxy(self, service_name, name, **kwargs): - message = client_create_proxy_codec.encode_request(name=name, service_name=service_name, - target=self._find_next_proxy_address()) - self._client.invoker.invoke_on_random_target(message).result() + def create_proxy(self, service_name, name, create_on_remote, **kwargs): + if create_on_remote: + message = client_create_proxy_codec.encode_request(name=name, service_name=service_name, + target=self._find_next_proxy_address()) + self._client.invoker.invoke_on_random_target(message).result() + return _proxy_init[service_name](client=self._client, service_name=service_name, name=name, **kwargs) def destroy_proxy(self, service_name, name): @@ -91,6 +95,29 @@ def destroy_proxy(self, service_name, name): except KeyError: return False + def add_distributed_object_listener(self, listener_func): + is_smart = self._client.config.network_config.smart_routing + request = client_add_distributed_object_listener_codec.encode_request(is_smart) + + def handle_distributed_object_event(**kwargs): + event = DistributedObjectEvent(**kwargs) + listener_func(event) + + def event_handler(client_message): + return client_add_distributed_object_listener_codec.handle(client_message, handle_distributed_object_event) + + def decode_add_listener(response): + return client_add_distributed_object_listener_codec.decode_response(response)["response"] + + def encode_remove_listener(registration_id): + return client_remove_distributed_object_listener_codec.encode_request(registration_id) + + return self._client.listener.register_listener(request, decode_add_listener, + encode_remove_listener, event_handler) + + def remove_distributed_object_listener(self, registration_id): + return self._client.listener.deregister_listener(registration_id) + def _find_next_proxy_address(self): # TODO: filter out lite members return self._client.load_balancer.next_address() diff --git a/hazelcast/proxy/base.py b/hazelcast/proxy/base.py index b521037fee..1a585bce38 100644 --- a/hazelcast/proxy/base.py +++ b/hazelcast/proxy/base.py @@ -199,3 +199,19 @@ def get_entry_listener_flags(**kwargs): if value: flags |= getattr(EntryEventType, key) return flags + + +class DistributedObjectEvent(object): + """ + Distributed Object Event + """ + + def __init__(self, name, service_name, event_type): + self.name = name + self.service_name = service_name + self.event_type = event_type + + def __repr__(self): + return "DistributedObjectEvent[name={}, " \ + "service_name={}, " \ + "event_type={}]".format(self.name, self.service_name, self.event_type) diff --git a/tests/base.py b/tests/base.py index 358237bd48..c7fdc7592a 100644 --- a/tests/base.py +++ b/tests/base.py @@ -77,6 +77,11 @@ def assertEntryEvent(self, event, event_type, key=None, value=None, old_value=No self.assertEqual(event.old_value, old_value) self.assertEqual(event.number_of_affected_entries, number_of_affected_entries) + def assertDistributedObjectEvent(self, event, name, service_name, event_type): + self.assertEqual(name, event.name) + self.assertEqual(service_name, event.service_name) + self.assertEqual(event_type, event.event_type) + def set_logging_level(self, level): logging.getLogger().setLevel(level) diff --git a/tests/proxy/distributed_objects_test.py b/tests/proxy/distributed_objects_test.py new file mode 100644 index 0000000000..ff2bde7bba --- /dev/null +++ b/tests/proxy/distributed_objects_test.py @@ -0,0 +1,96 @@ +import hazelcast + +from hazelcast.proxy import MAP_SERVICE +from tests.base import SingleMemberTestCase +from tests.util import event_collector +from hazelcast import six + + +class DistributedObjectsTest(SingleMemberTestCase): + @classmethod + def setUpClass(cls): + cls.rc = cls.create_rc() + cls.cluster = cls.create_cluster(cls.rc, cls.configure_cluster()) + + @classmethod + def tearDownClass(cls): + cls.rc.exit() + + def setUp(self): + self.member = self.cluster.start_member() + self.client = hazelcast.HazelcastClient() + + def tearDown(self): + self.client.shutdown() + self.member.shutdown() + + def test_get_distributed_objects(self): + six.assertCountEqual(self, [], self.client.get_distributed_objects()) + + m = self.client.get_map("map") + s = self.client.get_set("set") + q = self.client.get_queue("queue") + + six.assertCountEqual(self, [m, s, q], self.client.get_distributed_objects()) + + def test_add_distributed_object_listener_object_created(self): + collector = event_collector() + self.client.add_distributed_object_listener(listener_func=collector) + + self.client.get_map("test-map") + + def assert_event(): + self.assertEqual(1, len(collector.events)) + event = collector.events[0] + self.assertDistributedObjectEvent(event, "test-map", MAP_SERVICE, "CREATED") + + self.assertTrueEventually(assert_event) + + def test_add_distributed_object_listener_object_destroyed(self): + collector = event_collector() + m = self.client.get_map("test-map") + self.client.add_distributed_object_listener(listener_func=collector) + + m.destroy() + + def assert_event(): + self.assertEqual(1, len(collector.events)) + event = collector.events[0] + self.assertDistributedObjectEvent(event, "test-map", MAP_SERVICE, "DESTROYED") + + self.assertTrueEventually(assert_event) + + def test_add_distributed_object_listener_object_created_and_destroyed(self): + collector = event_collector() + self.client.add_distributed_object_listener(listener_func=collector) + + m = self.client.get_map("test-map") + m.destroy() + + def assert_event(): + self.assertEqual(2, len(collector.events)) + created_event = collector.events[0] + destroyed_event = collector.events[1] + self.assertDistributedObjectEvent(created_event, "test-map", MAP_SERVICE, "CREATED") + self.assertDistributedObjectEvent(destroyed_event, "test-map", MAP_SERVICE, "DESTROYED") + + self.assertTrueEventually(assert_event) + + def test_remove_distributed_object_listener(self): + collector = event_collector() + reg_id = self.client.add_distributed_object_listener(listener_func=collector) + m = self.client.get_map("test-map") + + response = self.client.remove_distributed_object_listener(reg_id) + self.assertTrue(response) + m.destroy() + + # only map creation should be notified + def assert_event(): + self.assertEqual(1, len(collector.events)) + event = collector.events[0] + self.assertDistributedObjectEvent(event, "test-map", MAP_SERVICE, "CREATED") + self.assertTrueEventually(assert_event) + + def test_remove_invalid_distributed_object_listener(self): + self.assertFalse(self.client.remove_distributed_object_listener("invalid-reg-id")) From 771e0155f1387dac8afeee262f02ab38ac85f9b3 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Thu, 12 Mar 2020 16:45:10 +0300 Subject: [PATCH 2/3] clear destoyed proxies on get_distributed_objects --- hazelcast/client.py | 16 +++++++++++- hazelcast/core.py | 34 +++++++++++++++++++++++++ hazelcast/proxy/__init__.py | 13 +++++++--- hazelcast/proxy/base.py | 15 ----------- hazelcast/util.py | 4 +++ tests/proxy/distributed_objects_test.py | 24 +++++++++++++---- 6 files changed, 81 insertions(+), 25 deletions(-) diff --git a/hazelcast/client.py b/hazelcast/client.py index 08292e25cf..d7a0cff47c 100644 --- a/hazelcast/client.py +++ b/hazelcast/client.py @@ -6,6 +6,7 @@ from hazelcast.cluster import ClusterService, RandomLoadBalancer from hazelcast.config import ClientConfig, ClientProperties from hazelcast.connection import ConnectionManager, Heartbeat, DefaultAddressProvider, DefaultAddressTranslator +from hazelcast.core import DistributedObjectInfo from hazelcast.invocation import InvocationService from hazelcast.listener import ListenerService from hazelcast.lifecycle import LifecycleService, LIFECYCLE_STATE_SHUTTING_DOWN, LIFECYCLE_STATE_SHUTDOWN @@ -272,6 +273,7 @@ def remove_distributed_object_listener(self, registration_id): def get_distributed_objects(self): """ Returns all distributed objects such as; queue, map, set, list, topic, lock, multimap. + Also, as a side effect, it clears the local instances of the destroyed proxies. :return:(Sequence), List of instances created by Hazelcast. """ request = client_get_distributed_objects_codec.encode_request() @@ -279,7 +281,19 @@ def get_distributed_objects(self): future = self.invoker.invoke_on_random_target(request) response = client_get_distributed_objects_codec.decode_response(future.result(), to_object)["response"] - return [self.proxy.get_or_create(doi.service_name, doi.name, create_on_remote=False) for doi in response] + distributed_objects = self.proxy.get_distributed_objects() + local_distributed_object_infos = set() + for dist_obj in distributed_objects: + local_distributed_object_infos.add(DistributedObjectInfo(dist_obj.name, dist_obj.service_name)) + + for dist_obj_info in response: + local_distributed_object_infos.discard(dist_obj_info) + self.proxy.get_or_create(dist_obj_info.service_name, dist_obj_info.name, create_on_remote=False) + + for dist_obj_info in local_distributed_object_infos: + self.proxy.destroy_proxy(dist_obj_info.service_name, dist_obj_info.name, destroy_on_remote=False) + + return self.proxy.get_distributed_objects() def shutdown(self): """ diff --git a/hazelcast/core.py b/hazelcast/core.py index 5fd1e04590..9c38069dfe 100644 --- a/hazelcast/core.py +++ b/hazelcast/core.py @@ -3,6 +3,7 @@ from hazelcast import six from hazelcast import util +from hazelcast.util import enum class Member(object): @@ -55,6 +56,39 @@ def __init__(self, name, service_name): def __repr__(self): return "DistributedObjectInfo(name={}, serviceName={})".format(self.name, self.service_name) + def __hash__(self): + return hash((self.name, self.service_name)) + + def __eq__(self, other): + if isinstance(other, DistributedObjectInfo): + return self.name == other.name and self.service_name == other.service_name + return False + + +DistributedObjectEventType = enum(CREATED="CREATED", DESTROYED="DESTROYED") +""" +Type of the distributed object event. + +* CREATED : DistributedObject is created. +* DESTROYED: DistributedObject is destroyed. +""" + + +class DistributedObjectEvent(object): + """ + Distributed Object Event + """ + + def __init__(self, name, service_name, event_type): + self.name = name + self.service_name = service_name + self.event_type = DistributedObjectEventType.reverse.get(event_type, None) + + def __repr__(self): + return "DistributedObjectEvent[name={}, " \ + "service_name={}, " \ + "event_type={}]".format(self.name, self.service_name, self.event_type) + class EntryView(object): """ diff --git a/hazelcast/proxy/__init__.py b/hazelcast/proxy/__init__.py index d75b965e4a..030cbaa245 100644 --- a/hazelcast/proxy/__init__.py +++ b/hazelcast/proxy/__init__.py @@ -1,8 +1,8 @@ +from hazelcast.core import DistributedObjectEvent from hazelcast.protocol.codec import client_create_proxy_codec, client_destroy_proxy_codec, \ client_add_distributed_object_listener_codec, client_remove_distributed_object_listener_codec from hazelcast.proxy.atomic_long import AtomicLong from hazelcast.proxy.atomic_reference import AtomicReference -from hazelcast.proxy.base import DistributedObjectEvent from hazelcast.proxy.count_down_latch import CountDownLatch from hazelcast.proxy.executor import Executor from hazelcast.proxy.id_generator import IdGenerator @@ -19,6 +19,7 @@ from hazelcast.proxy.topic import Topic from hazelcast.proxy.pn_counter import PNCounter from hazelcast.proxy.flake_id_generator import FlakeIdGenerator +from hazelcast.util import to_list ATOMIC_LONG_SERVICE = "hz:impl:atomicLongService" ATOMIC_REFERENCE_SERVICE = "hz:impl:atomicReferenceService" @@ -85,16 +86,20 @@ def create_proxy(self, service_name, name, create_on_remote, **kwargs): return _proxy_init[service_name](client=self._client, service_name=service_name, name=name, **kwargs) - def destroy_proxy(self, service_name, name): + def destroy_proxy(self, service_name, name, destroy_on_remote=True): ns = (service_name, name) try: self._proxies.pop(ns) - message = client_destroy_proxy_codec.encode_request(name=name, service_name=service_name) - self._client.invoker.invoke_on_random_target(message).result() + if destroy_on_remote: + message = client_destroy_proxy_codec.encode_request(name=name, service_name=service_name) + self._client.invoker.invoke_on_random_target(message).result() return True except KeyError: return False + def get_distributed_objects(self): + return to_list(self._proxies.values()) + def add_distributed_object_listener(self, listener_func): is_smart = self._client.config.network_config.smart_routing request = client_add_distributed_object_listener_codec.encode_request(is_smart) diff --git a/hazelcast/proxy/base.py b/hazelcast/proxy/base.py index 1a585bce38..5c88e93a91 100644 --- a/hazelcast/proxy/base.py +++ b/hazelcast/proxy/base.py @@ -200,18 +200,3 @@ def get_entry_listener_flags(**kwargs): flags |= getattr(EntryEventType, key) return flags - -class DistributedObjectEvent(object): - """ - Distributed Object Event - """ - - def __init__(self, name, service_name, event_type): - self.name = name - self.service_name = service_name - self.event_type = event_type - - def __repr__(self): - return "DistributedObjectEvent[name={}, " \ - "service_name={}, " \ - "event_type={}]".format(self.name, self.service_name, self.event_type) diff --git a/hazelcast/util.py b/hazelcast/util.py index 4ec58fdd78..7d01411326 100644 --- a/hazelcast/util.py +++ b/hazelcast/util.py @@ -341,3 +341,7 @@ def create_git_info(): if GIT_COMMIT_ID: return "(" + GIT_COMMIT_ID + ") " return "" + + +def to_list(*args, **kwargs): + return list(*args, **kwargs) diff --git a/tests/proxy/distributed_objects_test.py b/tests/proxy/distributed_objects_test.py index ff2bde7bba..8d12dd9f76 100644 --- a/tests/proxy/distributed_objects_test.py +++ b/tests/proxy/distributed_objects_test.py @@ -1,4 +1,5 @@ import hazelcast +from hazelcast.core import DistributedObjectEventType from hazelcast.proxy import MAP_SERVICE from tests.base import SingleMemberTestCase @@ -33,6 +34,17 @@ def test_get_distributed_objects(self): six.assertCountEqual(self, [m, s, q], self.client.get_distributed_objects()) + def test_get_distributed_objects_clears_destroyed_proxies(self): + m = self.client.get_map("map") + + six.assertCountEqual(self, [m], self.client.get_distributed_objects()) + + other_client = hazelcast.HazelcastClient() + other_clients_map = other_client.get_map("map").blocking() + other_clients_map.destroy() + + six.assertCountEqual(self, [], self.client.get_distributed_objects()) + def test_add_distributed_object_listener_object_created(self): collector = event_collector() self.client.add_distributed_object_listener(listener_func=collector) @@ -42,7 +54,7 @@ def test_add_distributed_object_listener_object_created(self): def assert_event(): self.assertEqual(1, len(collector.events)) event = collector.events[0] - self.assertDistributedObjectEvent(event, "test-map", MAP_SERVICE, "CREATED") + self.assertDistributedObjectEvent(event, "test-map", MAP_SERVICE, DistributedObjectEventType.CREATED) self.assertTrueEventually(assert_event) @@ -56,7 +68,7 @@ def test_add_distributed_object_listener_object_destroyed(self): def assert_event(): self.assertEqual(1, len(collector.events)) event = collector.events[0] - self.assertDistributedObjectEvent(event, "test-map", MAP_SERVICE, "DESTROYED") + self.assertDistributedObjectEvent(event, "test-map", MAP_SERVICE, DistributedObjectEventType.DESTROYED) self.assertTrueEventually(assert_event) @@ -71,8 +83,10 @@ def assert_event(): self.assertEqual(2, len(collector.events)) created_event = collector.events[0] destroyed_event = collector.events[1] - self.assertDistributedObjectEvent(created_event, "test-map", MAP_SERVICE, "CREATED") - self.assertDistributedObjectEvent(destroyed_event, "test-map", MAP_SERVICE, "DESTROYED") + self.assertDistributedObjectEvent(created_event, "test-map", MAP_SERVICE, + DistributedObjectEventType.CREATED) + self.assertDistributedObjectEvent(destroyed_event, "test-map", MAP_SERVICE, + DistributedObjectEventType.DESTROYED) self.assertTrueEventually(assert_event) @@ -89,7 +103,7 @@ def test_remove_distributed_object_listener(self): def assert_event(): self.assertEqual(1, len(collector.events)) event = collector.events[0] - self.assertDistributedObjectEvent(event, "test-map", MAP_SERVICE, "CREATED") + self.assertDistributedObjectEvent(event, "test-map", MAP_SERVICE, DistributedObjectEventType.CREATED) self.assertTrueEventually(assert_event) def test_remove_invalid_distributed_object_listener(self): From ca1b5074c0303c05ea78db31067a70e4bbdf05cb Mon Sep 17 00:00:00 2001 From: mdumandag Date: Thu, 12 Mar 2020 17:04:46 +0300 Subject: [PATCH 3/3] add code samples and readme --- README.md | 40 ++++++++++++++++++- examples/cloud-discovery/__init__.py | 0 .../monitoring/distributed_object_listener.py | 28 +++++++++++++ hazelcast/proxy/base.py | 1 - tests/proxy/distributed_objects_test.py | 2 +- 5 files changed, 67 insertions(+), 4 deletions(-) create mode 100644 examples/cloud-discovery/__init__.py create mode 100644 examples/monitoring/distributed_object_listener.py diff --git a/README.md b/README.md index 077597ff07..ea9ab466d4 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,8 @@ * [7.5. Distributed Events](#75-distributed-events) * [7.5.1. Cluster Events](#751-cluster-events) * [7.5.1.1. Listening for Member Events](#7511-listening-for-member-events) - * [7.5.1.2. Listening for Lifecycle Events](#7512-listening-for-lifecycle-events) + * [7.5.1.2. Listenring for Distributed Object Events](#7512-listening-for-distributed-object-events) + * [7.5.1.3. Listening for Lifecycle Events](#7513-listening-for-lifecycle-events) * [7.5.2. Distributed Data Structure Events](#752-distributed-data-structure-events) * [7.5.2.1. Listening for Map Events](#7521-listening-for-map-events) * [7.6. Distributed Computing](#76-distributed-computing) @@ -1737,7 +1738,42 @@ The following is a membership listener registration by using the `add_listener() client.cluster.add_listener(member_added=lambda m: print("Member Added: The address is {}".format(m.address))) ``` -#### 7.5.1.2. Listening for Lifecycle Events +#### 7.5.1.2. Listening for Distributed Object Events + +The events for distributed objects are invoked when they are created and destroyed in the cluster. When an event +is received, listener function will be called. The parameter passed into the listener function will be of the type +``DistributedObjectEvent``. A ``DistributedObjectEvent`` contains the following fields: +* ``name``: Name of the distributed object. +* ``service_name``: Service name of the distributed object. +* ``event_type``: Type of the invoked event. It is either ``CREATED`` or ``DESTROYED``. + +The following is example of adding a distributed object listener to a client. + +```python +def distributed_object_listener(event): + print("Distributed object event >>>", event.name, event.service_name, event.event_type) + +client.add_distributed_object_listener(listener_func=distributed_object_listener) + +map_name = "test_map" + +# This call causes a CREATED event +test_map = client.get_map(map_name) + +# This causes no event because map was already created +test_map2 = client.get_map(map_name) + +# This causes a DESTROYED event +test_map.destroy() +``` + +**Output** +``` +Distributed object event >>> test_map hz:impl:mapService CREATED +Distributed object event >>> test_map hz:impl:mapService DESTROYED +``` + +#### 7.5.1.3. Listening for Lifecycle Events The `Lifecycle Listener` notifies for the following events: diff --git a/examples/cloud-discovery/__init__.py b/examples/cloud-discovery/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/examples/monitoring/distributed_object_listener.py b/examples/monitoring/distributed_object_listener.py new file mode 100644 index 0000000000..59cff5920c --- /dev/null +++ b/examples/monitoring/distributed_object_listener.py @@ -0,0 +1,28 @@ +import hazelcast + + +def distributed_object_listener(event): + print("Distributed object event >>>", event.name, event.service_name, event.event_type) + + +if __name__ == "__main__": + client = hazelcast.HazelcastClient() + + # Register the listener + reg_id = client.add_distributed_object_listener(distributed_object_listener) + + map_name = "test_map" + + # This call causes a CREATED event + test_map = client.get_map(map_name) + + # This causes no event because map was already created + test_map2 = client.get_map(map_name) + + # This causes a DESTROYED event + test_map.destroy() + + # Deregister the listener + client.remove_distributed_object_listener(reg_id) + + client.shutdown() diff --git a/hazelcast/proxy/base.py b/hazelcast/proxy/base.py index 5c88e93a91..b521037fee 100644 --- a/hazelcast/proxy/base.py +++ b/hazelcast/proxy/base.py @@ -199,4 +199,3 @@ def get_entry_listener_flags(**kwargs): if value: flags |= getattr(EntryEventType, key) return flags - diff --git a/tests/proxy/distributed_objects_test.py b/tests/proxy/distributed_objects_test.py index 8d12dd9f76..ab5ea6e21c 100644 --- a/tests/proxy/distributed_objects_test.py +++ b/tests/proxy/distributed_objects_test.py @@ -40,7 +40,7 @@ def test_get_distributed_objects_clears_destroyed_proxies(self): six.assertCountEqual(self, [m], self.client.get_distributed_objects()) other_client = hazelcast.HazelcastClient() - other_clients_map = other_client.get_map("map").blocking() + other_clients_map = other_client.get_map("map") other_clients_map.destroy() six.assertCountEqual(self, [], self.client.get_distributed_objects())