diff --git a/README.md b/README.md index 077597ff07..ea9ab466d4 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,8 @@ * [7.5. Distributed Events](#75-distributed-events) * [7.5.1. Cluster Events](#751-cluster-events) * [7.5.1.1. Listening for Member Events](#7511-listening-for-member-events) - * [7.5.1.2. Listening for Lifecycle Events](#7512-listening-for-lifecycle-events) + * [7.5.1.2. Listenring for Distributed Object Events](#7512-listening-for-distributed-object-events) + * [7.5.1.3. Listening for Lifecycle Events](#7513-listening-for-lifecycle-events) * [7.5.2. Distributed Data Structure Events](#752-distributed-data-structure-events) * [7.5.2.1. Listening for Map Events](#7521-listening-for-map-events) * [7.6. Distributed Computing](#76-distributed-computing) @@ -1737,7 +1738,42 @@ The following is a membership listener registration by using the `add_listener() client.cluster.add_listener(member_added=lambda m: print("Member Added: The address is {}".format(m.address))) ``` -#### 7.5.1.2. Listening for Lifecycle Events +#### 7.5.1.2. Listening for Distributed Object Events + +The events for distributed objects are invoked when they are created and destroyed in the cluster. When an event +is received, listener function will be called. The parameter passed into the listener function will be of the type +``DistributedObjectEvent``. A ``DistributedObjectEvent`` contains the following fields: +* ``name``: Name of the distributed object. +* ``service_name``: Service name of the distributed object. +* ``event_type``: Type of the invoked event. It is either ``CREATED`` or ``DESTROYED``. + +The following is example of adding a distributed object listener to a client. + +```python +def distributed_object_listener(event): + print("Distributed object event >>>", event.name, event.service_name, event.event_type) + +client.add_distributed_object_listener(listener_func=distributed_object_listener) + +map_name = "test_map" + +# This call causes a CREATED event +test_map = client.get_map(map_name) + +# This causes no event because map was already created +test_map2 = client.get_map(map_name) + +# This causes a DESTROYED event +test_map.destroy() +``` + +**Output** +``` +Distributed object event >>> test_map hz:impl:mapService CREATED +Distributed object event >>> test_map hz:impl:mapService DESTROYED +``` + +#### 7.5.1.3. Listening for Lifecycle Events The `Lifecycle Listener` notifies for the following events: diff --git a/examples/cloud-discovery/__init__.py b/examples/cloud-discovery/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/examples/monitoring/distributed_object_listener.py b/examples/monitoring/distributed_object_listener.py new file mode 100644 index 0000000000..59cff5920c --- /dev/null +++ b/examples/monitoring/distributed_object_listener.py @@ -0,0 +1,28 @@ +import hazelcast + + +def distributed_object_listener(event): + print("Distributed object event >>>", event.name, event.service_name, event.event_type) + + +if __name__ == "__main__": + client = hazelcast.HazelcastClient() + + # Register the listener + reg_id = client.add_distributed_object_listener(distributed_object_listener) + + map_name = "test_map" + + # This call causes a CREATED event + test_map = client.get_map(map_name) + + # This causes no event because map was already created + test_map2 = client.get_map(map_name) + + # This causes a DESTROYED event + test_map.destroy() + + # Deregister the listener + client.remove_distributed_object_listener(reg_id) + + client.shutdown() diff --git a/hazelcast/client.py b/hazelcast/client.py index 4acea4bc2c..d7a0cff47c 100644 --- a/hazelcast/client.py +++ b/hazelcast/client.py @@ -6,10 +6,12 @@ from hazelcast.cluster import ClusterService, RandomLoadBalancer from hazelcast.config import ClientConfig, ClientProperties from hazelcast.connection import ConnectionManager, Heartbeat, DefaultAddressProvider, DefaultAddressTranslator +from hazelcast.core import DistributedObjectInfo from hazelcast.invocation import InvocationService from hazelcast.listener import ListenerService from hazelcast.lifecycle import LifecycleService, LIFECYCLE_STATE_SHUTTING_DOWN, LIFECYCLE_STATE_SHUTDOWN from hazelcast.partition import PartitionService +from hazelcast.protocol.codec import client_get_distributed_objects_codec from hazelcast.proxy import ProxyManager, MAP_SERVICE, QUEUE_SERVICE, LIST_SERVICE, SET_SERVICE, MULTI_MAP_SERVICE, \ REPLICATED_MAP_SERVICE, ATOMIC_LONG_SERVICE, ATOMIC_REFERENCE_SERVICE, RINGBUFFER_SERVICE, COUNT_DOWN_LATCH_SERVICE, \ TOPIC_SERVICE, RELIABLE_TOPIC_SERVICE, SEMAPHORE_SERVICE, LOCK_SERVICE, ID_GENERATOR_SERVICE, \ @@ -251,6 +253,48 @@ def new_transaction(self, timeout=120, durability=1, type=TWO_PHASE): """ return self.transaction_manager.new_transaction(timeout, durability, type) + def add_distributed_object_listener(self, listener_func): + """ + Adds a listener which will be notified when a + new distributed object is created or destroyed. + :param listener_func: Function to be called when a distributed object is created or destroyed. + :return: (str), a registration id which is used as a key to remove the listener. + """ + return self.proxy.add_distributed_object_listener(listener_func) + + def remove_distributed_object_listener(self, registration_id): + """ + Removes the specified distributed object listener. Returns silently if there is no such listener added before. + :param registration_id: (str), id of registered listener. + :return: (bool), ``true`` if registration is removed, ``false`` otherwise. + """ + return self.proxy.remove_distributed_object_listener(registration_id) + + def get_distributed_objects(self): + """ + Returns all distributed objects such as; queue, map, set, list, topic, lock, multimap. + Also, as a side effect, it clears the local instances of the destroyed proxies. + :return:(Sequence), List of instances created by Hazelcast. + """ + request = client_get_distributed_objects_codec.encode_request() + to_object = self.serialization_service.to_object + future = self.invoker.invoke_on_random_target(request) + response = client_get_distributed_objects_codec.decode_response(future.result(), to_object)["response"] + + distributed_objects = self.proxy.get_distributed_objects() + local_distributed_object_infos = set() + for dist_obj in distributed_objects: + local_distributed_object_infos.add(DistributedObjectInfo(dist_obj.name, dist_obj.service_name)) + + for dist_obj_info in response: + local_distributed_object_infos.discard(dist_obj_info) + self.proxy.get_or_create(dist_obj_info.service_name, dist_obj_info.name, create_on_remote=False) + + for dist_obj_info in local_distributed_object_infos: + self.proxy.destroy_proxy(dist_obj_info.service_name, dist_obj_info.name, destroy_on_remote=False) + + return self.proxy.get_distributed_objects() + def shutdown(self): """ Shuts down this HazelcastClient. diff --git a/hazelcast/core.py b/hazelcast/core.py index 5fd1e04590..9c38069dfe 100644 --- a/hazelcast/core.py +++ b/hazelcast/core.py @@ -3,6 +3,7 @@ from hazelcast import six from hazelcast import util +from hazelcast.util import enum class Member(object): @@ -55,6 +56,39 @@ def __init__(self, name, service_name): def __repr__(self): return "DistributedObjectInfo(name={}, serviceName={})".format(self.name, self.service_name) + def __hash__(self): + return hash((self.name, self.service_name)) + + def __eq__(self, other): + if isinstance(other, DistributedObjectInfo): + return self.name == other.name and self.service_name == other.service_name + return False + + +DistributedObjectEventType = enum(CREATED="CREATED", DESTROYED="DESTROYED") +""" +Type of the distributed object event. + +* CREATED : DistributedObject is created. +* DESTROYED: DistributedObject is destroyed. +""" + + +class DistributedObjectEvent(object): + """ + Distributed Object Event + """ + + def __init__(self, name, service_name, event_type): + self.name = name + self.service_name = service_name + self.event_type = DistributedObjectEventType.reverse.get(event_type, None) + + def __repr__(self): + return "DistributedObjectEvent[name={}, " \ + "service_name={}, " \ + "event_type={}]".format(self.name, self.service_name, self.event_type) + class EntryView(object): """ diff --git a/hazelcast/proxy/__init__.py b/hazelcast/proxy/__init__.py index d420fe909b..030cbaa245 100644 --- a/hazelcast/proxy/__init__.py +++ b/hazelcast/proxy/__init__.py @@ -1,4 +1,6 @@ -from hazelcast.protocol.codec import client_create_proxy_codec, client_destroy_proxy_codec +from hazelcast.core import DistributedObjectEvent +from hazelcast.protocol.codec import client_create_proxy_codec, client_destroy_proxy_codec, \ + client_add_distributed_object_listener_codec, client_remove_distributed_object_listener_codec from hazelcast.proxy.atomic_long import AtomicLong from hazelcast.proxy.atomic_reference import AtomicReference from hazelcast.proxy.count_down_latch import CountDownLatch @@ -17,6 +19,7 @@ from hazelcast.proxy.topic import Topic from hazelcast.proxy.pn_counter import PNCounter from hazelcast.proxy.flake_id_generator import FlakeIdGenerator +from hazelcast.util import to_list ATOMIC_LONG_SERVICE = "hz:impl:atomicLongService" ATOMIC_REFERENCE_SERVICE = "hz:impl:atomicReferenceService" @@ -66,31 +69,60 @@ def __init__(self, client): self._client = client self._proxies = {} - def get_or_create(self, service_name, name, **kwargs): + def get_or_create(self, service_name, name, create_on_remote=True, **kwargs): ns = (service_name, name) if ns in self._proxies: return self._proxies[ns] - proxy = self.create_proxy(service_name, name, **kwargs) + proxy = self.create_proxy(service_name, name, create_on_remote, **kwargs) self._proxies[ns] = proxy return proxy - def create_proxy(self, service_name, name, **kwargs): - message = client_create_proxy_codec.encode_request(name=name, service_name=service_name, - target=self._find_next_proxy_address()) - self._client.invoker.invoke_on_random_target(message).result() + def create_proxy(self, service_name, name, create_on_remote, **kwargs): + if create_on_remote: + message = client_create_proxy_codec.encode_request(name=name, service_name=service_name, + target=self._find_next_proxy_address()) + self._client.invoker.invoke_on_random_target(message).result() + return _proxy_init[service_name](client=self._client, service_name=service_name, name=name, **kwargs) - def destroy_proxy(self, service_name, name): + def destroy_proxy(self, service_name, name, destroy_on_remote=True): ns = (service_name, name) try: self._proxies.pop(ns) - message = client_destroy_proxy_codec.encode_request(name=name, service_name=service_name) - self._client.invoker.invoke_on_random_target(message).result() + if destroy_on_remote: + message = client_destroy_proxy_codec.encode_request(name=name, service_name=service_name) + self._client.invoker.invoke_on_random_target(message).result() return True except KeyError: return False + def get_distributed_objects(self): + return to_list(self._proxies.values()) + + def add_distributed_object_listener(self, listener_func): + is_smart = self._client.config.network_config.smart_routing + request = client_add_distributed_object_listener_codec.encode_request(is_smart) + + def handle_distributed_object_event(**kwargs): + event = DistributedObjectEvent(**kwargs) + listener_func(event) + + def event_handler(client_message): + return client_add_distributed_object_listener_codec.handle(client_message, handle_distributed_object_event) + + def decode_add_listener(response): + return client_add_distributed_object_listener_codec.decode_response(response)["response"] + + def encode_remove_listener(registration_id): + return client_remove_distributed_object_listener_codec.encode_request(registration_id) + + return self._client.listener.register_listener(request, decode_add_listener, + encode_remove_listener, event_handler) + + def remove_distributed_object_listener(self, registration_id): + return self._client.listener.deregister_listener(registration_id) + def _find_next_proxy_address(self): # TODO: filter out lite members return self._client.load_balancer.next_address() diff --git a/hazelcast/util.py b/hazelcast/util.py index 4ec58fdd78..7d01411326 100644 --- a/hazelcast/util.py +++ b/hazelcast/util.py @@ -341,3 +341,7 @@ def create_git_info(): if GIT_COMMIT_ID: return "(" + GIT_COMMIT_ID + ") " return "" + + +def to_list(*args, **kwargs): + return list(*args, **kwargs) diff --git a/tests/base.py b/tests/base.py index 358237bd48..c7fdc7592a 100644 --- a/tests/base.py +++ b/tests/base.py @@ -77,6 +77,11 @@ def assertEntryEvent(self, event, event_type, key=None, value=None, old_value=No self.assertEqual(event.old_value, old_value) self.assertEqual(event.number_of_affected_entries, number_of_affected_entries) + def assertDistributedObjectEvent(self, event, name, service_name, event_type): + self.assertEqual(name, event.name) + self.assertEqual(service_name, event.service_name) + self.assertEqual(event_type, event.event_type) + def set_logging_level(self, level): logging.getLogger().setLevel(level) diff --git a/tests/proxy/distributed_objects_test.py b/tests/proxy/distributed_objects_test.py new file mode 100644 index 0000000000..ab5ea6e21c --- /dev/null +++ b/tests/proxy/distributed_objects_test.py @@ -0,0 +1,110 @@ +import hazelcast +from hazelcast.core import DistributedObjectEventType + +from hazelcast.proxy import MAP_SERVICE +from tests.base import SingleMemberTestCase +from tests.util import event_collector +from hazelcast import six + + +class DistributedObjectsTest(SingleMemberTestCase): + @classmethod + def setUpClass(cls): + cls.rc = cls.create_rc() + cls.cluster = cls.create_cluster(cls.rc, cls.configure_cluster()) + + @classmethod + def tearDownClass(cls): + cls.rc.exit() + + def setUp(self): + self.member = self.cluster.start_member() + self.client = hazelcast.HazelcastClient() + + def tearDown(self): + self.client.shutdown() + self.member.shutdown() + + def test_get_distributed_objects(self): + six.assertCountEqual(self, [], self.client.get_distributed_objects()) + + m = self.client.get_map("map") + s = self.client.get_set("set") + q = self.client.get_queue("queue") + + six.assertCountEqual(self, [m, s, q], self.client.get_distributed_objects()) + + def test_get_distributed_objects_clears_destroyed_proxies(self): + m = self.client.get_map("map") + + six.assertCountEqual(self, [m], self.client.get_distributed_objects()) + + other_client = hazelcast.HazelcastClient() + other_clients_map = other_client.get_map("map") + other_clients_map.destroy() + + six.assertCountEqual(self, [], self.client.get_distributed_objects()) + + def test_add_distributed_object_listener_object_created(self): + collector = event_collector() + self.client.add_distributed_object_listener(listener_func=collector) + + self.client.get_map("test-map") + + def assert_event(): + self.assertEqual(1, len(collector.events)) + event = collector.events[0] + self.assertDistributedObjectEvent(event, "test-map", MAP_SERVICE, DistributedObjectEventType.CREATED) + + self.assertTrueEventually(assert_event) + + def test_add_distributed_object_listener_object_destroyed(self): + collector = event_collector() + m = self.client.get_map("test-map") + self.client.add_distributed_object_listener(listener_func=collector) + + m.destroy() + + def assert_event(): + self.assertEqual(1, len(collector.events)) + event = collector.events[0] + self.assertDistributedObjectEvent(event, "test-map", MAP_SERVICE, DistributedObjectEventType.DESTROYED) + + self.assertTrueEventually(assert_event) + + def test_add_distributed_object_listener_object_created_and_destroyed(self): + collector = event_collector() + self.client.add_distributed_object_listener(listener_func=collector) + + m = self.client.get_map("test-map") + m.destroy() + + def assert_event(): + self.assertEqual(2, len(collector.events)) + created_event = collector.events[0] + destroyed_event = collector.events[1] + self.assertDistributedObjectEvent(created_event, "test-map", MAP_SERVICE, + DistributedObjectEventType.CREATED) + self.assertDistributedObjectEvent(destroyed_event, "test-map", MAP_SERVICE, + DistributedObjectEventType.DESTROYED) + + self.assertTrueEventually(assert_event) + + def test_remove_distributed_object_listener(self): + collector = event_collector() + reg_id = self.client.add_distributed_object_listener(listener_func=collector) + m = self.client.get_map("test-map") + + response = self.client.remove_distributed_object_listener(reg_id) + self.assertTrue(response) + m.destroy() + + # only map creation should be notified + def assert_event(): + self.assertEqual(1, len(collector.events)) + event = collector.events[0] + self.assertDistributedObjectEvent(event, "test-map", MAP_SERVICE, DistributedObjectEventType.CREATED) + self.assertTrueEventually(assert_event) + + def test_remove_invalid_distributed_object_listener(self): + self.assertFalse(self.client.remove_distributed_object_listener("invalid-reg-id"))