diff --git a/hazelcast/client.py b/hazelcast/client.py index e9a68e5dd0..bd04027b7a 100644 --- a/hazelcast/client.py +++ b/hazelcast/client.py @@ -6,7 +6,8 @@ from hazelcast.cluster import ClusterService, RandomLoadBalancer from hazelcast.config import ClientConfig, ClientProperties from hazelcast.connection import ConnectionManager, Heartbeat, DefaultAddressProvider, DefaultAddressTranslator -from hazelcast.invocation import InvocationService, ListenerService +from hazelcast.invocation import InvocationService +from hazelcast.listener import ListenerService from hazelcast.lifecycle import LifecycleService, LIFECYCLE_STATE_SHUTTING_DOWN, LIFECYCLE_STATE_SHUTDOWN from hazelcast.partition import PartitionService from hazelcast.proxy import ProxyManager, MAP_SERVICE, QUEUE_SERVICE, LIST_SERVICE, SET_SERVICE, MULTI_MAP_SERVICE, \ @@ -61,8 +62,10 @@ def __init__(self, config=None): def _start(self): self.reactor.start() try: + self.invoker.start() self.cluster.start() self.heartbeat.start() + self.listener.start() self.partition_service.start() self.statistics.start() except: diff --git a/hazelcast/cluster.py b/hazelcast/cluster.py index 89e2b55a74..cb95b9b0a7 100644 --- a/hazelcast/cluster.py +++ b/hazelcast/cluster.py @@ -5,7 +5,6 @@ import uuid from hazelcast.exception import HazelcastError, AuthenticationError, TargetDisconnectedError -from hazelcast.invocation import ListenerInvocation from hazelcast.lifecycle import LIFECYCLE_STATE_CONNECTED, LIFECYCLE_STATE_DISCONNECTED from hazelcast.protocol.codec import client_add_membership_listener_codec, client_authentication_codec from hazelcast.util import get_possible_addresses, get_provider_addresses, calculate_version @@ -181,8 +180,7 @@ def _init_membership_listener(self, connection): def handler(m): client_add_membership_listener_codec.handle(m, self._handle_member, self._handle_member_list) - response = self._client.invoker.invoke( - ListenerInvocation(self._client.listener, request, handler, connection=connection)).result() + response = self._client.invoker.invoke_on_connection(request, connection, True, handler).result() registration_id = client_add_membership_listener_codec.decode_response(response)["response"] self.logger.debug("Registered membership listener with ID " + registration_id, extra=self._logger_extras) self._initial_list_fetched.wait() diff --git a/hazelcast/invocation.py b/hazelcast/invocation.py index b82b2774b6..cc8e85e218 100644 --- a/hazelcast/invocation.py +++ b/hazelcast/invocation.py @@ -18,7 +18,7 @@ class Invocation(object): sent_connection = None timer = None - def __init__(self, invocation_service, request, partition_id=-1, address=None, connection=None): + def __init__(self, invocation_service, request, partition_id=-1, address=None, connection=None, event_handler=None): self._event = threading.Event() self._invocation_timeout = invocation_service.invocation_timeout self.timeout = self._invocation_timeout + time.time() @@ -27,6 +27,7 @@ def __init__(self, invocation_service, request, partition_id=-1, address=None, c self.partition_id = partition_id self.request = request self.future = Future() + self.event_handler = event_handler def has_connection(self): return self.connection is not None @@ -55,90 +56,19 @@ def on_timeout(self): self.set_exception(TimeoutError("Request timed out after %d seconds." % self._invocation_timeout)) -class ListenerInvocation(Invocation): - # used for storing the original registration id across re-registrations - registration_id = None - - def __init__(self, invocation_service, request, event_handler, response_decoder=None, **kwargs): - Invocation.__init__(self, invocation_service, request, **kwargs) - self.event_handler = event_handler - self.response_decoder = response_decoder - - -class ListenerService(object): - logger = logging.getLogger("HazelcastClient.ListenerService") - - def __init__(self, client): - self._client = client - self._logger_extras = {"client_name": client.name, "group_name": client.config.group_config.name} - self.registrations = {} - self.invocation_timeout = self._init_invocation_timeout() - pass - - def start_listening(self, request, event_handler, decode_add_listener, key=None): - if key: - partition_id = self._client.partition_service.get_partition_id(key) - invocation = ListenerInvocation(self, request, event_handler, decode_add_listener, - partition_id=partition_id) - else: - invocation = ListenerInvocation(self, request, event_handler, decode_add_listener) - - future = self._client.invoker.invoke(invocation) - registration_id = decode_add_listener(future.result()) - invocation.registration_id = registration_id # store the original registration id for future reference - self.registrations[registration_id] = (registration_id, request.get_correlation_id()) - return registration_id - - def stop_listening(self, registration_id, encode_remove_listener): - try: - actual_id, correlation_id = self.registrations.pop(registration_id) - self._client.invoker._remove_event_handler(correlation_id) - # TODO: should be invoked on same node as registration? - self._client.invoker.invoke_on_random_target(encode_remove_listener(actual_id)).result() - return True - except KeyError: - return False - - def re_register_listener(self, invocation): - registration_id = invocation.registration_id - new_invocation = ListenerInvocation(self, invocation.request, invocation.event_handler, - invocation.response_decoder, partition_id=invocation.partition_id) - new_invocation.registration_id = registration_id - - # re-send the request - def callback(f): - if f.is_success(): - new_id = new_invocation.response_decoder(f.result()) - self.logger.debug("Re-registered listener with id %s and new_id %s for request %s", - registration_id, new_id, new_invocation.request, extra=self._logger_extras) - self.registrations[registration_id] = (new_id, new_invocation.request.get_correlation_id()) - else: - self.logger.warning("Re-registration for listener with id %s failed.", registration_id, exc_info=True, - extra=self._logger_extras) - - self.logger.debug("Re-registering listener %s for request %s", registration_id, new_invocation.request, - extra=self._logger_extras) - self._client.invoker.invoke(new_invocation).add_done_callback(callback) - - def _init_invocation_timeout(self): - invocation_timeout = self._client.properties.get_seconds_positive_or_default( - self._client.properties.INVOCATION_TIMEOUT_SECONDS) - return invocation_timeout - - class InvocationService(object): logger = logging.getLogger("HazelcastClient.InvocationService") def __init__(self, client): self._pending = {} - self._event_handlers = {} self._next_correlation_id = AtomicInteger(1) self._client = client self._logger_extras = {"client_name": client.name, "group_name": client.config.group_config.name} self._event_queue = queue.Queue() self._is_redo_operation = client.config.network_config.redo_operation - self._invocation_retry_pause = self._init_invocation_retry_pause() + self.invocation_retry_pause = self._init_invocation_retry_pause() self.invocation_timeout = self._init_invocation_timeout() + self._listener_service = None if client.config.network_config.smart_routing: self.invoke = self.invoke_smart @@ -148,8 +78,12 @@ def __init__(self, client): self._client.connection_manager.add_listener(on_connection_closed=self.cleanup_connection) client.heartbeat.add_listener(on_heartbeat_stopped=self._heartbeat_stopped) - def invoke_on_connection(self, message, connection, ignore_heartbeat=False): - return self.invoke(Invocation(self, message, connection=connection), ignore_heartbeat) + def start(self): + self._listener_service = self._client.listener + + def invoke_on_connection(self, message, connection, ignore_heartbeat=False, event_handler=None): + return self.invoke(Invocation(self, message, connection=connection, event_handler=event_handler), + ignore_heartbeat) def invoke_on_partition(self, message, partition_id, invocation_timeout=None): invocation = Invocation(self, message, partition_id=partition_id) @@ -178,7 +112,8 @@ def invoke_smart(self, invocation, ignore_heartbeat=False): self._send_to_address(invocation, addr) elif invocation.has_address(): if not self._is_member(invocation.address): - self._handle_exception(invocation, TargetNotMemberError("Target '{}' is not a member.".format(invocation.address))) + self._handle_exception(invocation, TargetNotMemberError("Target '{}' is not a member.".format + (invocation.address))) else: self._send_to_address(invocation, invocation.address) else: # send to random address @@ -202,11 +137,6 @@ def cleanup_connection(self, connection, cause): if invocation.sent_connection == connection: self._handle_exception(invocation, cause) - if self._client.lifecycle.is_live: - for correlation_id, invocation in six.iteritems(dict(self._event_handlers)): - if invocation.sent_connection == connection and invocation.connection is None: - self._client.listener.re_register_listener(invocation) - def _init_invocation_retry_pause(self): invocation_retry_pause = self._client.properties.get_seconds_positive_or_default( self._client.properties.INVOCATION_RETRY_PAUSE_MILLIS) @@ -223,9 +153,6 @@ def _heartbeat_stopped(self, connection): self._handle_exception(invocation, TargetDisconnectedError("%s has stopped heart beating." % connection)) - def _remove_event_handler(self, correlation_id): - self._event_handlers.pop(correlation_id) - def _send_to_address(self, invocation, address, ignore_heartbeat=False): try: conn = self._client.connection_manager.connections[address] @@ -252,8 +179,8 @@ def _send(self, invocation, connection, ignore_heartbeat): if not invocation.timer: invocation.timer = self._client.reactor.add_timer_absolute(invocation.timeout, invocation.on_timeout) - if isinstance(invocation, ListenerInvocation): - self._event_handlers[correlation_id] = invocation + if invocation.event_handler is not None: + self._listener_service.add_event_handler(correlation_id, invocation.event_handler) self.logger.debug("Sending %s to %s", message, connection, extra=self._logger_extras) @@ -265,17 +192,14 @@ def _send(self, invocation, connection, ignore_heartbeat): try: connection.send_message(message) except IOError as e: + if invocation.event_handler is not None: + self._listener_service.remove_event_handler(correlation_id) self._handle_exception(invocation, e) def _handle_client_message(self, message): correlation_id = message.get_correlation_id() if message.has_flags(LISTENER_FLAG): - if correlation_id not in self._event_handlers: - self.logger.warning("Got event message with unknown correlation id: %s", message, - extra=self._logger_extras) - return - invocation = self._event_handlers[correlation_id] - self._handle_event(invocation, message) + self._listener_service.handle_client_message(message) return if correlation_id not in self._pending: self.logger.warning("Got message with unknown correlation id: %s", message, extra=self._logger_extras) @@ -320,8 +244,8 @@ def _try_retry(self, invocation): invoke_func = functools.partial(self.invoke, invocation) self.logger.debug("Rescheduling request %s to be retried in %s seconds", invocation.request, - self._invocation_retry_pause, extra=self._logger_extras) - self._client.reactor.add_timer(self._invocation_retry_pause, invoke_func) + self.invocation_retry_pause, extra=self._logger_extras) + self._client.reactor.add_timer(self.invocation_retry_pause, invoke_func) return True def _is_member(self, address): diff --git a/hazelcast/listener.py b/hazelcast/listener.py new file mode 100644 index 0000000000..1314d9e8b3 --- /dev/null +++ b/hazelcast/listener.py @@ -0,0 +1,165 @@ +import logging +import threading +from uuid import uuid4 + +from hazelcast.exception import OperationTimeoutError, HazelcastError +from hazelcast.util import current_time_in_millis, check_not_none +from time import sleep + + +class ListenerRegistration(object): + def __init__(self, registration_request, decode_register_response, encode_deregister_request, handler): + self.registration_request = registration_request + self.decode_register_response = decode_register_response + self.encode_deregister_request = encode_deregister_request + self.handler = handler + self.connection_registrations = {} # Dict of Connection, EventRegistration + + +class EventRegistration(object): + def __init__(self, server_registration_id, correlation_id): + self.server_registration_id = server_registration_id + self.correlation_id = correlation_id + + +class ListenerService(object): + logger = logging.getLogger("HazelcastClient.ListenerService") + + def __init__(self, client): + self._client = client + self._invocation_service = client.invoker + self.is_smart = client.config.network_config.smart_routing + self._logger_extras = {"client_name": client.name, "group_name": client.config.group_config.name} + self._active_registrations = {} # Dict of user_registration_id, ListenerRegistration + self._registration_lock = threading.RLock() + self._event_handlers = {} + + def try_sync_connect_to_all_members(self): + cluster_service = self._client.cluster + start_millis = current_time_in_millis() + while True: + last_failed_member = None + last_exception = None + for member in cluster_service.members: + try: + self._client.connection_manager.get_or_connect(member.address).result() + except Exception as e: + last_failed_member = member + last_exception = e + if last_exception is None: + break + self.time_out_or_sleep_before_next_try(start_millis, last_failed_member, last_exception) + if not self._client.lifecycle.is_live(): + break + + def time_out_or_sleep_before_next_try(self, start_millis, last_failed_member, last_exception): + now_in_millis = current_time_in_millis() + elapsed_millis = now_in_millis - start_millis + invocation_time_out_millis = self._invocation_service.invocation_timeout * 1000 + timed_out = elapsed_millis > invocation_time_out_millis + if timed_out: + raise OperationTimeoutError\ + ("Registering listeners is timed out. Last failed member: %s, Current time: %s, Start time: %s, " + "Client invocation timeout: %s, Elapsed time: %s ms, Cause: %s", last_failed_member, now_in_millis, + start_millis, invocation_time_out_millis, elapsed_millis, last_exception.args[0]) + else: + sleep(self._invocation_service.invocation_retry_pause) # sleep before next try + + def register_listener(self, registration_request, decode_register_response, encode_deregister_request, handler): + if self.is_smart: + self.try_sync_connect_to_all_members() + + with self._registration_lock: + user_registration_id = str(uuid4()) + listener_registration = ListenerRegistration(registration_request, decode_register_response, + encode_deregister_request, handler) + self._active_registrations[user_registration_id] = listener_registration + + active_connections = self._client.connection_manager.connections + for connection in active_connections.values(): + try: + self.register_listener_on_connection_async(user_registration_id, listener_registration, connection)\ + .result() + except: + if connection.live(): + self.deregister_listener(user_registration_id) + raise HazelcastError("Listener cannot be added ") + return user_registration_id + + def register_listener_on_connection_async(self, user_registration_id, listener_registration, connection): + registration_map = listener_registration.connection_registrations + + if connection in registration_map: + return + + registration_request = listener_registration.registration_request + future = self._invocation_service.invoke_on_connection(registration_request, connection, + event_handler=listener_registration.handler) + + def callback(f): + try: + response = f.result() + server_registration_id = listener_registration.decode_register_response(response) + correlation_id = registration_request.get_correlation_id() + registration = EventRegistration(server_registration_id, correlation_id) + registration_map[connection] = registration + except Exception as e: + if connection.live(): + self.logger.warning("Listener %s can not be added to a new connection: %s, reason: %s", + user_registration_id, connection, e.args[0], extra=self._logger_extras) + + future.add_done_callback(callback) + return future + + def deregister_listener(self, user_registration_id): + check_not_none(user_registration_id, "None userRegistrationId is not allowed!") + + with self._registration_lock: + listener_registration = self._active_registrations.get(user_registration_id) + if listener_registration is None: + return False + successful = True + for connection, event_registration in list(listener_registration.connection_registrations.items()): + try: + server_registration_id = event_registration.server_registration_id + deregister_request = listener_registration.encode_deregister_request(server_registration_id) + self._invocation_service.invoke_on_connection(deregister_request, connection).result() + self.remove_event_handler(event_registration.correlation_id) + listener_registration.connection_registrations.pop(connection) + except: + if connection.live(): + successful = False + self.logger.warning("Deregistration for listener with ID %s has failed to address %s ", + user_registration_id, "address", exc_info=True, extra=self._logger_extras) + if successful: + self._active_registrations.pop(user_registration_id) + return successful + + def connection_added(self, connection): + with self._registration_lock: + for user_reg_id, listener_registration in self._active_registrations.items(): + self.register_listener_on_connection_async(user_reg_id, listener_registration, connection) + + def connection_removed(self, connection, _): + with self._registration_lock: + for listener_registration in self._active_registrations.values(): + event_registration = listener_registration.connection_registrations.pop(connection, None) + if event_registration is not None: + self.remove_event_handler(event_registration.correlation_id) + + def start(self): + self._client.connection_manager.add_listener(self.connection_added, self.connection_removed) + + def handle_client_message(self, message): + correlation_id = message.get_correlation_id() + if correlation_id not in self._event_handlers: + self.logger.warning("Got event message with unknown correlation id: %s", message, extra=self._logger_extras) + else: + event_handler = self._event_handlers.get(correlation_id) + event_handler(message) + + def add_event_handler(self, correlation_id, event_handler): + self._event_handlers[correlation_id] = event_handler + + def remove_event_handler(self, correlation_id): + self._event_handlers.pop(correlation_id, None) \ No newline at end of file diff --git a/hazelcast/proxy/base.py b/hazelcast/proxy/base.py index c82e9b7e80..deb3b5fbd3 100644 --- a/hazelcast/proxy/base.py +++ b/hazelcast/proxy/base.py @@ -34,8 +34,9 @@ def __init__(self, client, service_name, name): self.logger = logging.getLogger("HazelcastClient.%s(%s)" % (type(self).__name__, name)) self._to_object = client.serialization_service.to_object self._to_data = client.serialization_service.to_data - self._start_listening = client.listener.start_listening - self._stop_listening = client.listener.stop_listening + self._register_listener = client.listener.register_listener + self._deregister_listener = client.listener.deregister_listener + self._is_smart = client.listener.is_smart def destroy(self): """ diff --git a/hazelcast/proxy/list.py b/hazelcast/proxy/list.py index 858913f7fa..4027e39156 100644 --- a/hazelcast/proxy/list.py +++ b/hazelcast/proxy/list.py @@ -96,7 +96,7 @@ def add_listener(self, include_value=False, item_added_func=None, item_removed_f :param item_removed_func: Function to be called when an item is deleted from this list (optional). :return: (str), a registration id which is used as a key to remove the listener. """ - request = list_add_listener_codec.encode_request(self.name, include_value, False) + request = list_add_listener_codec.encode_request(self.name, include_value, self._is_smart) def handle_event_item(item, uuid, event_type): item = item if include_value else None @@ -110,10 +110,9 @@ def handle_event_item(item, uuid, event_type): if item_removed_func: item_removed_func(item_event) - return self._start_listening(request, - lambda m: list_add_listener_codec.handle(m, handle_event_item), - lambda r: list_add_listener_codec.decode_response(r)['response'], - self.partition_key) + return self._register_listener(request, lambda r: list_add_listener_codec.decode_response(r)['response'], + lambda reg_id: list_remove_listener_codec.encode_request(self.name, reg_id), + lambda m: list_add_listener_codec.handle(m, handle_event_item)) def clear(self): """ @@ -254,7 +253,7 @@ def remove_listener(self, registration_id): :param registration_id: (str), id of the listener to be deleted. :return: (bool), ``true`` if the item listener is removed, ``false`` otherwise. """ - return self._stop_listening(registration_id, lambda i: list_remove_listener_codec.encode_request(self.name, i)) + return self._deregister_listener(registration_id) def retain_all(self, items): """ diff --git a/hazelcast/proxy/map.py b/hazelcast/proxy/map.py index aa2fa9dfbe..306404e3b1 100644 --- a/hazelcast/proxy/map.py +++ b/hazelcast/proxy/map.py @@ -79,25 +79,24 @@ def add_entry_listener(self, include_value=False, key=None, predicate=None, adde .. seealso:: :class:`~hazelcast.serialization.predicate.Predicate` for more info about predicates. """ flags = get_entry_listener_flags(added=added_func, removed=removed_func, updated=updated_func, - evicted=evicted_func, evict_all=evict_all_func, clear_all=clear_all_func, merged=merged_func, - expired=expired_func) + evicted=evicted_func, evict_all=evict_all_func, clear_all=clear_all_func, + merged=merged_func, expired=expired_func) if key and predicate: key_data = self._to_data(key) predicate_data = self._to_data(predicate) - request = map_add_entry_listener_to_key_with_predicate_codec.encode_request(self.name, key_data, - predicate_data, include_value, - flags, False) + request = map_add_entry_listener_to_key_with_predicate_codec.encode_request( + self.name, key_data, predicate_data, include_value, flags, self._is_smart) elif key and not predicate: key_data = self._to_data(key) - request = map_add_entry_listener_to_key_codec.encode_request(self.name, key_data, include_value, flags, - False) + request = map_add_entry_listener_to_key_codec.encode_request( + self.name, key_data, include_value, flags, self._is_smart) elif not key and predicate: predicate = self._to_data(predicate) - request = map_add_entry_listener_with_predicate_codec.encode_request(self.name, predicate, include_value, - flags, False) + request = map_add_entry_listener_with_predicate_codec.encode_request( + self.name, predicate, include_value, flags, self._is_smart) else: - request = map_add_entry_listener_codec.encode_request(self.name, include_value, flags, False) + request = map_add_entry_listener_codec.encode_request(self.name, include_value, flags, self._is_smart) def handle_event_entry(**_kwargs): event = EntryEvent(self._to_object, **_kwargs) @@ -118,8 +117,9 @@ def handle_event_entry(**_kwargs): elif event.event_type == EntryEventType.expired: expired_func(event) - return self._start_listening(request, lambda m: map_add_entry_listener_codec.handle(m, handle_event_entry), - lambda r: map_add_entry_listener_codec.decode_response(r)['response']) + return self._register_listener(request, lambda r: map_add_entry_listener_codec.decode_response(r)['response'], + lambda reg_id: map_remove_entry_listener_codec.encode_request(self.name, reg_id), + lambda m: map_add_entry_listener_codec.handle(m, handle_event_entry)) def add_index(self, attribute, ordered=False): """ @@ -640,8 +640,7 @@ def remove_entry_listener(self, registration_id): :param registration_id: (str), id of registered listener. :return: (bool), ``true`` if registration is removed, ``false`` otherwise. """ - return self._stop_listening(registration_id, - lambda i: map_remove_entry_listener_codec.encode_request(self.name, i)) + return self._deregister_listener(registration_id) def replace(self, key, value): """ @@ -922,15 +921,14 @@ def _on_destroy(self): super(MapFeatNearCache, self)._on_destroy() def _add_near_cache_invalidation_listener(self): - def handle(message): - map_add_near_cache_entry_listener_codec.handle(message, self._handle_invalidation, self._handle_batch_invalidation) - - def handle_decode(message): - return map_add_near_cache_entry_listener_codec.decode_response(message)['response'] - try: - request = map_add_near_cache_entry_listener_codec.encode_request(self.name, EntryEventType.invalidation, False) - self._invalidation_listener_id = self._start_listening(request, handle, handle_decode) + request = map_add_near_cache_entry_listener_codec.encode_request(self.name, EntryEventType.invalidation, + self._is_smart) + self._invalidation_listener_id = self._register_listener( + request, lambda r: map_add_near_cache_entry_listener_codec.decode_response(r)['response'], + lambda reg_id: map_remove_entry_listener_codec.encode_request(self.name, reg_id), + lambda m: map_add_near_cache_entry_listener_codec.handle(m, self._handle_invalidation, + self._handle_batch_invalidation)) except: self.logger.severe("-----------------\n Near Cache is not initialized!!! \n-----------------") diff --git a/hazelcast/proxy/multi_map.py b/hazelcast/proxy/multi_map.py index 7c579d5ef3..bcbd6fdbed 100644 --- a/hazelcast/proxy/multi_map.py +++ b/hazelcast/proxy/multi_map.py @@ -30,12 +30,11 @@ def add_entry_listener(self, include_value=False, key=None, added_func=None, rem """ if key: key_data = self._to_data(key) - request = multi_map_add_entry_listener_to_key_codec.encode_request(name=self.name, key=key_data, - include_value=include_value, - local_only=False) + request = multi_map_add_entry_listener_to_key_codec.encode_request( + name=self.name, key=key_data, include_value=include_value, local_only=False) else: - request = multi_map_add_entry_listener_codec.encode_request(name=self.name, include_value=include_value, - local_only=False) + request = multi_map_add_entry_listener_codec.encode_request( + name=self.name, include_value=include_value, local_only=False) def handle_event_entry(**_kwargs): event = EntryEvent(self._to_object, **_kwargs) @@ -46,11 +45,10 @@ def handle_event_entry(**_kwargs): elif event.event_type == EntryEventType.clear_all and clear_all_func: clear_all_func(event) - return self._start_listening(request, - lambda m: multi_map_add_entry_listener_codec.handle(m, - handle_event_entry), - lambda r: multi_map_add_entry_listener_codec.decode_response(r)[ - 'response']) + return self._register_listener( + request, lambda r: multi_map_add_entry_listener_codec.decode_response(r)['response'], + lambda reg_id: multi_map_remove_entry_listener_codec.encode_request(self.name, reg_id), + lambda m: multi_map_add_entry_listener_codec.handle(m, handle_event_entry)) def contains_key(self, key): """ @@ -257,8 +255,7 @@ def remove_entry_listener(self, registration_id): :param registration_id: (str), id of registered listener. :return: (bool), ``true`` if registration is removed, ``false`` otherwise. """ - return self._stop_listening(registration_id, - lambda i: multi_map_remove_entry_listener_codec.encode_request(self.name, i)) + return self._deregister_listener(registration_id) def size(self): """ diff --git a/hazelcast/proxy/queue.py b/hazelcast/proxy/queue.py index d6b5b41ddd..d7f918ff0e 100644 --- a/hazelcast/proxy/queue.py +++ b/hazelcast/proxy/queue.py @@ -73,7 +73,7 @@ def add_listener(self, include_value=False, item_added_func=None, item_removed_f :param item_removed_func: Function to be called when an item is deleted from this set (optional). :return: (str), a registration id which is used as a key to remove the listener. """ - request = queue_add_listener_codec.encode_request(self.name, include_value, False) + request = queue_add_listener_codec.encode_request(self.name, include_value, self._is_smart) def handle_event_item(item, uuid, event_type): item = item if include_value else None @@ -87,10 +87,9 @@ def handle_event_item(item, uuid, event_type): if item_removed_func: item_removed_func(item_event) - return self._start_listening(request, - lambda m: queue_add_listener_codec.handle(m, handle_event_item), - lambda r: queue_add_listener_codec.decode_response(r)['response'], - self.partition_key) + return self._register_listener(request, lambda r: queue_add_listener_codec.decode_response(r)['response'], + lambda reg_id: queue_remove_listener_codec.encode_request(self.name, reg_id), + lambda m: queue_add_listener_codec.handle(m, handle_event_item)) def clear(self): """ @@ -247,7 +246,7 @@ def remove_listener(self, registration_id): :param registration_id: (str), id of the listener to be deleted. :return: (bool), ``true`` if the item listener is removed, ``false`` otherwise. """ - return self._stop_listening(registration_id, lambda i: queue_remove_listener_codec.encode_request(self.name, i)) + return self._deregister_listener(registration_id) def retain_all(self, items): """ diff --git a/hazelcast/proxy/replicated_map.py b/hazelcast/proxy/replicated_map.py index e1503b0c6c..3c2f9db757 100644 --- a/hazelcast/proxy/replicated_map.py +++ b/hazelcast/proxy/replicated_map.py @@ -44,18 +44,17 @@ def add_entry_listener(self, key=None, predicate=None, added_func=None, removed_ if key and predicate: key_data = self._to_data(key) predicate_data = self._to_data(predicate) - request = replicated_map_add_entry_listener_to_key_with_predicate_codec.encode_request(self.name, key_data, - predicate_data, - False) + request = replicated_map_add_entry_listener_to_key_with_predicate_codec.encode_request( + self.name, key_data, predicate_data, self._is_smart) elif key and not predicate: key_data = self._to_data(key) - request = replicated_map_add_entry_listener_to_key_codec.encode_request(self.name, key_data, False) + request = replicated_map_add_entry_listener_to_key_codec.encode_request(self.name, key_data, self._is_smart) elif not key and predicate: predicate = self._to_data(predicate) - request = replicated_map_add_entry_listener_with_predicate_codec.encode_request(self.name, predicate, - False) + request = replicated_map_add_entry_listener_with_predicate_codec.encode_request( + self.name, predicate, self._is_smart) else: - request = replicated_map_add_entry_listener_codec.encode_request(self.name, False) + request = replicated_map_add_entry_listener_codec.encode_request(self.name, self._is_smart) def handle_event_entry(**_kwargs): event = EntryEvent(self._to_object, **_kwargs) @@ -70,11 +69,10 @@ def handle_event_entry(**_kwargs): elif event.event_type == EntryEventType.clear_all and clear_all_func: clear_all_func(event) - return self._start_listening(request, - lambda m: replicated_map_add_entry_listener_codec.handle(m, - handle_event_entry), - lambda r: replicated_map_add_entry_listener_codec.decode_response(r)[ - 'response']) + return self._register_listener( + request, lambda r: replicated_map_add_entry_listener_codec.decode_response(r)['response'], + lambda reg_id: replicated_map_remove_entry_listener_codec.encode_request(self.name, reg_id), + lambda m: replicated_map_add_entry_listener_codec.handle(m, handle_event_entry)) def clear(self): """ @@ -206,8 +204,7 @@ def remove_entry_listener(self, registration_id): :param registration_id: (str), id of registered listener. :return: (bool), ``true`` if registration is removed, ``false`` otherwise. """ - return self._stop_listening(registration_id, - lambda i: replicated_map_remove_entry_listener_codec.encode_request(self.name, i)) + return self._deregister_listener(registration_id) def size(self): """ diff --git a/hazelcast/proxy/set.py b/hazelcast/proxy/set.py index ad6f2eb000..2416fa2a3a 100644 --- a/hazelcast/proxy/set.py +++ b/hazelcast/proxy/set.py @@ -55,7 +55,7 @@ def add_listener(self, include_value=False, item_added_func=None, item_removed_f :param item_removed_func: Function to be called when an item is deleted from this set (optional). :return: (str), a registration id which is used as a key to remove the listener. """ - request = set_add_listener_codec.encode_request(self.name, include_value, False) + request = set_add_listener_codec.encode_request(self.name, include_value, self._is_smart) def handle_event_item(item, uuid, event_type): item = item if include_value else None @@ -69,10 +69,9 @@ def handle_event_item(item, uuid, event_type): if item_removed_func: item_removed_func(item_event) - return self._start_listening(request, - lambda m: set_add_listener_codec.handle(m, handle_event_item), - lambda r: set_add_listener_codec.decode_response(r)['response'], - self.partition_key) + return self._register_listener(request, lambda r: set_add_listener_codec.decode_response(r)['response'], + lambda reg_id: set_remove_listener_codec.encode_request(self.name, reg_id), + lambda m: set_add_listener_codec.handle(m, handle_event_item)) def clear(self): """ @@ -153,7 +152,7 @@ def remove_listener(self, registration_id): :param registration_id: (str), id of the listener to be deleted. :return: (bool), ``true`` if the item listener is removed, ``false`` otherwise. """ - return self._stop_listening(registration_id, lambda i: set_remove_listener_codec.encode_request(self.name, i)) + return self._deregister_listener(registration_id) def retain_all(self, items): """ diff --git a/hazelcast/proxy/topic.py b/hazelcast/proxy/topic.py index f54a68c1cc..fb43e68935 100644 --- a/hazelcast/proxy/topic.py +++ b/hazelcast/proxy/topic.py @@ -24,17 +24,17 @@ def add_listener(self, on_message=None): :param on_message: (Function), function to be called when a message is published. :return: (str), a registration id which is used as a key to remove the listener. """ - request = topic_add_message_listener_codec.encode_request(self.name, False) + request = topic_add_message_listener_codec.encode_request(self.name, self._is_smart) def handle(item, publish_time, uuid): member = self._client.cluster.get_member_by_uuid(uuid) item_event = TopicMessage(self.name, item, publish_time, member, self._to_object) on_message(item_event) - return self._start_listening(request, - lambda m: topic_add_message_listener_codec.handle(m, handle), - lambda r: topic_add_message_listener_codec.decode_response(r)['response'], - self.partition_key) + return self._register_listener( + request, lambda r: topic_add_message_listener_codec.decode_response(r)['response'], + lambda reg_id: topic_remove_message_listener_codec.encode_request(self.name, reg_id), + lambda m: topic_add_message_listener_codec.handle(m, handle)) def publish(self, message): """ @@ -53,5 +53,4 @@ def remove_listener(self, registration_id): :param registration_id: (str), registration id of the listener to be removed. :return: (bool), ``true`` if the listener is removed, ``false`` otherwise. """ - return self._stop_listening(registration_id, - lambda i: topic_remove_message_listener_codec.encode_request(self.name, i)) + return self._deregister_listener(registration_id) diff --git a/run-tests.sh b/run-tests.sh index 42b1d07dca..65bf860242 100644 --- a/run-tests.sh +++ b/run-tests.sh @@ -6,7 +6,7 @@ else USER="" fi -HZ_VERSION="3.12.1-SNAPSHOT" +HZ_VERSION="3.12.1" HAZELCAST_TEST_VERSION=${HZ_VERSION} HAZELCAST_ENTERPRISE_TEST_VERSION=${HZ_VERSION} diff --git a/start-rc.sh b/start-rc.sh index 9f509d0d1d..0d21ad7f98 100644 --- a/start-rc.sh +++ b/start-rc.sh @@ -6,7 +6,7 @@ else USER="" fi -HZ_VERSION="3.12.1-SNAPSHOT" +HZ_VERSION="3.12.1" HAZELCAST_TEST_VERSION=${HZ_VERSION} HAZELCAST_ENTERPRISE_TEST_VERSION=${HZ_VERSION} diff --git a/tests/listener_test.py b/tests/listener_test.py new file mode 100644 index 0000000000..3a9adb730e --- /dev/null +++ b/tests/listener_test.py @@ -0,0 +1,86 @@ +from tests.base import HazelcastTestCase +from tests.util import configure_logging, random_string, event_collector, generate_key_owned_by_instance +from hazelcast.config import ClientConfig + + +class ListenerTest(HazelcastTestCase): + def setUp(self): + configure_logging() + self.rc = self.create_rc() + self.cluster = self.create_cluster(self.rc, None) + self.m1 = self.cluster.start_member() + self.m2 = self.cluster.start_member() + self.m3 = self.cluster.start_member() + self.client_config = ClientConfig() + self.collector = event_collector() + + def tearDown(self): + self.shutdown_all_clients() + self.rc.exit() + + # -------------------------- test_remove_member ----------------------- # + def test_smart_listener_remove_member(self): + self.client_config.network_config.smart_routing = True + client = self.create_client(self.client_config) + map = client.get_map(random_string()).blocking() + key_m1 = generate_key_owned_by_instance(client, self.m1.address) + map.put(key_m1, 'value1') + map.add_entry_listener(updated_func=self.collector) + self.m1.shutdown() + map.put(key_m1, 'value2') + + def assert_event(): + self.assertEqual(1, len(self.collector.events)) + self.assertTrueEventually(assert_event) + + def test_non_smart_listener_remove_connected_member(self): + self.client_config.network_config.smart_routing = False + client = self.create_client(self.client_config) + map = client.get_map(random_string()).blocking() + map.add_entry_listener(added_func=self.collector) + + owner_address = client.cluster.owner_connection_address + + # Test if listener re-registers properly when owner connection is removed. + members = [self.m1, self.m2, self.m3] + for m in members: + if m.address == owner_address: + m.shutdown() + members.remove(m) + + # There are 2 members left. We execute a put operation to each of their partitions + # to test that non-smart listener works in both local and non-local cases. + for m in members: + generated_key = generate_key_owned_by_instance(client, m.address) + map.put(generated_key, 'value') + + def assert_event(): + self.assertEqual(2, len(self.collector.events)) + self.assertTrueEventually(assert_event) + + # -------------------------- test_add_member ----------------------- # + def test_smart_listener_add_member(self): + self.client_config.network_config.smart_routing = True + client = self.create_client(self.client_config) + map = client.get_map(random_string()).blocking() + map.add_entry_listener(added_func=self.collector) + m4 = self.cluster.start_member() + key_m4 = generate_key_owned_by_instance(client, m4.address) + map.put(key_m4, 'value') + + def assert_event(): + self.assertEqual(1, len(self.collector.events)) + self.assertTrueEventually(assert_event) + + def test_non_smart_listener_add_member(self): + self.client_config.network_config.smart_routing = True + client = self.create_client(self.client_config) + map = client.get_map(random_string()).blocking() + map.add_entry_listener(added_func=self.collector) + m4 = self.cluster.start_member() + key_m4 = generate_key_owned_by_instance(client, m4.address) + map.put(key_m4, 'value') + + def assert_event(): + self.assertEqual(1, len(self.collector.events)) + self.assertTrueEventually(assert_event) \ No newline at end of file diff --git a/tests/proxy/map_test.py b/tests/proxy/map_test.py index 971ecfa128..3b5fdcda96 100644 --- a/tests/proxy/map_test.py +++ b/tests/proxy/map_test.py @@ -408,6 +408,12 @@ def test_remove_entry_listener(self): time.sleep(1) self.assertEqual(len(collector.events), 1) + def test_remove_entry_listener_with_none_id(self): + with self.assertRaises(AssertionError) as cm: + self.map.remove_entry_listener(None) + e = cm.exception + self.assertEqual(e.args[0],"None userRegistrationId is not allowed!") + def test_replace(self): self.map.put("key", "value") diff --git a/tests/reconnect_test.py b/tests/reconnect_test.py index 342c26b7fb..4edfb508c2 100644 --- a/tests/reconnect_test.py +++ b/tests/reconnect_test.py @@ -2,7 +2,7 @@ from time import sleep from hazelcast import ClientConfig -from hazelcast.exception import HazelcastError +from hazelcast.exception import HazelcastError, TargetDisconnectedError from hazelcast.lifecycle import LIFECYCLE_STATE_DISCONNECTED, LIFECYCLE_STATE_CONNECTED from hazelcast.util import AtomicInteger from tests.base import HazelcastTestCase @@ -76,7 +76,12 @@ def test_listener_re_register(self): def assert_events(): if client.lifecycle.is_live: - map.put("key-%d" % count.get_and_increment(), "value").result() + while True: + try: + map.put("key-%d" % count.get_and_increment(), "value").result() + break + except TargetDisconnectedError: + pass self.assertGreater(len(collector.events), 0) else: self.fail("Client disconnected...") diff --git a/tests/smart_listener_test.py b/tests/smart_listener_test.py new file mode 100644 index 0000000000..d4bd41c73a --- /dev/null +++ b/tests/smart_listener_test.py @@ -0,0 +1,78 @@ +from tests.base import HazelcastTestCase +from tests.util import configure_logging, random_string, event_collector +from hazelcast.config import ClientConfig +from time import sleep + + +class SmartListenerTest(HazelcastTestCase): + @classmethod + def setUpClass(cls): + configure_logging() + cls.rc = cls.create_rc() + cls.cluster = cls.create_cluster(cls.rc, None) # Default config + cls.m1 = cls.cluster.start_member() + cls.m2 = cls.cluster.start_member() + cls.m3 = cls.cluster.start_member() + + @classmethod + def tearDownClass(cls): + cls.rc.exit() + + def setUp(self): + client_config = ClientConfig() + client_config.network_config.smart_routing = True + self.client = self.create_client(client_config) + self.collector = event_collector() + + def tearDown(self): + self.shutdown_all_clients() + + # -------------------------- test_local_only ----------------------- # + def test_list_smart_listener_local_only(self): + list = self.client.get_list(random_string()).blocking() + list.add_listener(item_added_func=self.collector) + list.add('item-value') + sleep(5) + self.assertEqual(1, len(self.collector.events)) + + def test_map_smart_listener_local_only(self): + map = self.client.get_map(random_string()).blocking() + map.add_entry_listener(added_func=self.collector) + map.put('key', 'value') + sleep(5) + self.assertEqual(1, len(self.collector.events)) + + def test_multimap_smart_listener_local_only(self): + multimap = self.client.get_map(random_string()).blocking() + multimap.add_entry_listener(added_func=self.collector) + multimap.put('key', 'value') + sleep(5) + self.assertEqual(1, len(self.collector.events)) + + def test_queue_smart_listener_local_only(self): + queue = self.client.get_queue(random_string()).blocking() + queue.add_listener(item_added_func=self.collector) + queue.add('item-value') + sleep(5) + self.assertEqual(1, len(self.collector.events)) + + def test_replicated_map_smart_listener_local_only(self): + replicated_map = self.client.get_replicated_map(random_string()).blocking() + replicated_map.add_entry_listener(added_func=self.collector) + replicated_map.put('key', 'value') + sleep(5) + self.assertEqual(1, len(self.collector.events)) + + def test_set_smart_listener_local_only(self): + set = self.client.get_set(random_string()).blocking() + set.add_listener(item_added_func=self.collector) + set.add('item-value') + sleep(5) + self.assertEqual(1, len(self.collector.events)) + + def test_topic_smart_listener_local_only(self): + topic = self.client.get_topic(random_string()).blocking() + topic.add_listener(on_message=self.collector) + topic.publish('item-value') + sleep(5) + self.assertEqual(1, len(self.collector.events))