diff --git a/README.rst b/README.rst index 758a9c9b15..96b56a3aa0 100644 --- a/README.rst +++ b/README.rst @@ -1766,7 +1766,7 @@ A Topic usage example is shown below. print("Got message:", topic_message.message) # Get a Topic called "my-distributed-topic" - topic = client.get_topic("my-distributed-topic") + topic = client.get_topic("my-distributed-topic").blocking() # Add a Listener to the Topic topic.add_listener(print_on_message) @@ -2342,15 +2342,15 @@ client. client.add_distributed_object_listener( listener_func=distributed_object_listener - ) + ).result() map_name = "test_map" # This call causes a CREATED event - test_map = client.get_map(map_name) + test_map = client.get_map(map_name).blocking() # This causes no event because map was already created - test_map2 = client.get_map(map_name) + test_map2 = client.get_map(map_name).blocking() # This causes a DESTROYED event test_map.destroy() @@ -2483,7 +2483,7 @@ See the following example. customer_map.add_entry_listener(include_value=True, clear_all_func=cleared) - customer_map.clear().result() + customer_map.clear() 7.6. Distributed Computing -------------------------- diff --git a/examples/monitoring/distributed_object_listener.py b/examples/monitoring/distributed_object_listener.py index ccf026d5a1..a1aba653fe 100644 --- a/examples/monitoring/distributed_object_listener.py +++ b/examples/monitoring/distributed_object_listener.py @@ -8,7 +8,7 @@ def distributed_object_listener(event): client = hazelcast.HazelcastClient() # Register the listener -reg_id = client.add_distributed_object_listener(distributed_object_listener) +reg_id = client.add_distributed_object_listener(distributed_object_listener).result() map_name = "test_map" @@ -22,6 +22,6 @@ def distributed_object_listener(event): test_map.destroy() # De-register the listener -client.remove_distributed_object_listener(reg_id) +client.remove_distributed_object_listener(reg_id).result() client.shutdown() diff --git a/examples/org-website/topic_sample.py b/examples/org-website/topic_sample.py index 695b1b5379..5a4b602e1c 100644 --- a/examples/org-website/topic_sample.py +++ b/examples/org-website/topic_sample.py @@ -8,7 +8,7 @@ def print_on_message(topic_message): # Start the Hazelcast Client and connect to an already running Hazelcast Cluster on 127.0.0.1 hz = hazelcast.HazelcastClient() # Get a Topic called "my-distributed-topic" -topic = hz.get_topic("my-distributed-topic") +topic = hz.get_topic("my-distributed-topic").blocking() # Add a Listener to the Topic topic.add_listener(print_on_message) # Publish a message to the Topic diff --git a/examples/topic/topic_example.py b/examples/topic/topic_example.py index 47ff0d3c86..ec5d0f4443 100644 --- a/examples/topic/topic_example.py +++ b/examples/topic/topic_example.py @@ -9,7 +9,7 @@ def on_message(event): client = hazelcast.HazelcastClient() -topic = client.get_topic("topic") +topic = client.get_topic("topic").blocking() topic.add_listener(on_message) for i in range(10): diff --git a/hazelcast/client.py b/hazelcast/client.py index 0fcab4067a..aafe97e791 100644 --- a/hazelcast/client.py +++ b/hazelcast/client.py @@ -522,7 +522,7 @@ def add_distributed_object_listener(self, listener_func): listener_func (function): Function to be called when a distributed object is created or destroyed. Returns: - str: A registration id which is used as a key to remove the listener. + hazelcast.future.Future[str]: A registration id which is used as a key to remove the listener. """ is_smart = self.config.smart_routing request = client_add_distributed_object_listener_codec.encode_request(is_smart) @@ -552,7 +552,7 @@ def remove_distributed_object_listener(self, registration_id): registration_id (str): The id of registered listener. Returns: - bool: ``True`` if registration is removed, ``False`` otherwise. + hazelcast.future.Future[bool]: ``True`` if registration is removed, ``False`` otherwise. """ return self._listener_service.deregister_listener(registration_id) diff --git a/hazelcast/invocation.py b/hazelcast/invocation.py index 198be81a45..2709a28cad 100644 --- a/hazelcast/invocation.py +++ b/hazelcast/invocation.py @@ -261,7 +261,7 @@ def _register_backup_listener(self): self._listener_service.register_listener(request, codec.decode_response, lambda reg_id: None, - lambda m: codec.handle(m, self._backup_event_handler)) + lambda m: codec.handle(m, self._backup_event_handler)).result() def _backup_event_handler(self, correlation_id): invocation = self._pending.get(correlation_id, None) diff --git a/hazelcast/listener.py b/hazelcast/listener.py index 7575ff573f..b9daf52f8a 100644 --- a/hazelcast/listener.py +++ b/hazelcast/listener.py @@ -3,8 +3,8 @@ from uuid import uuid4 from hazelcast import six -from hazelcast.errors import HazelcastError -from hazelcast.future import combine_futures +from hazelcast.errors import HazelcastError, HazelcastClientNotActiveError, TargetDisconnectedError +from hazelcast.future import combine_futures, ImmediateFuture from hazelcast.invocation import Invocation from hazelcast.protocol.codec import client_add_cluster_view_listener_codec from hazelcast.util import check_not_none @@ -54,52 +54,60 @@ def register_listener(self, registration_request, decode_register_response, enco futures = [] for connection in list(six.itervalues(self._connection_manager.active_connections)): - future = self._register_on_connection_async(registration_id, registration, connection) + future = self._register_on_connection(registration_id, registration, connection) futures.append(future) - try: - combine_futures(futures).result() - except: - self.deregister_listener(registration_id) - raise HazelcastError("Listener cannot be added") + def handler(f): + try: + f.result() + return registration_id + except: + self.deregister_listener(registration_id) + raise HazelcastError("Listener cannot be added") - return registration_id + return combine_futures(futures).continue_with(handler) def deregister_listener(self, user_registration_id): check_not_none(user_registration_id, "None user_registration_id is not allowed!") with self._registration_lock: - listener_registration = self._active_registrations.get(user_registration_id) + listener_registration = self._active_registrations.pop(user_registration_id, None) if not listener_registration: - return False + return ImmediateFuture(False) - successful = True - # Need to copy items to avoid getting runtime modification errors - for connection, event_registration in list(six.iteritems(listener_registration.connection_registrations)): - try: - server_registration_id = event_registration.server_registration_id - deregister_request = listener_registration.encode_deregister_request(server_registration_id) - invocation = Invocation(deregister_request, connection=connection) - self._invocation_service.invoke(invocation) - invocation.future.result() - self.remove_event_handler(event_registration.correlation_id) - listener_registration.connection_registrations.pop(connection) - except: - if connection.live: - successful = False - _logger.warning("Deregistration for listener with ID %s has failed to address %s ", - user_registration_id, "address", exc_info=True) - if successful: - self._active_registrations.pop(user_registration_id) + for connection, event_registration in six.iteritems(listener_registration.connection_registrations): + # Remove local handler + self.remove_event_handler(event_registration.correlation_id) + # The rest is for deleting the remote registration + server_registration_id = event_registration.server_registration_id + deregister_request = listener_registration.encode_deregister_request(server_registration_id) + if deregister_request is None: + # None means no remote registration (e.g. for backup acks) + continue + + invocation = Invocation(deregister_request, connection=connection, timeout=six.MAXSIZE, urgent=True) + self._invocation_service.invoke(invocation) + + def handler(f): + e = f.exception() + if e: + if isinstance(e, (HazelcastClientNotActiveError, IOError, TargetDisconnectedError)): + return + + _logger.warning("Deregistration of listener with ID %s has failed for address %s", + user_registration_id, connection.remote_address) + + invocation.future.add_done_callback(handler) - return successful + listener_registration.connection_registrations.clear() + return ImmediateFuture(True) def handle_client_message(self, message, correlation_id): handler = self._event_handlers.get(correlation_id, None) if handler: handler(message) else: - _logger.warning("Got event message with unknown correlation id: %s", message) + _logger.debug("Got event message with unknown correlation id: %s", message) def add_event_handler(self, correlation_id, event_handler): self._event_handlers[correlation_id] = event_handler @@ -107,7 +115,7 @@ def add_event_handler(self, correlation_id, event_handler): def remove_event_handler(self, correlation_id): self._event_handlers.pop(correlation_id, None) - def _register_on_connection_async(self, user_registration_id, listener_registration, connection): + def _register_on_connection(self, user_registration_id, listener_registration, connection): registration_map = listener_registration.connection_registrations if connection in registration_map: @@ -115,7 +123,7 @@ def _register_on_connection_async(self, user_registration_id, listener_registrat registration_request = listener_registration.registration_request.copy() invocation = Invocation(registration_request, connection=connection, - event_handler=listener_registration.handler, response_handler=lambda m: m) + event_handler=listener_registration.handler, response_handler=lambda m: m, urgent=True) self._invocation_service.invoke(invocation) def callback(f): @@ -136,7 +144,7 @@ def callback(f): def _connection_added(self, connection): with self._registration_lock: for user_reg_id, listener_registration in six.iteritems(self._active_registrations): - self._register_on_connection_async(user_reg_id, listener_registration, connection) + self._register_on_connection(user_reg_id, listener_registration, connection) def _connection_removed(self, connection, _): with self._registration_lock: diff --git a/hazelcast/protocol/client_message.py b/hazelcast/protocol/client_message.py index eec7c49bd1..cb1bfe84cf 100644 --- a/hazelcast/protocol/client_message.py +++ b/hazelcast/protocol/client_message.py @@ -187,6 +187,11 @@ def drop_fragmentation_frame(self): self.start_frame = self.start_frame.next self._next_frame = self.start_frame + def __repr__(self): + message_type = self.get_message_type() + correlation_id = self.get_correlation_id() + return "InboundMessage(message_type=%s, correlation_id=%s)" % (message_type, correlation_id) + NULL_FRAME_BUF = bytearray(SIZE_OF_FRAME_LENGTH_AND_FLAGS) LE_INT.pack_into(NULL_FRAME_BUF, 0, SIZE_OF_FRAME_LENGTH_AND_FLAGS) diff --git a/hazelcast/proxy/list.py b/hazelcast/proxy/list.py index b3466b04ac..68026ac5d0 100644 --- a/hazelcast/proxy/list.py +++ b/hazelcast/proxy/list.py @@ -114,7 +114,7 @@ def add_listener(self, include_value=False, item_added_func=None, item_removed_f item_removed_func (function): To be called when an item is deleted from this list. Returns: - str: A registration id which is used as a key to remove the listener. + hazelcast.future.Future[str]: A registration id which is used as a key to remove the listener. """ request = list_add_listener_codec.encode_request(self.name, include_value, self._is_smart) @@ -341,7 +341,7 @@ def remove_listener(self, registration_id): registration_id (str): Id of the listener to be deleted. Returns: - bool: ``True`` if the item listener is removed, ``False`` otherwise. + hazelcast.future.Future[bool]: ``True`` if the item listener is removed, ``False`` otherwise. """ return self._deregister_listener(registration_id) diff --git a/hazelcast/proxy/map.py b/hazelcast/proxy/map.py index ead4aefdac..3ac3560d43 100644 --- a/hazelcast/proxy/map.py +++ b/hazelcast/proxy/map.py @@ -82,7 +82,7 @@ def add_entry_listener(self, include_value=False, key=None, predicate=None, adde loaded_func (function): Function to be called when an entry is loaded from a map loader. Returns: - str: A registration id which is used as a key to remove the listener. + hazelcast.future.Future[str]: A registration id which is used as a key to remove the listener. """ flags = get_entry_listener_flags(ADDED=added_func, REMOVED=removed_func, UPDATED=updated_func, EVICTED=evicted_func, EXPIRED=expired_func, EVICT_ALL=evict_all_func, @@ -850,7 +850,7 @@ def remove_entry_listener(self, registration_id): registration_id (str): Id of registered listener. Returns: - bool: ``True`` if registration is removed, ``False`` otherwise. + hazelcast.future.Future[bool]: ``True`` if registration is removed, ``False`` otherwise. """ return self._deregister_listener(registration_id) @@ -1246,15 +1246,12 @@ def _on_destroy(self): super(MapFeatNearCache, self)._on_destroy() def _add_near_cache_invalidation_listener(self): - try: - codec = map_add_near_cache_invalidation_listener_codec - request = codec.encode_request(self.name, EntryEventType.INVALIDATION, self._is_smart) - self._invalidation_listener_id = self._register_listener( - request, lambda r: codec.decode_response(r), - lambda reg_id: map_remove_entry_listener_codec.encode_request(self.name, reg_id), - lambda m: codec.handle(m, self._handle_invalidation, self._handle_batch_invalidation)) - except: - pass + codec = map_add_near_cache_invalidation_listener_codec + request = codec.encode_request(self.name, EntryEventType.INVALIDATION, self._is_smart) + self._invalidation_listener_id = self._register_listener( + request, lambda r: codec.decode_response(r), + lambda reg_id: map_remove_entry_listener_codec.encode_request(self.name, reg_id), + lambda m: codec.handle(m, self._handle_invalidation, self._handle_batch_invalidation)).result() def _remove_near_cache_invalidation_listener(self): if self._invalidation_listener_id: diff --git a/hazelcast/proxy/multi_map.py b/hazelcast/proxy/multi_map.py index f0b4edc41b..d473ff3364 100644 --- a/hazelcast/proxy/multi_map.py +++ b/hazelcast/proxy/multi_map.py @@ -28,7 +28,7 @@ def add_entry_listener(self, include_value=False, key=None, added_func=None, rem clear_all_func (function): Function to be called when entries are cleared from map. Returns: - str: A registration id which is used as a key to remove the listener. + hazelcast.future.Future[str]: A registration id which is used as a key to remove the listener. """ if key: codec = multi_map_add_entry_listener_to_key_codec @@ -318,7 +318,7 @@ def remove_entry_listener(self, registration_id): registration_id (str): Id of registered listener. Returns: - bool: ``True`` if registration is removed, ``False`` otherwise. + hazelcast.future.Future[bool]: ``True`` if registration is removed, ``False`` otherwise. """ return self._deregister_listener(registration_id) diff --git a/hazelcast/proxy/queue.py b/hazelcast/proxy/queue.py index c3d91e1041..868c1632c8 100644 --- a/hazelcast/proxy/queue.py +++ b/hazelcast/proxy/queue.py @@ -79,7 +79,7 @@ def add_listener(self, include_value=False, item_added_func=None, item_removed_f item_removed_func (function): Function to be called when an item is deleted from this set. Returns: - str: A registration id which is used as a key to remove the listener. + hazelcast.future.Future[str]: A registration id which is used as a key to remove the listener. """ codec = queue_add_listener_codec request = codec.encode_request(self.name, include_value, self._is_smart) @@ -306,7 +306,7 @@ def remove_listener(self, registration_id): registration_id (str): Id of the listener to be deleted. Returns: - bool: ``True`` if the item listener is removed, ``False`` otherwise. + hazelcast.future.Future[bool]: ``True`` if the item listener is removed, ``False`` otherwise. """ return self._deregister_listener(registration_id) diff --git a/hazelcast/proxy/replicated_map.py b/hazelcast/proxy/replicated_map.py index 34fdf07399..af5b4da671 100644 --- a/hazelcast/proxy/replicated_map.py +++ b/hazelcast/proxy/replicated_map.py @@ -42,7 +42,7 @@ def add_entry_listener(self, key=None, predicate=None, added_func=None, removed_ clear_all_func (function): Function to be called when entries are cleared from map. Returns: - str: A registration id which is used as a key to remove the listener. + hazelcast.future.Future[str]: A registration id which is used as a key to remove the listener. """ if key and predicate: codec = replicated_map_add_entry_listener_to_key_with_predicate_codec @@ -268,7 +268,7 @@ def remove_entry_listener(self, registration_id): registration_id (str): Id of registered listener. Returns: - bool: ``True`` if registration is removed, ``False`` otherwise. + hazelcast.future.Future[bool]: ``True`` if registration is removed, ``False`` otherwise. """ return self._deregister_listener(registration_id) diff --git a/hazelcast/proxy/set.py b/hazelcast/proxy/set.py index 5d9f98e3cc..f5f7831a9e 100644 --- a/hazelcast/proxy/set.py +++ b/hazelcast/proxy/set.py @@ -63,7 +63,7 @@ def add_listener(self, include_value=False, item_added_func=None, item_removed_f item_removed_func (function): Function to be called when an item is deleted from this set. Returns: - str: A registration id which is used as a key to remove the listener. + hazelcast.future.Future[str]: A registration id which is used as a key to remove the listener. """ request = set_add_listener_codec.encode_request(self.name, include_value, self._is_smart) @@ -187,7 +187,7 @@ def remove_listener(self, registration_id): registration_id (str): Id of the listener to be deleted. Returns: - bool: ``True`` if the item listener is removed, ``False`` otherwise. + hazelcast.future.Future[bool]: ``True`` if the item listener is removed, ``False`` otherwise. """ return self._deregister_listener(registration_id) diff --git a/hazelcast/proxy/topic.py b/hazelcast/proxy/topic.py index 2a720d75ee..f683ae01d4 100644 --- a/hazelcast/proxy/topic.py +++ b/hazelcast/proxy/topic.py @@ -25,7 +25,7 @@ def add_listener(self, on_message=None): on_message (function): Function to be called when a message is published. Returns: - str: A registration id which is used as a key to remove the listener. + hazelcast.future.Future[str]: A registration id which is used as a key to remove the listener. """ codec = topic_add_message_listener_codec request = codec.encode_request(self.name, self._is_smart) @@ -62,6 +62,6 @@ def remove_listener(self, registration_id): registration_id (str): Registration id of the listener to be removed. Returns: - bool: ``True`` if the listener is removed, ``False`` otherwise. + hazelcast.future.Future[bool]: ``True`` if the listener is removed, ``False`` otherwise. """ return self._deregister_listener(registration_id) diff --git a/tests/proxy/distributed_objects_test.py b/tests/proxy/distributed_objects_test.py index ae57672bea..f2b56c5935 100644 --- a/tests/proxy/distributed_objects_test.py +++ b/tests/proxy/distributed_objects_test.py @@ -51,7 +51,7 @@ def test_get_distributed_objects_clears_destroyed_proxies(self): def test_add_distributed_object_listener_object_created(self): collector = event_collector() - self.client.add_distributed_object_listener(listener_func=collector) + self.client.add_distributed_object_listener(listener_func=collector).result() self.client.get_map("test-map") @@ -65,7 +65,7 @@ def assert_event(): def test_add_distributed_object_listener_object_destroyed(self): collector = event_collector() m = self.client.get_map("test-map") - self.client.add_distributed_object_listener(listener_func=collector) + self.client.add_distributed_object_listener(listener_func=collector).result() m.destroy() @@ -78,7 +78,7 @@ def assert_event(): def test_add_distributed_object_listener_object_created_and_destroyed(self): collector = event_collector() - self.client.add_distributed_object_listener(listener_func=collector) + self.client.add_distributed_object_listener(listener_func=collector).result() m = self.client.get_map("test-map") m.destroy() @@ -96,10 +96,10 @@ def assert_event(): def test_remove_distributed_object_listener(self): collector = event_collector() - reg_id = self.client.add_distributed_object_listener(listener_func=collector) + reg_id = self.client.add_distributed_object_listener(listener_func=collector).result() m = self.client.get_map("test-map") - response = self.client.remove_distributed_object_listener(reg_id) + response = self.client.remove_distributed_object_listener(reg_id).result() self.assertTrue(response) m.destroy() @@ -111,4 +111,4 @@ def assert_event(): self.assertTrueEventually(assert_event) def test_remove_invalid_distributed_object_listener(self): - self.assertFalse(self.client.remove_distributed_object_listener("invalid-reg-id")) + self.assertFalse(self.client.remove_distributed_object_listener("invalid-reg-id").result())