From 3ab47d4ecca8cf93bc7fe605526afa27453cea4d Mon Sep 17 00:00:00 2001 From: Yagmur Dulger Date: Wed, 24 Jul 2019 10:36:02 +0300 Subject: [PATCH 01/14] Draft commit for smart-listener. Not yet tested --- hazelcast/invocation.py | 216 +++++++++++++++++++++++++++++++++++----- 1 file changed, 193 insertions(+), 23 deletions(-) diff --git a/hazelcast/invocation.py b/hazelcast/invocation.py index b82b2774b6..db7b047c7c 100644 --- a/hazelcast/invocation.py +++ b/hazelcast/invocation.py @@ -4,21 +4,22 @@ import functools from hazelcast.exception import create_exception, HazelcastInstanceNotActiveError, is_retryable_error, TimeoutError, \ - AuthenticationError, TargetDisconnectedError, HazelcastClientNotActiveException, TargetNotMemberError + AuthenticationError, TargetDisconnectedError, HazelcastClientNotActiveException, TargetNotMemberError, \ + HazelcastError, OperationTimeoutError from hazelcast.future import Future from hazelcast.lifecycle import LIFECYCLE_STATE_CONNECTED from hazelcast.protocol.client_message import LISTENER_FLAG from hazelcast.protocol.custom_codec import EXCEPTION_MESSAGE_TYPE, ErrorCodec -from hazelcast.util import AtomicInteger +from hazelcast.util import AtomicInteger, check_not_none, current_time_in_millis from hazelcast.six.moves import queue from hazelcast import six - +from uuid import uuid4 class Invocation(object): sent_connection = None timer = None - def __init__(self, invocation_service, request, partition_id=-1, address=None, connection=None): + def __init__(self, invocation_service, request, partition_id=-1, address=None, connection=None, event_handler=None): self._event = threading.Event() self._invocation_timeout = invocation_service.invocation_timeout self.timeout = self._invocation_timeout + time.time() @@ -27,6 +28,7 @@ def __init__(self, invocation_service, request, partition_id=-1, address=None, c self.partition_id = partition_id self.request = request self.future = Future() + self.event_handler = event_handler def has_connection(self): return self.connection is not None @@ -55,6 +57,22 @@ def on_timeout(self): self.set_exception(TimeoutError("Request timed out after %d seconds." % self._invocation_timeout)) +class ListenerRegistration(object): + def __init__(self, registration_request, decode_register_response, encode_deregister_request, handler): + self.registration_request = registration_request #of type ClientMessage (already encoded) + self.decode_register_response = decode_register_response #take ClientMessage return String + self.encode_deregister_request = encode_deregister_request #take String return ClientMessage + self.handler = handler + self.connection_registrations = {} #Map ) + + +class EventRegistration(object): + def __init__(self, server_registration_id, correlation_id): + self.server_registration_id = server_registration_id + self.correlation_id = correlation_id + # self.connection = connection --> buna gerek yok bence + +""" class ListenerInvocation(Invocation): # used for storing the original registration id across re-registrations registration_id = None @@ -63,6 +81,7 @@ def __init__(self, invocation_service, request, event_handler, response_decoder= Invocation.__init__(self, invocation_service, request, **kwargs) self.event_handler = event_handler self.response_decoder = response_decoder +""" class ListenerService(object): @@ -70,11 +89,154 @@ class ListenerService(object): def __init__(self, client): self._client = client + self._invocation_service = client.invoker + self.is_smart = client.config.network_config.smart_routing self._logger_extras = {"client_name": client.name, "group_name": client.config.group_config.name} - self.registrations = {} - self.invocation_timeout = self._init_invocation_timeout() + self.active_registrations = {} # Map + self._registration_lock = threading.RLock() + self._event_handlers = {} + + def try_sync_connect_to_all_members(self): + cluster_service = self._client.cluster + start_millis = current_time_in_millis() + while True: + last_failed_member = None + last_exception = None + for member in cluster_service.members(): + try: + self._client.connection_manager.get_or_connect(member.address).result() #! + except Exception as e: + last_failed_member = member + last_exception = e + if last_exception is None: + break + self.time_out_or_sleep_before_next_try(start_millis, last_failed_member, last_exception) + if not self._client.lifecycle.is_live(): + break + + def time_out_or_sleep_before_next_try(self, start_millis, last_failed_member, last_exception): + now_in_millis = current_time_in_millis() + elapsed_millis = now_in_millis - start_millis + invocation_time_out_millis = self._invocation_service.invocation_timeout * 1000 + timed_out = elapsed_millis > invocation_time_out_millis + if timed_out: + raise OperationTimeoutError("Registering listeners is timed out." + + " Last failed member : " + str(last_failed_member) + ", " + + " Current time: " + str(now_in_millis) + ", " + + " Start time : " + str(start_millis) + ", " + + " Client invocation timeout : " + str(invocation_time_out_millis) + " ms, " + + " Elapsed time : " + str(elapsed_millis) + " ms. ", last_exception) + else: + self.sleep_before_next_try() + + def sleep_before_next_try(self): + self._invocation_service.invocation_retry_pause() pass + def register_listener(self, registration_request, decode_register_response, encode_deregister_request, handler): + # From JAVA: assert (!Thread.currentThread().getName().contains("eventRegistration")); + if self.is_smart: + self.try_sync_connect_to_all_members() + + with self._registration_lock: + user_registration_id = str(uuid4()) + listener_registration = ListenerRegistration(registration_request, decode_register_response, + encode_deregister_request, handler) + self.active_registrations.put(user_registration_id, listener_registration) + + active_connections = self._client.connection_manager.connections + for connection in active_connections: + try: + self.register_listener_on_connection(listener_registration, connection) + except Exception as e: + if connection.live(): + self.deregister_listener(user_registration_id) + raise HazelcastError("Listener cannot be added ") # cause'unu include etmeli miyim nasil (,e) + return user_registration_id + + def register_listener_on_connection(self, listener_registration, connection): + # From JAVA: assert(Thread.currentThread().getName().contains("eventRegistration")); + registration_map = listener_registration.connection_registrations + + if connection in registration_map: + return + + registration_request = listener_registration.registration_request + future = self._invocation_service.invoke_on_connection(registration_request, connection, + event_handler=listener_registration.handler) + response = future.result() + + server_registration_id = listener_registration.decode_register_response(response) + correlation_id = registration_request.get_correlation_id() + registration = EventRegistration(server_registration_id, correlation_id) + registration_map.put(connection, registration) + + def deregister_listener(self, user_registration_id): + # From JAVA: assert (!Thread.currentThread().getName().contains("eventRegistration")); + check_not_none(user_registration_id, "Null userRegistrationId is not allowed!") + + with self._registration_lock: + listener_registration = self.active_registrations.get(user_registration_id) + if listener_registration is None: + return False + successful = True + for connection, event_registration in listener_registration.connection_registrations.items(): + try: + server_registration_id = event_registration.server_registration_id + deregister_request = listener_registration.encode_deregister_request(server_registration_id) + self._invocation_service.invoke_on_connection(deregister_request, connection).result() + self.remove_event_handler(event_registration.correlation_id) + listener_registration.connection_registrations.pop(connection) + except Exception: + if connection.live(): + successful = False + self.logger.warning("Deregistration for listener with ID {} has failed to address {} ".format + (user_registration_id, "address"), exc_info=True, extra=self._logger_extras) + if successful: + self.active_registrations.pop(user_registration_id) + return successful + + def connection_added(self, connection): + # From JAVA: assert (!Thread.currentThread().getName().contains("eventRegistration")); + with self._registration_lock: + for listener_registration in self.active_registrations.values(): + self.register_listener_on_connection(listener_registration, connection) + + def connection_removed(self, connection): + # From JAVA: assert (!Thread.currentThread().getName().contains("eventRegistration")); + with self._registration_lock: + for listener_registration in self.active_registrations.values(): + event_registration = listener_registration.connection_registrations.pop(connection) + if event_registration is not None: + self.remove_event_handler(event_registration.correlation_id) + + def start(self): # ? + # clientConnectionManager.addConnectionListener(this); + # ama pythonda add_listener parametre olarak listener almiyor. + self._client.connection_manager.add_listener(self.connection_added, self.connection_removed) + # .NET'te bu var gerekli mi: + # if (IsSmart) + # { + # _connectionReopener = new Timer(ReOpenAllConnectionsIfNotOpen, null, 1000, 1000); + # } + + def handle_client_message(self, message): + correlation_id = message.get_correlation_id() + if correlation_id not in self._event_handlers: + self.logger.warning("Got event message with unknown correlation id: %s", message, extra=self._logger_extras) + event_handler = self._event_handlers.get(correlation_id) + event_handler(message) + # Asagidakini yapmak yerine bu sekilde yaptim + # invocation = self._event_handlers[correlation_id] --> bundan nasil invocation cikabiliyor ki? + # self._handle_event(invocation, message) + + def add_event_handler(self, correlation_id, event_handler): + self._event_handlers[correlation_id] = event_handler + + def remove_event_handler(self, correlation_id): + self._event_handlers.pop(correlation_id) + + """ def start_listening(self, request, event_handler, decode_add_listener, key=None): if key: partition_id = self._client.partition_service.get_partition_id(key) @@ -98,7 +260,9 @@ def stop_listening(self, registration_id, encode_remove_listener): return True except KeyError: return False - + """ + """ + # Gerekli mi? def re_register_listener(self, invocation): registration_id = invocation.registration_id new_invocation = ListenerInvocation(self, invocation.request, invocation.event_handler, @@ -119,11 +283,14 @@ def callback(f): self.logger.debug("Re-registering listener %s for request %s", registration_id, new_invocation.request, extra=self._logger_extras) self._client.invoker.invoke(new_invocation).add_done_callback(callback) - + """ + """ + # Gerekli mi? Bunun aynisi InvocationService'te de var. def _init_invocation_timeout(self): invocation_timeout = self._client.properties.get_seconds_positive_or_default( self._client.properties.INVOCATION_TIMEOUT_SECONDS) return invocation_timeout + """ class InvocationService(object): @@ -131,14 +298,14 @@ class InvocationService(object): def __init__(self, client): self._pending = {} - self._event_handlers = {} self._next_correlation_id = AtomicInteger(1) self._client = client self._logger_extras = {"client_name": client.name, "group_name": client.config.group_config.name} self._event_queue = queue.Queue() self._is_redo_operation = client.config.network_config.redo_operation - self._invocation_retry_pause = self._init_invocation_retry_pause() + self.invocation_retry_pause = self._init_invocation_retry_pause() #eskiden private'ti public yaptim. self.invocation_timeout = self._init_invocation_timeout() + self._listener_service = client.listener if client.config.network_config.smart_routing: self.invoke = self.invoke_smart @@ -148,8 +315,8 @@ def __init__(self, client): self._client.connection_manager.add_listener(on_connection_closed=self.cleanup_connection) client.heartbeat.add_listener(on_heartbeat_stopped=self._heartbeat_stopped) - def invoke_on_connection(self, message, connection, ignore_heartbeat=False): - return self.invoke(Invocation(self, message, connection=connection), ignore_heartbeat) + def invoke_on_connection(self, message, connection, ignore_heartbeat=False, event_handler=None): + return self.invoke(Invocation(self, message, connection=connection, event_handler=event_handler), ignore_heartbeat) def invoke_on_partition(self, message, partition_id, invocation_timeout=None): invocation = Invocation(self, message, partition_id=partition_id) @@ -222,9 +389,10 @@ def _heartbeat_stopped(self, connection): if invocation.sent_connection == connection: self._handle_exception(invocation, TargetDisconnectedError("%s has stopped heart beating." % connection)) - + """ def _remove_event_handler(self, correlation_id): self._event_handlers.pop(correlation_id) + """ def _send_to_address(self, invocation, address, ignore_heartbeat=False): try: @@ -252,8 +420,8 @@ def _send(self, invocation, connection, ignore_heartbeat): if not invocation.timer: invocation.timer = self._client.reactor.add_timer_absolute(invocation.timeout, invocation.on_timeout) - if isinstance(invocation, ListenerInvocation): - self._event_handlers[correlation_id] = invocation + if invocation.event_handler is not None: + self._listener_service.add_event_handler(correlation_id, invocation.event_handler) self.logger.debug("Sending %s to %s", message, connection, extra=self._logger_extras) @@ -265,17 +433,19 @@ def _send(self, invocation, connection, ignore_heartbeat): try: connection.send_message(message) except IOError as e: + self._listener_service.remove_event_handler(correlation_id) # fail ederse event handleri temizlemek icin self._handle_exception(invocation, e) def _handle_client_message(self, message): correlation_id = message.get_correlation_id() if message.has_flags(LISTENER_FLAG): - if correlation_id not in self._event_handlers: - self.logger.warning("Got event message with unknown correlation id: %s", message, - extra=self._logger_extras) - return - invocation = self._event_handlers[correlation_id] - self._handle_event(invocation, message) + self._listener_service.handle_client_message(message) + # if correlation_id not in self._event_handlers: + # self.logger.warning("Got event message with unknown correlation id: %s", message, + # extra=self._logger_extras) + # return + # invocation = self._event_handlers[correlation_id] + # self._handle_event(invocation, message) return if correlation_id not in self._pending: self.logger.warning("Got message with unknown correlation id: %s", message, extra=self._logger_extras) @@ -320,8 +490,8 @@ def _try_retry(self, invocation): invoke_func = functools.partial(self.invoke, invocation) self.logger.debug("Rescheduling request %s to be retried in %s seconds", invocation.request, - self._invocation_retry_pause, extra=self._logger_extras) - self._client.reactor.add_timer(self._invocation_retry_pause, invoke_func) + self.invocation_retry_pause, extra=self._logger_extras) + self._client.reactor.add_timer(self.invocation_retry_pause, invoke_func) return True def _is_member(self, address): From 659ac4d3ace7cc359b18afc1aa0057ba7e10d08e Mon Sep 17 00:00:00 2001 From: Yagmur Dulger Date: Wed, 24 Jul 2019 19:17:49 +0300 Subject: [PATCH 02/14] Introduced start() to listener and invocation services. Linked proxy base to listener service. --- hazelcast/client.py | 5 +- hazelcast/cluster.py | 4 +- hazelcast/invocation.py | 258 ++-------------------------------------- hazelcast/listener.py | 225 +++++++++++++++++++++++++++++++++++ hazelcast/proxy/base.py | 4 +- hazelcast/proxy/map.py | 8 +- start-rc.sh | 2 +- 7 files changed, 244 insertions(+), 262 deletions(-) create mode 100644 hazelcast/listener.py diff --git a/hazelcast/client.py b/hazelcast/client.py index e9a68e5dd0..bd04027b7a 100644 --- a/hazelcast/client.py +++ b/hazelcast/client.py @@ -6,7 +6,8 @@ from hazelcast.cluster import ClusterService, RandomLoadBalancer from hazelcast.config import ClientConfig, ClientProperties from hazelcast.connection import ConnectionManager, Heartbeat, DefaultAddressProvider, DefaultAddressTranslator -from hazelcast.invocation import InvocationService, ListenerService +from hazelcast.invocation import InvocationService +from hazelcast.listener import ListenerService from hazelcast.lifecycle import LifecycleService, LIFECYCLE_STATE_SHUTTING_DOWN, LIFECYCLE_STATE_SHUTDOWN from hazelcast.partition import PartitionService from hazelcast.proxy import ProxyManager, MAP_SERVICE, QUEUE_SERVICE, LIST_SERVICE, SET_SERVICE, MULTI_MAP_SERVICE, \ @@ -61,8 +62,10 @@ def __init__(self, config=None): def _start(self): self.reactor.start() try: + self.invoker.start() self.cluster.start() self.heartbeat.start() + self.listener.start() self.partition_service.start() self.statistics.start() except: diff --git a/hazelcast/cluster.py b/hazelcast/cluster.py index 89e2b55a74..cb95b9b0a7 100644 --- a/hazelcast/cluster.py +++ b/hazelcast/cluster.py @@ -5,7 +5,6 @@ import uuid from hazelcast.exception import HazelcastError, AuthenticationError, TargetDisconnectedError -from hazelcast.invocation import ListenerInvocation from hazelcast.lifecycle import LIFECYCLE_STATE_CONNECTED, LIFECYCLE_STATE_DISCONNECTED from hazelcast.protocol.codec import client_add_membership_listener_codec, client_authentication_codec from hazelcast.util import get_possible_addresses, get_provider_addresses, calculate_version @@ -181,8 +180,7 @@ def _init_membership_listener(self, connection): def handler(m): client_add_membership_listener_codec.handle(m, self._handle_member, self._handle_member_list) - response = self._client.invoker.invoke( - ListenerInvocation(self._client.listener, request, handler, connection=connection)).result() + response = self._client.invoker.invoke_on_connection(request, connection, True, handler).result() registration_id = client_add_membership_listener_codec.decode_response(response)["response"] self.logger.debug("Registered membership listener with ID " + registration_id, extra=self._logger_extras) self._initial_list_fetched.wait() diff --git a/hazelcast/invocation.py b/hazelcast/invocation.py index db7b047c7c..971bbbd8c8 100644 --- a/hazelcast/invocation.py +++ b/hazelcast/invocation.py @@ -4,16 +4,15 @@ import functools from hazelcast.exception import create_exception, HazelcastInstanceNotActiveError, is_retryable_error, TimeoutError, \ - AuthenticationError, TargetDisconnectedError, HazelcastClientNotActiveException, TargetNotMemberError, \ - HazelcastError, OperationTimeoutError + AuthenticationError, TargetDisconnectedError, HazelcastClientNotActiveException, TargetNotMemberError from hazelcast.future import Future from hazelcast.lifecycle import LIFECYCLE_STATE_CONNECTED from hazelcast.protocol.client_message import LISTENER_FLAG from hazelcast.protocol.custom_codec import EXCEPTION_MESSAGE_TYPE, ErrorCodec -from hazelcast.util import AtomicInteger, check_not_none, current_time_in_millis +from hazelcast.util import AtomicInteger from hazelcast.six.moves import queue from hazelcast import six -from uuid import uuid4 + class Invocation(object): sent_connection = None @@ -57,242 +56,6 @@ def on_timeout(self): self.set_exception(TimeoutError("Request timed out after %d seconds." % self._invocation_timeout)) -class ListenerRegistration(object): - def __init__(self, registration_request, decode_register_response, encode_deregister_request, handler): - self.registration_request = registration_request #of type ClientMessage (already encoded) - self.decode_register_response = decode_register_response #take ClientMessage return String - self.encode_deregister_request = encode_deregister_request #take String return ClientMessage - self.handler = handler - self.connection_registrations = {} #Map ) - - -class EventRegistration(object): - def __init__(self, server_registration_id, correlation_id): - self.server_registration_id = server_registration_id - self.correlation_id = correlation_id - # self.connection = connection --> buna gerek yok bence - -""" -class ListenerInvocation(Invocation): - # used for storing the original registration id across re-registrations - registration_id = None - - def __init__(self, invocation_service, request, event_handler, response_decoder=None, **kwargs): - Invocation.__init__(self, invocation_service, request, **kwargs) - self.event_handler = event_handler - self.response_decoder = response_decoder -""" - - -class ListenerService(object): - logger = logging.getLogger("HazelcastClient.ListenerService") - - def __init__(self, client): - self._client = client - self._invocation_service = client.invoker - self.is_smart = client.config.network_config.smart_routing - self._logger_extras = {"client_name": client.name, "group_name": client.config.group_config.name} - self.active_registrations = {} # Map - self._registration_lock = threading.RLock() - self._event_handlers = {} - - def try_sync_connect_to_all_members(self): - cluster_service = self._client.cluster - start_millis = current_time_in_millis() - while True: - last_failed_member = None - last_exception = None - for member in cluster_service.members(): - try: - self._client.connection_manager.get_or_connect(member.address).result() #! - except Exception as e: - last_failed_member = member - last_exception = e - if last_exception is None: - break - self.time_out_or_sleep_before_next_try(start_millis, last_failed_member, last_exception) - if not self._client.lifecycle.is_live(): - break - - def time_out_or_sleep_before_next_try(self, start_millis, last_failed_member, last_exception): - now_in_millis = current_time_in_millis() - elapsed_millis = now_in_millis - start_millis - invocation_time_out_millis = self._invocation_service.invocation_timeout * 1000 - timed_out = elapsed_millis > invocation_time_out_millis - if timed_out: - raise OperationTimeoutError("Registering listeners is timed out." - + " Last failed member : " + str(last_failed_member) + ", " - + " Current time: " + str(now_in_millis) + ", " - + " Start time : " + str(start_millis) + ", " - + " Client invocation timeout : " + str(invocation_time_out_millis) + " ms, " - + " Elapsed time : " + str(elapsed_millis) + " ms. ", last_exception) - else: - self.sleep_before_next_try() - - def sleep_before_next_try(self): - self._invocation_service.invocation_retry_pause() - pass - - def register_listener(self, registration_request, decode_register_response, encode_deregister_request, handler): - # From JAVA: assert (!Thread.currentThread().getName().contains("eventRegistration")); - if self.is_smart: - self.try_sync_connect_to_all_members() - - with self._registration_lock: - user_registration_id = str(uuid4()) - listener_registration = ListenerRegistration(registration_request, decode_register_response, - encode_deregister_request, handler) - self.active_registrations.put(user_registration_id, listener_registration) - - active_connections = self._client.connection_manager.connections - for connection in active_connections: - try: - self.register_listener_on_connection(listener_registration, connection) - except Exception as e: - if connection.live(): - self.deregister_listener(user_registration_id) - raise HazelcastError("Listener cannot be added ") # cause'unu include etmeli miyim nasil (,e) - return user_registration_id - - def register_listener_on_connection(self, listener_registration, connection): - # From JAVA: assert(Thread.currentThread().getName().contains("eventRegistration")); - registration_map = listener_registration.connection_registrations - - if connection in registration_map: - return - - registration_request = listener_registration.registration_request - future = self._invocation_service.invoke_on_connection(registration_request, connection, - event_handler=listener_registration.handler) - response = future.result() - - server_registration_id = listener_registration.decode_register_response(response) - correlation_id = registration_request.get_correlation_id() - registration = EventRegistration(server_registration_id, correlation_id) - registration_map.put(connection, registration) - - def deregister_listener(self, user_registration_id): - # From JAVA: assert (!Thread.currentThread().getName().contains("eventRegistration")); - check_not_none(user_registration_id, "Null userRegistrationId is not allowed!") - - with self._registration_lock: - listener_registration = self.active_registrations.get(user_registration_id) - if listener_registration is None: - return False - successful = True - for connection, event_registration in listener_registration.connection_registrations.items(): - try: - server_registration_id = event_registration.server_registration_id - deregister_request = listener_registration.encode_deregister_request(server_registration_id) - self._invocation_service.invoke_on_connection(deregister_request, connection).result() - self.remove_event_handler(event_registration.correlation_id) - listener_registration.connection_registrations.pop(connection) - except Exception: - if connection.live(): - successful = False - self.logger.warning("Deregistration for listener with ID {} has failed to address {} ".format - (user_registration_id, "address"), exc_info=True, extra=self._logger_extras) - if successful: - self.active_registrations.pop(user_registration_id) - return successful - - def connection_added(self, connection): - # From JAVA: assert (!Thread.currentThread().getName().contains("eventRegistration")); - with self._registration_lock: - for listener_registration in self.active_registrations.values(): - self.register_listener_on_connection(listener_registration, connection) - - def connection_removed(self, connection): - # From JAVA: assert (!Thread.currentThread().getName().contains("eventRegistration")); - with self._registration_lock: - for listener_registration in self.active_registrations.values(): - event_registration = listener_registration.connection_registrations.pop(connection) - if event_registration is not None: - self.remove_event_handler(event_registration.correlation_id) - - def start(self): # ? - # clientConnectionManager.addConnectionListener(this); - # ama pythonda add_listener parametre olarak listener almiyor. - self._client.connection_manager.add_listener(self.connection_added, self.connection_removed) - # .NET'te bu var gerekli mi: - # if (IsSmart) - # { - # _connectionReopener = new Timer(ReOpenAllConnectionsIfNotOpen, null, 1000, 1000); - # } - - def handle_client_message(self, message): - correlation_id = message.get_correlation_id() - if correlation_id not in self._event_handlers: - self.logger.warning("Got event message with unknown correlation id: %s", message, extra=self._logger_extras) - event_handler = self._event_handlers.get(correlation_id) - event_handler(message) - # Asagidakini yapmak yerine bu sekilde yaptim - # invocation = self._event_handlers[correlation_id] --> bundan nasil invocation cikabiliyor ki? - # self._handle_event(invocation, message) - - def add_event_handler(self, correlation_id, event_handler): - self._event_handlers[correlation_id] = event_handler - - def remove_event_handler(self, correlation_id): - self._event_handlers.pop(correlation_id) - - """ - def start_listening(self, request, event_handler, decode_add_listener, key=None): - if key: - partition_id = self._client.partition_service.get_partition_id(key) - invocation = ListenerInvocation(self, request, event_handler, decode_add_listener, - partition_id=partition_id) - else: - invocation = ListenerInvocation(self, request, event_handler, decode_add_listener) - - future = self._client.invoker.invoke(invocation) - registration_id = decode_add_listener(future.result()) - invocation.registration_id = registration_id # store the original registration id for future reference - self.registrations[registration_id] = (registration_id, request.get_correlation_id()) - return registration_id - - def stop_listening(self, registration_id, encode_remove_listener): - try: - actual_id, correlation_id = self.registrations.pop(registration_id) - self._client.invoker._remove_event_handler(correlation_id) - # TODO: should be invoked on same node as registration? - self._client.invoker.invoke_on_random_target(encode_remove_listener(actual_id)).result() - return True - except KeyError: - return False - """ - """ - # Gerekli mi? - def re_register_listener(self, invocation): - registration_id = invocation.registration_id - new_invocation = ListenerInvocation(self, invocation.request, invocation.event_handler, - invocation.response_decoder, partition_id=invocation.partition_id) - new_invocation.registration_id = registration_id - - # re-send the request - def callback(f): - if f.is_success(): - new_id = new_invocation.response_decoder(f.result()) - self.logger.debug("Re-registered listener with id %s and new_id %s for request %s", - registration_id, new_id, new_invocation.request, extra=self._logger_extras) - self.registrations[registration_id] = (new_id, new_invocation.request.get_correlation_id()) - else: - self.logger.warning("Re-registration for listener with id %s failed.", registration_id, exc_info=True, - extra=self._logger_extras) - - self.logger.debug("Re-registering listener %s for request %s", registration_id, new_invocation.request, - extra=self._logger_extras) - self._client.invoker.invoke(new_invocation).add_done_callback(callback) - """ - """ - # Gerekli mi? Bunun aynisi InvocationService'te de var. - def _init_invocation_timeout(self): - invocation_timeout = self._client.properties.get_seconds_positive_or_default( - self._client.properties.INVOCATION_TIMEOUT_SECONDS) - return invocation_timeout - """ - - class InvocationService(object): logger = logging.getLogger("HazelcastClient.InvocationService") @@ -305,7 +68,7 @@ def __init__(self, client): self._is_redo_operation = client.config.network_config.redo_operation self.invocation_retry_pause = self._init_invocation_retry_pause() #eskiden private'ti public yaptim. self.invocation_timeout = self._init_invocation_timeout() - self._listener_service = client.listener + self._listener_service = None if client.config.network_config.smart_routing: self.invoke = self.invoke_smart @@ -315,6 +78,9 @@ def __init__(self, client): self._client.connection_manager.add_listener(on_connection_closed=self.cleanup_connection) client.heartbeat.add_listener(on_heartbeat_stopped=self._heartbeat_stopped) + def start(self): + self._listener_service = self._client.listener + def invoke_on_connection(self, message, connection, ignore_heartbeat=False, event_handler=None): return self.invoke(Invocation(self, message, connection=connection, event_handler=event_handler), ignore_heartbeat) @@ -389,10 +155,6 @@ def _heartbeat_stopped(self, connection): if invocation.sent_connection == connection: self._handle_exception(invocation, TargetDisconnectedError("%s has stopped heart beating." % connection)) - """ - def _remove_event_handler(self, correlation_id): - self._event_handlers.pop(correlation_id) - """ def _send_to_address(self, invocation, address, ignore_heartbeat=False): try: @@ -440,12 +202,6 @@ def _handle_client_message(self, message): correlation_id = message.get_correlation_id() if message.has_flags(LISTENER_FLAG): self._listener_service.handle_client_message(message) - # if correlation_id not in self._event_handlers: - # self.logger.warning("Got event message with unknown correlation id: %s", message, - # extra=self._logger_extras) - # return - # invocation = self._event_handlers[correlation_id] - # self._handle_event(invocation, message) return if correlation_id not in self._pending: self.logger.warning("Got message with unknown correlation id: %s", message, extra=self._logger_extras) diff --git a/hazelcast/listener.py b/hazelcast/listener.py new file mode 100644 index 0000000000..14b080fcc4 --- /dev/null +++ b/hazelcast/listener.py @@ -0,0 +1,225 @@ +import logging +import threading +from uuid import uuid4 + +from hazelcast.exception import OperationTimeoutError, HazelcastError +from hazelcast.util import current_time_in_millis, check_not_none + + +class ListenerRegistration(object): + def __init__(self, registration_request, decode_register_response, encode_deregister_request, handler): + self.registration_request = registration_request #of type ClientMessage (already encoded) + self.decode_register_response = decode_register_response #take ClientMessage return String + self.encode_deregister_request = encode_deregister_request #take String return ClientMessage + self.handler = handler + self.connection_registrations = {} #Map ) + + +class EventRegistration(object): + def __init__(self, server_registration_id, correlation_id): + self.server_registration_id = server_registration_id + self.correlation_id = correlation_id + + +class ListenerService(object): + logger = logging.getLogger("HazelcastClient.ListenerService") + + def __init__(self, client): + self._client = client + self._invocation_service = client.invoker + self.is_smart = client.config.network_config.smart_routing + self._logger_extras = {"client_name": client.name, "group_name": client.config.group_config.name} + self.active_registrations = {} # Map + self._registration_lock = threading.RLock() + self._event_handlers = {} + + def try_sync_connect_to_all_members(self): + cluster_service = self._client.cluster + start_millis = current_time_in_millis() + while True: + last_failed_member = None + last_exception = None + for member in cluster_service.members: + try: + self._client.connection_manager.get_or_connect(member.address).result() #! + except Exception as e: + last_failed_member = member + last_exception = e + if last_exception is None: + break + self.time_out_or_sleep_before_next_try(start_millis, last_failed_member, last_exception) + if not self._client.lifecycle.is_live(): + break + + def time_out_or_sleep_before_next_try(self, start_millis, last_failed_member, last_exception): + now_in_millis = current_time_in_millis() + elapsed_millis = now_in_millis - start_millis + invocation_time_out_millis = self._invocation_service.invocation_timeout * 1000 + timed_out = elapsed_millis > invocation_time_out_millis + if timed_out: + raise OperationTimeoutError("Registering listeners is timed out." + + " Last failed member : " + str(last_failed_member) + ", " + + " Current time: " + str(now_in_millis) + ", " + + " Start time : " + str(start_millis) + ", " + + " Client invocation timeout : " + str(invocation_time_out_millis) + " ms, " + + " Elapsed time : " + str(elapsed_millis) + " ms. ", last_exception) + else: + self.sleep_before_next_try() + + def sleep_before_next_try(self): + self._invocation_service.invocation_retry_pause() + pass + + def register_listener(self, registration_request, decode_register_response, encode_deregister_request, handler): + # From JAVA: assert (!Thread.currentThread().getName().contains("eventRegistration")); + if self.is_smart: + self.try_sync_connect_to_all_members() + + with self._registration_lock: + user_registration_id = str(uuid4()) + listener_registration = ListenerRegistration(registration_request, decode_register_response, + encode_deregister_request, handler) + self.active_registrations[user_registration_id] = listener_registration + + active_connections = self._client.connection_manager.connections + for connection in active_connections.values(): + try: + self.register_listener_on_connection(listener_registration, connection) + except Exception as e: + if connection.live(): + self.deregister_listener(user_registration_id) + raise HazelcastError("Listener cannot be added ") # cause'unu include etmeli miyim nasil (,e) + return user_registration_id + + def register_listener_on_connection(self, listener_registration, connection): + # From JAVA: assert(Thread.currentThread().getName().contains("eventRegistration")); + registration_map = listener_registration.connection_registrations + + if connection in registration_map: + return + + registration_request = listener_registration.registration_request + future = self._invocation_service.invoke_on_connection(registration_request, connection, + event_handler=listener_registration.handler) + response = future.result() + + server_registration_id = listener_registration.decode_register_response(response) + correlation_id = registration_request.get_correlation_id() + registration = EventRegistration(server_registration_id, correlation_id) + registration_map[connection] = registration + + def deregister_listener(self, user_registration_id): + # From JAVA: assert (!Thread.currentThread().getName().contains("eventRegistration")); + check_not_none(user_registration_id, "Null userRegistrationId is not allowed!") + + with self._registration_lock: + listener_registration = self.active_registrations.get(user_registration_id) + if listener_registration is None: + return False + successful = True + for connection, event_registration in list(listener_registration.connection_registrations.items()): + try: + server_registration_id = event_registration.server_registration_id + deregister_request = listener_registration.encode_deregister_request(server_registration_id) + self._invocation_service.invoke_on_connection(deregister_request, connection).result() + self.remove_event_handler(event_registration.correlation_id) + listener_registration.connection_registrations.pop(connection) + except Exception: + if connection.live(): + successful = False + self.logger.warning("Deregistration for listener with ID {} has failed to address {} ".format + (user_registration_id, "address"), exc_info=True, extra=self._logger_extras) + if successful: + self.active_registrations.pop(user_registration_id) + return successful + + def connection_added(self, connection): + # From JAVA: assert (!Thread.currentThread().getName().contains("eventRegistration")); + with self._registration_lock: + for listener_registration in self.active_registrations.values(): + self.register_listener_on_connection(listener_registration, connection) + + def connection_removed(self, connection, _): + # From JAVA: assert (!Thread.currentThread().getName().contains("eventRegistration")); + with self._registration_lock: + for listener_registration in self.active_registrations.values(): + event_registration = listener_registration.connection_registrations.pop(connection) + if event_registration is not None: + self.remove_event_handler(event_registration.correlation_id) + + def start(self): # ? + # clientConnectionManager.addConnectionListener(this); + # ama pythonda add_listener parametre olarak listener almiyor. + self._client.connection_manager.add_listener(self.connection_added, self.connection_removed) + + def handle_client_message(self, message): + correlation_id = message.get_correlation_id() + if correlation_id not in self._event_handlers: + self.logger.warning("Got event message with unknown correlation id: %s", message, extra=self._logger_extras) + event_handler = self._event_handlers.get(correlation_id) + event_handler(message) + # Asagidakini yapmak yerine bu sekilde yaptim + # invocation = self._event_handlers[correlation_id] --> bundan nasil invocation cikabiliyor ki? + # self._handle_event(invocation, message) + + def add_event_handler(self, correlation_id, event_handler): + self._event_handlers[correlation_id] = event_handler + + def remove_event_handler(self, correlation_id): + self._event_handlers.pop(correlation_id) + + """ + def start_listening(self, request, event_handler, decode_add_listener, key=None): + if key: + partition_id = self._client.partition_service.get_partition_id(key) + invocation = ListenerInvocation(self, request, event_handler, decode_add_listener, + partition_id=partition_id) + else: + invocation = ListenerInvocation(self, request, event_handler, decode_add_listener) + + future = self._client.invoker.invoke(invocation) + registration_id = decode_add_listener(future.result()) + invocation.registration_id = registration_id # store the original registration id for future reference + self.registrations[registration_id] = (registration_id, request.get_correlation_id()) + return registration_id + + def stop_listening(self, registration_id, encode_remove_listener): + try: + actual_id, correlation_id = self.registrations.pop(registration_id) + self._client.invoker._remove_event_handler(correlation_id) + # TODO: should be invoked on same node as registration? + self._client.invoker.invoke_on_random_target(encode_remove_listener(actual_id)).result() + return True + except KeyError: + return False + """ + """ + # Gerekli mi? + def re_register_listener(self, invocation): + registration_id = invocation.registration_id + new_invocation = ListenerInvocation(self, invocation.request, invocation.event_handler, + invocation.response_decoder, partition_id=invocation.partition_id) + new_invocation.registration_id = registration_id + + # re-send the request + def callback(f): + if f.is_success(): + new_id = new_invocation.response_decoder(f.result()) + self.logger.debug("Re-registered listener with id %s and new_id %s for request %s", + registration_id, new_id, new_invocation.request, extra=self._logger_extras) + self.registrations[registration_id] = (new_id, new_invocation.request.get_correlation_id()) + else: + self.logger.warning("Re-registration for listener with id %s failed.", registration_id, exc_info=True, + extra=self._logger_extras) + + self.logger.debug("Re-registering listener %s for request %s", registration_id, new_invocation.request, + extra=self._logger_extras) + self._client.invoker.invoke(new_invocation).add_done_callback(callback) + """ + """ + # Gerekli mi? Bunun aynisi InvocationService'te de var. + def _init_invocation_timeout(self): + invocation_timeout = self._client.properties.get_seconds_positive_or_default( + self._client.properties.INVOCATION_TIMEOUT_SECONDS) + return invocation_timeout + """ \ No newline at end of file diff --git a/hazelcast/proxy/base.py b/hazelcast/proxy/base.py index c82e9b7e80..feb9a3d7a5 100644 --- a/hazelcast/proxy/base.py +++ b/hazelcast/proxy/base.py @@ -34,8 +34,8 @@ def __init__(self, client, service_name, name): self.logger = logging.getLogger("HazelcastClient.%s(%s)" % (type(self).__name__, name)) self._to_object = client.serialization_service.to_object self._to_data = client.serialization_service.to_data - self._start_listening = client.listener.start_listening - self._stop_listening = client.listener.stop_listening + self._register_listener = client.listener.register_listener + self._deregister_listener = client.listener.deregister_listener def destroy(self): """ diff --git a/hazelcast/proxy/map.py b/hazelcast/proxy/map.py index aa2fa9dfbe..2baa765038 100644 --- a/hazelcast/proxy/map.py +++ b/hazelcast/proxy/map.py @@ -118,8 +118,9 @@ def handle_event_entry(**_kwargs): elif event.event_type == EntryEventType.expired: expired_func(event) - return self._start_listening(request, lambda m: map_add_entry_listener_codec.handle(m, handle_event_entry), - lambda r: map_add_entry_listener_codec.decode_response(r)['response']) + return self._register_listener(request, lambda r: map_add_entry_listener_codec.decode_response(r)['response'], + lambda reg_id: map_remove_entry_listener_codec.encode_request(self.name, reg_id), + lambda m: map_add_entry_listener_codec.handle(m, handle_event_entry)) def add_index(self, attribute, ordered=False): """ @@ -640,8 +641,7 @@ def remove_entry_listener(self, registration_id): :param registration_id: (str), id of registered listener. :return: (bool), ``true`` if registration is removed, ``false`` otherwise. """ - return self._stop_listening(registration_id, - lambda i: map_remove_entry_listener_codec.encode_request(self.name, i)) + return self._deregister_listener(registration_id) def replace(self, key, value): """ diff --git a/start-rc.sh b/start-rc.sh index 9f509d0d1d..0d21ad7f98 100644 --- a/start-rc.sh +++ b/start-rc.sh @@ -6,7 +6,7 @@ else USER="" fi -HZ_VERSION="3.12.1-SNAPSHOT" +HZ_VERSION="3.12.1" HAZELCAST_TEST_VERSION=${HZ_VERSION} HAZELCAST_ENTERPRISE_TEST_VERSION=${HZ_VERSION} From 51a678206d57770eb1c810f2556d79bdd9c2cd9a Mon Sep 17 00:00:00 2001 From: Yagmur Dulger Date: Thu, 25 Jul 2019 14:54:56 +0300 Subject: [PATCH 03/14] Linked listener service to list, multi_map and replicated_map --- hazelcast/invocation.py | 9 +++++---- hazelcast/proxy/list.py | 9 ++++----- hazelcast/proxy/multi_map.py | 13 ++++++------- hazelcast/proxy/replicated_map.py | 13 ++++++------- run-tests.sh | 2 +- 5 files changed, 22 insertions(+), 24 deletions(-) diff --git a/hazelcast/invocation.py b/hazelcast/invocation.py index 971bbbd8c8..e52cbe2282 100644 --- a/hazelcast/invocation.py +++ b/hazelcast/invocation.py @@ -135,10 +135,11 @@ def cleanup_connection(self, connection, cause): if invocation.sent_connection == connection: self._handle_exception(invocation, cause) - if self._client.lifecycle.is_live: - for correlation_id, invocation in six.iteritems(dict(self._event_handlers)): - if invocation.sent_connection == connection and invocation.connection is None: - self._client.listener.re_register_listener(invocation) + # TODO + # if self._client.lifecycle.is_live: + # for correlation_id, invocation in six.iteritems(dict(self._event_handlers)): + # if invocation.sent_connection == connection and invocation.connection is None: + # self._client.listener.re_register_listener(invocation) def _init_invocation_retry_pause(self): invocation_retry_pause = self._client.properties.get_seconds_positive_or_default( diff --git a/hazelcast/proxy/list.py b/hazelcast/proxy/list.py index 858913f7fa..da8a646291 100644 --- a/hazelcast/proxy/list.py +++ b/hazelcast/proxy/list.py @@ -110,10 +110,9 @@ def handle_event_item(item, uuid, event_type): if item_removed_func: item_removed_func(item_event) - return self._start_listening(request, - lambda m: list_add_listener_codec.handle(m, handle_event_item), - lambda r: list_add_listener_codec.decode_response(r)['response'], - self.partition_key) + return self._register_listener(request, lambda r: list_add_listener_codec.decode_response(r)['response'], + lambda reg_id: list_remove_listener_codec.encode_request(self.name, reg_id), + lambda m: list_add_listener_codec.handle(m, handle_event_item)) def clear(self): """ @@ -254,7 +253,7 @@ def remove_listener(self, registration_id): :param registration_id: (str), id of the listener to be deleted. :return: (bool), ``true`` if the item listener is removed, ``false`` otherwise. """ - return self._stop_listening(registration_id, lambda i: list_remove_listener_codec.encode_request(self.name, i)) + return self._deregister_listener(registration_id) def retain_all(self, items): """ diff --git a/hazelcast/proxy/multi_map.py b/hazelcast/proxy/multi_map.py index 7c579d5ef3..253ad1e1b4 100644 --- a/hazelcast/proxy/multi_map.py +++ b/hazelcast/proxy/multi_map.py @@ -46,11 +46,11 @@ def handle_event_entry(**_kwargs): elif event.event_type == EntryEventType.clear_all and clear_all_func: clear_all_func(event) - return self._start_listening(request, - lambda m: multi_map_add_entry_listener_codec.handle(m, - handle_event_entry), - lambda r: multi_map_add_entry_listener_codec.decode_response(r)[ - 'response']) + return self._register_listener(request, + lambda r: multi_map_add_entry_listener_codec.decode_response(r)['response'], + lambda reg_id: multi_map_remove_entry_listener_codec.encode_request(self.name, + reg_id), + lambda m: multi_map_add_entry_listener_codec.handle(m, handle_event_entry)) def contains_key(self, key): """ @@ -257,8 +257,7 @@ def remove_entry_listener(self, registration_id): :param registration_id: (str), id of registered listener. :return: (bool), ``true`` if registration is removed, ``false`` otherwise. """ - return self._stop_listening(registration_id, - lambda i: multi_map_remove_entry_listener_codec.encode_request(self.name, i)) + return self._deregister_listener(registration_id) def size(self): """ diff --git a/hazelcast/proxy/replicated_map.py b/hazelcast/proxy/replicated_map.py index e1503b0c6c..c573c98529 100644 --- a/hazelcast/proxy/replicated_map.py +++ b/hazelcast/proxy/replicated_map.py @@ -70,11 +70,11 @@ def handle_event_entry(**_kwargs): elif event.event_type == EntryEventType.clear_all and clear_all_func: clear_all_func(event) - return self._start_listening(request, - lambda m: replicated_map_add_entry_listener_codec.handle(m, - handle_event_entry), - lambda r: replicated_map_add_entry_listener_codec.decode_response(r)[ - 'response']) + return self._register_listener(request, + lambda r: replicated_map_add_entry_listener_codec.decode_response(r)['response'], + lambda reg_id: replicated_map_remove_entry_listener_codec.encode_request( + self.name, reg_id), + lambda m: replicated_map_add_entry_listener_codec.handle(m, handle_event_entry)) def clear(self): """ @@ -206,8 +206,7 @@ def remove_entry_listener(self, registration_id): :param registration_id: (str), id of registered listener. :return: (bool), ``true`` if registration is removed, ``false`` otherwise. """ - return self._stop_listening(registration_id, - lambda i: replicated_map_remove_entry_listener_codec.encode_request(self.name, i)) + return self._deregister_listener(registration_id) def size(self): """ diff --git a/run-tests.sh b/run-tests.sh index 42b1d07dca..65bf860242 100644 --- a/run-tests.sh +++ b/run-tests.sh @@ -6,7 +6,7 @@ else USER="" fi -HZ_VERSION="3.12.1-SNAPSHOT" +HZ_VERSION="3.12.1" HAZELCAST_TEST_VERSION=${HZ_VERSION} HAZELCAST_ENTERPRISE_TEST_VERSION=${HZ_VERSION} From b596b41d07fb403aebafd592cc399e6839dd6106 Mon Sep 17 00:00:00 2001 From: Yagmur Dulger Date: Thu, 25 Jul 2019 16:19:54 +0300 Subject: [PATCH 04/14] Linked listener service to queue, set and topic. --- hazelcast/proxy/map.py | 2 +- hazelcast/proxy/queue.py | 9 ++++----- hazelcast/proxy/set.py | 9 ++++----- hazelcast/proxy/topic.py | 12 ++++++------ 4 files changed, 15 insertions(+), 17 deletions(-) diff --git a/hazelcast/proxy/map.py b/hazelcast/proxy/map.py index 2baa765038..1a1bad508b 100644 --- a/hazelcast/proxy/map.py +++ b/hazelcast/proxy/map.py @@ -930,7 +930,7 @@ def handle_decode(message): try: request = map_add_near_cache_entry_listener_codec.encode_request(self.name, EntryEventType.invalidation, False) - self._invalidation_listener_id = self._start_listening(request, handle, handle_decode) + self._invalidation_listener_id = self._start_listening(request, handle, handle_decode) # TODO except: self.logger.severe("-----------------\n Near Cache is not initialized!!! \n-----------------") diff --git a/hazelcast/proxy/queue.py b/hazelcast/proxy/queue.py index d6b5b41ddd..f9cd0fc5f6 100644 --- a/hazelcast/proxy/queue.py +++ b/hazelcast/proxy/queue.py @@ -87,10 +87,9 @@ def handle_event_item(item, uuid, event_type): if item_removed_func: item_removed_func(item_event) - return self._start_listening(request, - lambda m: queue_add_listener_codec.handle(m, handle_event_item), - lambda r: queue_add_listener_codec.decode_response(r)['response'], - self.partition_key) + return self._register_listener(request, lambda r: queue_add_listener_codec.decode_response(r)['response'], + lambda reg_id: queue_remove_listener_codec.encode_request(self.name, reg_id), + lambda m: queue_add_listener_codec.handle(m, handle_event_item)) def clear(self): """ @@ -247,7 +246,7 @@ def remove_listener(self, registration_id): :param registration_id: (str), id of the listener to be deleted. :return: (bool), ``true`` if the item listener is removed, ``false`` otherwise. """ - return self._stop_listening(registration_id, lambda i: queue_remove_listener_codec.encode_request(self.name, i)) + return self._deregister_listener(registration_id) def retain_all(self, items): """ diff --git a/hazelcast/proxy/set.py b/hazelcast/proxy/set.py index ad6f2eb000..417080148c 100644 --- a/hazelcast/proxy/set.py +++ b/hazelcast/proxy/set.py @@ -69,10 +69,9 @@ def handle_event_item(item, uuid, event_type): if item_removed_func: item_removed_func(item_event) - return self._start_listening(request, - lambda m: set_add_listener_codec.handle(m, handle_event_item), - lambda r: set_add_listener_codec.decode_response(r)['response'], - self.partition_key) + return self._register_listener(request, lambda r: set_add_listener_codec.decode_response(r)['response'], + lambda reg_id: set_remove_listener_codec.encode_request(self.name, reg_id), + lambda m: set_add_listener_codec.handle(m, handle_event_item)) def clear(self): """ @@ -153,7 +152,7 @@ def remove_listener(self, registration_id): :param registration_id: (str), id of the listener to be deleted. :return: (bool), ``true`` if the item listener is removed, ``false`` otherwise. """ - return self._stop_listening(registration_id, lambda i: set_remove_listener_codec.encode_request(self.name, i)) + return self._deregister_listener(registration_id) def retain_all(self, items): """ diff --git a/hazelcast/proxy/topic.py b/hazelcast/proxy/topic.py index f54a68c1cc..760cc0d5b4 100644 --- a/hazelcast/proxy/topic.py +++ b/hazelcast/proxy/topic.py @@ -31,10 +31,11 @@ def handle(item, publish_time, uuid): item_event = TopicMessage(self.name, item, publish_time, member, self._to_object) on_message(item_event) - return self._start_listening(request, - lambda m: topic_add_message_listener_codec.handle(m, handle), - lambda r: topic_add_message_listener_codec.decode_response(r)['response'], - self.partition_key) + return self._register_listener(request, + lambda r: topic_add_message_listener_codec.decode_response(r)['response'], + lambda reg_id: topic_remove_message_listener_codec.encode_request(self.name, + reg_id), + lambda m: topic_add_message_listener_codec.handle(m, handle)) def publish(self, message): """ @@ -53,5 +54,4 @@ def remove_listener(self, registration_id): :param registration_id: (str), registration id of the listener to be removed. :return: (bool), ``true`` if the listener is removed, ``false`` otherwise. """ - return self._stop_listening(registration_id, - lambda i: topic_remove_message_listener_codec.encode_request(self.name, i)) + return self._deregister_listener(registration_id) From 71c3e0f726c79a1df427c58fb947133cf756cbe4 Mon Sep 17 00:00:00 2001 From: Yagmur Dulger Date: Thu, 25 Jul 2019 16:49:55 +0300 Subject: [PATCH 05/14] Linked listener service to map_feat_near_cache. --- hazelcast/proxy/map.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/hazelcast/proxy/map.py b/hazelcast/proxy/map.py index 1a1bad508b..6998f8efeb 100644 --- a/hazelcast/proxy/map.py +++ b/hazelcast/proxy/map.py @@ -925,12 +925,16 @@ def _add_near_cache_invalidation_listener(self): def handle(message): map_add_near_cache_entry_listener_codec.handle(message, self._handle_invalidation, self._handle_batch_invalidation) - def handle_decode(message): + def add_decode(message): return map_add_near_cache_entry_listener_codec.decode_response(message)['response'] + def remove_encode(reg_id): + return map_remove_entry_listener_codec.encode_request(self.name, reg_id) + try: - request = map_add_near_cache_entry_listener_codec.encode_request(self.name, EntryEventType.invalidation, False) - self._invalidation_listener_id = self._start_listening(request, handle, handle_decode) # TODO + request = map_add_near_cache_entry_listener_codec.encode_request(self.name, EntryEventType.invalidation, + False) + self._invalidation_listener_id = self._register_listener(request, add_decode, remove_encode, handle) except: self.logger.severe("-----------------\n Near Cache is not initialized!!! \n-----------------") From fe279e7b4be3d3f154c02563783f00c0a5fd2779 Mon Sep 17 00:00:00 2001 From: Yagmur Dulger Date: Thu, 25 Jul 2019 17:00:50 +0300 Subject: [PATCH 06/14] Fixed KeyError for remove_event_handler in ListenerService. --- hazelcast/listener.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hazelcast/listener.py b/hazelcast/listener.py index 14b080fcc4..d054e8b523 100644 --- a/hazelcast/listener.py +++ b/hazelcast/listener.py @@ -166,7 +166,7 @@ def add_event_handler(self, correlation_id, event_handler): self._event_handlers[correlation_id] = event_handler def remove_event_handler(self, correlation_id): - self._event_handlers.pop(correlation_id) + self._event_handlers.pop(correlation_id, None) """ def start_listening(self, request, event_handler, decode_add_listener, key=None): From 8d61d2e63b7e07fb7afffecda675e804c4931f65 Mon Sep 17 00:00:00 2001 From: Yagmur Dulger Date: Mon, 29 Jul 2019 12:22:35 +0300 Subject: [PATCH 07/14] Added is_smart field to proxy. Tested if all data structures work local only with smart listener. --- hazelcast/listener.py | 2 +- hazelcast/proxy/base.py | 1 + hazelcast/proxy/list.py | 2 +- hazelcast/proxy/map.py | 14 +++++++------- hazelcast/proxy/queue.py | 2 +- hazelcast/proxy/replicated_map.py | 8 ++++---- hazelcast/proxy/set.py | 2 +- hazelcast/proxy/topic.py | 2 +- 8 files changed, 17 insertions(+), 16 deletions(-) diff --git a/hazelcast/listener.py b/hazelcast/listener.py index d054e8b523..18685b57e0 100644 --- a/hazelcast/listener.py +++ b/hazelcast/listener.py @@ -143,7 +143,7 @@ def connection_removed(self, connection, _): # From JAVA: assert (!Thread.currentThread().getName().contains("eventRegistration")); with self._registration_lock: for listener_registration in self.active_registrations.values(): - event_registration = listener_registration.connection_registrations.pop(connection) + event_registration = listener_registration.connection_registrations.pop(connection, None) if event_registration is not None: self.remove_event_handler(event_registration.correlation_id) diff --git a/hazelcast/proxy/base.py b/hazelcast/proxy/base.py index feb9a3d7a5..deb3b5fbd3 100644 --- a/hazelcast/proxy/base.py +++ b/hazelcast/proxy/base.py @@ -36,6 +36,7 @@ def __init__(self, client, service_name, name): self._to_data = client.serialization_service.to_data self._register_listener = client.listener.register_listener self._deregister_listener = client.listener.deregister_listener + self._is_smart = client.listener.is_smart def destroy(self): """ diff --git a/hazelcast/proxy/list.py b/hazelcast/proxy/list.py index da8a646291..4027e39156 100644 --- a/hazelcast/proxy/list.py +++ b/hazelcast/proxy/list.py @@ -96,7 +96,7 @@ def add_listener(self, include_value=False, item_added_func=None, item_removed_f :param item_removed_func: Function to be called when an item is deleted from this list (optional). :return: (str), a registration id which is used as a key to remove the listener. """ - request = list_add_listener_codec.encode_request(self.name, include_value, False) + request = list_add_listener_codec.encode_request(self.name, include_value, self._is_smart) def handle_event_item(item, uuid, event_type): item = item if include_value else None diff --git a/hazelcast/proxy/map.py b/hazelcast/proxy/map.py index 6998f8efeb..ff72d8cbd5 100644 --- a/hazelcast/proxy/map.py +++ b/hazelcast/proxy/map.py @@ -79,25 +79,25 @@ def add_entry_listener(self, include_value=False, key=None, predicate=None, adde .. seealso:: :class:`~hazelcast.serialization.predicate.Predicate` for more info about predicates. """ flags = get_entry_listener_flags(added=added_func, removed=removed_func, updated=updated_func, - evicted=evicted_func, evict_all=evict_all_func, clear_all=clear_all_func, merged=merged_func, - expired=expired_func) + evicted=evicted_func, evict_all=evict_all_func, clear_all=clear_all_func, + merged=merged_func, expired=expired_func) if key and predicate: key_data = self._to_data(key) predicate_data = self._to_data(predicate) request = map_add_entry_listener_to_key_with_predicate_codec.encode_request(self.name, key_data, predicate_data, include_value, - flags, False) + flags, self._is_smart) elif key and not predicate: key_data = self._to_data(key) request = map_add_entry_listener_to_key_codec.encode_request(self.name, key_data, include_value, flags, - False) + self._is_smart) elif not key and predicate: predicate = self._to_data(predicate) request = map_add_entry_listener_with_predicate_codec.encode_request(self.name, predicate, include_value, - flags, False) + flags, self._is_smart) else: - request = map_add_entry_listener_codec.encode_request(self.name, include_value, flags, False) + request = map_add_entry_listener_codec.encode_request(self.name, include_value, flags, self._is_smart) def handle_event_entry(**_kwargs): event = EntryEvent(self._to_object, **_kwargs) @@ -933,7 +933,7 @@ def remove_encode(reg_id): try: request = map_add_near_cache_entry_listener_codec.encode_request(self.name, EntryEventType.invalidation, - False) + self._is_smart) self._invalidation_listener_id = self._register_listener(request, add_decode, remove_encode, handle) except: self.logger.severe("-----------------\n Near Cache is not initialized!!! \n-----------------") diff --git a/hazelcast/proxy/queue.py b/hazelcast/proxy/queue.py index f9cd0fc5f6..d7f918ff0e 100644 --- a/hazelcast/proxy/queue.py +++ b/hazelcast/proxy/queue.py @@ -73,7 +73,7 @@ def add_listener(self, include_value=False, item_added_func=None, item_removed_f :param item_removed_func: Function to be called when an item is deleted from this set (optional). :return: (str), a registration id which is used as a key to remove the listener. """ - request = queue_add_listener_codec.encode_request(self.name, include_value, False) + request = queue_add_listener_codec.encode_request(self.name, include_value, self._is_smart) def handle_event_item(item, uuid, event_type): item = item if include_value else None diff --git a/hazelcast/proxy/replicated_map.py b/hazelcast/proxy/replicated_map.py index c573c98529..1a040b1b0f 100644 --- a/hazelcast/proxy/replicated_map.py +++ b/hazelcast/proxy/replicated_map.py @@ -46,16 +46,16 @@ def add_entry_listener(self, key=None, predicate=None, added_func=None, removed_ predicate_data = self._to_data(predicate) request = replicated_map_add_entry_listener_to_key_with_predicate_codec.encode_request(self.name, key_data, predicate_data, - False) + self._is_smart) elif key and not predicate: key_data = self._to_data(key) - request = replicated_map_add_entry_listener_to_key_codec.encode_request(self.name, key_data, False) + request = replicated_map_add_entry_listener_to_key_codec.encode_request(self.name, key_data, self._is_smart) elif not key and predicate: predicate = self._to_data(predicate) request = replicated_map_add_entry_listener_with_predicate_codec.encode_request(self.name, predicate, - False) + self._is_smart) else: - request = replicated_map_add_entry_listener_codec.encode_request(self.name, False) + request = replicated_map_add_entry_listener_codec.encode_request(self.name, self._is_smart) def handle_event_entry(**_kwargs): event = EntryEvent(self._to_object, **_kwargs) diff --git a/hazelcast/proxy/set.py b/hazelcast/proxy/set.py index 417080148c..2416fa2a3a 100644 --- a/hazelcast/proxy/set.py +++ b/hazelcast/proxy/set.py @@ -55,7 +55,7 @@ def add_listener(self, include_value=False, item_added_func=None, item_removed_f :param item_removed_func: Function to be called when an item is deleted from this set (optional). :return: (str), a registration id which is used as a key to remove the listener. """ - request = set_add_listener_codec.encode_request(self.name, include_value, False) + request = set_add_listener_codec.encode_request(self.name, include_value, self._is_smart) def handle_event_item(item, uuid, event_type): item = item if include_value else None diff --git a/hazelcast/proxy/topic.py b/hazelcast/proxy/topic.py index 760cc0d5b4..b646f5a072 100644 --- a/hazelcast/proxy/topic.py +++ b/hazelcast/proxy/topic.py @@ -24,7 +24,7 @@ def add_listener(self, on_message=None): :param on_message: (Function), function to be called when a message is published. :return: (str), a registration id which is used as a key to remove the listener. """ - request = topic_add_message_listener_codec.encode_request(self.name, False) + request = topic_add_message_listener_codec.encode_request(self.name, self._is_smart) def handle(item, publish_time, uuid): member = self._client.cluster.get_member_by_uuid(uuid) From 543138d6626a1861c0e223d6ce010fd6a0fdb354 Mon Sep 17 00:00:00 2001 From: Yagmur Dulger Date: Tue, 30 Jul 2019 16:37:30 +0300 Subject: [PATCH 08/14] Added unit tests for connection addition and removal for smart and non- smart listener. --- hazelcast/listener.py | 49 +++++++-------- tests/listener_test.py | 116 +++++++++++++++++++++++++++++++++++ tests/smart_listener_test.py | 116 +++++++++++++++++++++++++++++++++++ 3 files changed, 253 insertions(+), 28 deletions(-) create mode 100644 tests/listener_test.py create mode 100644 tests/smart_listener_test.py diff --git a/hazelcast/listener.py b/hazelcast/listener.py index 18685b57e0..378fb5f077 100644 --- a/hazelcast/listener.py +++ b/hazelcast/listener.py @@ -8,9 +8,9 @@ class ListenerRegistration(object): def __init__(self, registration_request, decode_register_response, encode_deregister_request, handler): - self.registration_request = registration_request #of type ClientMessage (already encoded) - self.decode_register_response = decode_register_response #take ClientMessage return String - self.encode_deregister_request = encode_deregister_request #take String return ClientMessage + self.registration_request = registration_request + self.decode_register_response = decode_register_response + self.encode_deregister_request = encode_deregister_request self.handler = handler self.connection_registrations = {} #Map ) @@ -41,7 +41,7 @@ def try_sync_connect_to_all_members(self): last_exception = None for member in cluster_service.members: try: - self._client.connection_manager.get_or_connect(member.address).result() #! + self._client.connection_manager.get_or_connect(member.address).result() except Exception as e: last_failed_member = member last_exception = e @@ -64,14 +64,9 @@ def time_out_or_sleep_before_next_try(self, start_millis, last_failed_member, la + " Client invocation timeout : " + str(invocation_time_out_millis) + " ms, " + " Elapsed time : " + str(elapsed_millis) + " ms. ", last_exception) else: - self.sleep_before_next_try() - - def sleep_before_next_try(self): - self._invocation_service.invocation_retry_pause() - pass + self._invocation_service.invocation_retry_pause() # sleep before next try def register_listener(self, registration_request, decode_register_response, encode_deregister_request, handler): - # From JAVA: assert (!Thread.currentThread().getName().contains("eventRegistration")); if self.is_smart: self.try_sync_connect_to_all_members() @@ -85,14 +80,13 @@ def register_listener(self, registration_request, decode_register_response, enco for connection in active_connections.values(): try: self.register_listener_on_connection(listener_registration, connection) - except Exception as e: + except: if connection.live(): self.deregister_listener(user_registration_id) raise HazelcastError("Listener cannot be added ") # cause'unu include etmeli miyim nasil (,e) return user_registration_id def register_listener_on_connection(self, listener_registration, connection): - # From JAVA: assert(Thread.currentThread().getName().contains("eventRegistration")); registration_map = listener_registration.connection_registrations if connection in registration_map: @@ -100,16 +94,22 @@ def register_listener_on_connection(self, listener_registration, connection): registration_request = listener_registration.registration_request future = self._invocation_service.invoke_on_connection(registration_request, connection, - event_handler=listener_registration.handler) - response = future.result() + event_handler=listener_registration.handler) - server_registration_id = listener_registration.decode_register_response(response) - correlation_id = registration_request.get_correlation_id() - registration = EventRegistration(server_registration_id, correlation_id) - registration_map[connection] = registration + def callback(f): + try: + response = f.result() + server_registration_id = listener_registration.decode_register_response(response) + correlation_id = registration_request.get_correlation_id() + registration = EventRegistration(server_registration_id, correlation_id) + registration_map[connection] = registration + except: + # Gotta handle exception + pass + + future.add_done_callback(callback) def deregister_listener(self, user_registration_id): - # From JAVA: assert (!Thread.currentThread().getName().contains("eventRegistration")); check_not_none(user_registration_id, "Null userRegistrationId is not allowed!") with self._registration_lock: @@ -124,7 +124,7 @@ def deregister_listener(self, user_registration_id): self._invocation_service.invoke_on_connection(deregister_request, connection).result() self.remove_event_handler(event_registration.correlation_id) listener_registration.connection_registrations.pop(connection) - except Exception: + except: if connection.live(): successful = False self.logger.warning("Deregistration for listener with ID {} has failed to address {} ".format @@ -134,22 +134,18 @@ def deregister_listener(self, user_registration_id): return successful def connection_added(self, connection): - # From JAVA: assert (!Thread.currentThread().getName().contains("eventRegistration")); with self._registration_lock: for listener_registration in self.active_registrations.values(): self.register_listener_on_connection(listener_registration, connection) def connection_removed(self, connection, _): - # From JAVA: assert (!Thread.currentThread().getName().contains("eventRegistration")); with self._registration_lock: for listener_registration in self.active_registrations.values(): event_registration = listener_registration.connection_registrations.pop(connection, None) if event_registration is not None: self.remove_event_handler(event_registration.correlation_id) - def start(self): # ? - # clientConnectionManager.addConnectionListener(this); - # ama pythonda add_listener parametre olarak listener almiyor. + def start(self): self._client.connection_manager.add_listener(self.connection_added, self.connection_removed) def handle_client_message(self, message): @@ -158,9 +154,6 @@ def handle_client_message(self, message): self.logger.warning("Got event message with unknown correlation id: %s", message, extra=self._logger_extras) event_handler = self._event_handlers.get(correlation_id) event_handler(message) - # Asagidakini yapmak yerine bu sekilde yaptim - # invocation = self._event_handlers[correlation_id] --> bundan nasil invocation cikabiliyor ki? - # self._handle_event(invocation, message) def add_event_handler(self, correlation_id, event_handler): self._event_handlers[correlation_id] = event_handler diff --git a/tests/listener_test.py b/tests/listener_test.py new file mode 100644 index 0000000000..180e05a9f4 --- /dev/null +++ b/tests/listener_test.py @@ -0,0 +1,116 @@ +from tests.base import HazelcastTestCase +from tests.util import configure_logging, random_string, event_collector, generate_key_owned_by_instance +from hazelcast.config import ClientConfig + + +class ListenerTest(HazelcastTestCase): + # @classmethod + # def setUpClass(cls): + # configure_logging() + # cls.rc = cls.create_rc() + # cls.cluster = cls.create_cluster(cls.rc, None) + # + # @classmethod + # def tearDownClass(cls): + # cls.rc.exit() + + def setUp(self): + configure_logging() + self.rc = self.create_rc() + self.cluster = self.create_cluster(self.rc, None) + self.m1 = self.cluster.start_member() + self.m2 = self.cluster.start_member() + self.m3 = self.cluster.start_member() + self.client_config = ClientConfig() + self.collector = event_collector() + + def tearDown(self): + self.rc.exit() + self.shutdown_all_clients() + + # -------------------------- test_remove_member ----------------------- # + def test_smart_listener_remove_member(self): + self.client_config.network_config.smart_routing = True + client = self.create_client(self.client_config) + map = client.get_map(random_string()).blocking() + key_m1 = generate_key_owned_by_instance(client, self.m1.address) + map.put(key_m1, 'value1') + map.add_entry_listener(updated_func=self.collector) + self.m1.shutdown() + map.put(key_m1, 'value2') + + def assert_event(): + self.assertEqual(1, len(self.collector.events)) + self.assertTrueEventually(assert_event) + + def test_non_smart_listener_remove_partition_owner_member(self): + self.client_config.network_config.smart_routing = False + client = self.create_client(self.client_config) + map = client.get_map(random_string()).blocking() + key_m1 = generate_key_owned_by_instance(client, self.m1.address) + map.put(key_m1, 'value1') + map.add_entry_listener(updated_func=self.collector) + self.m1.shutdown() + map.put(key_m1, 'value2') + event_count = len(self.collector.events) + + def assert_event(): + self.assertEqual(1, event_count) + self.assertTrueEventually(assert_event) + + def test_non_smart_listener_remove_connection_owner_member(self): + self.client_config.network_config.smart_routing = False + client = self.create_client(self.client_config) + map = client.get_map(random_string()).blocking() + map.add_entry_listener(added_func=self.collector, updated_func=self.collector) + + owner_address = client.cluster.owner_connection_address + generated_key = generate_key_owned_by_instance(client, owner_address) # it might not be necessary to generate + # key for this test. should i remove it to simplify the code? + map.put(generated_key, 'value1') + for m in [self.m1, self.m2, self.m3]: + if m.address == owner_address: + m.shutdown() + map.put(generated_key, 'value2') + + def assert_event(): + self.assertEqual(2, len(self.collector.events)) + self.assertTrueEventually(assert_event) + + # -------------------------- test_add_member ----------------------- # + def test_smart_listener_add_member(self): + self.client_config.network_config.smart_routing = True + client = self.create_client(self.client_config) + map = client.get_map(random_string()).blocking() + map.add_entry_listener(added_func=self.collector) + m4 = self.cluster.start_member() + key_m4 = generate_key_owned_by_instance(client, m4.address) + map.put(key_m4, 'value') + + def assert_event(): + self.assertEqual(1, len(self.collector.events)) + self.assertTrueEventually(assert_event) + + def test_non_smart_listener_add_member(self): + self.client_config.network_config.smart_routing = True + client = self.create_client(self.client_config) + map = client.get_map(random_string()).blocking() + map.add_entry_listener(added_func=self.collector) + m4 = self.cluster.start_member() + key_m4 = generate_key_owned_by_instance(client, m4.address) + map.put(key_m4, 'value') + + def assert_event(): + self.assertEqual(1, len(self.collector.events)) + self.assertTrueEventually(assert_event) + + # ------------------------------------------------------------------ # + # Helpers: + # def remove_member_test(self, client): + # map = client.get_map(random_string()).blocking() + # key_m1 = generate_key_owned_by_instance(client, self.m1.address) + # map.put(key_m1, 'value1') + # map.add_entry_listener(updated_func=self.collector) + # self.m1.shutdown() + # map.put(key_m1, 'value2') + # return len(self.collector.events) \ No newline at end of file diff --git a/tests/smart_listener_test.py b/tests/smart_listener_test.py new file mode 100644 index 0000000000..018725d819 --- /dev/null +++ b/tests/smart_listener_test.py @@ -0,0 +1,116 @@ +from tests.base import HazelcastTestCase +from tests.util import configure_logging, random_string, event_collector +from hazelcast.config import ClientConfig +from time import sleep + + +class SmartListenerTest(HazelcastTestCase): + @classmethod + def setUpClass(cls): + configure_logging() + cls.rc = cls.create_rc() + cls.cluster = cls.create_cluster(cls.rc, None) # Default config + cls.m1 = cls.cluster.start_member() + cls.m2 = cls.cluster.start_member() + cls.m3 = cls.cluster.start_member() + + @classmethod + def tearDownClass(cls): + cls.rc.exit() + + def setUp(self): + client_config = ClientConfig() + client_config.network_config.smart_routing = True + self.client = self.create_client(client_config) + self.collector = event_collector() + + def tearDown(self): + self.shutdown_all_clients() + + # -------------------------- test_local_only ----------------------- # + def test_list_smart_listener_local_only(self): + list = self.client.get_list(random_string()).blocking() + list.add_listener(item_added_func=self.collector) + list.add('item-value') + sleep(5) + self.assertEqual(1, len(self.collector.events)) + + def test_map_smart_listener_local_only(self): + map = self.client.get_map(random_string()).blocking() + map.add_entry_listener(added_func=self.collector) + map.put('key', 'value') + sleep(5) + self.assertEqual(1, len(self.collector.events)) + + def test_multimap_smart_listener_local_only(self): + multimap = self.client.get_map(random_string()).blocking() + multimap.add_entry_listener(added_func=self.collector) + multimap.put('key', 'value') + sleep(5) + self.assertEqual(1, len(self.collector.events)) + + def test_queue_smart_listener_local_only(self): + queue = self.client.get_queue(random_string()).blocking() + queue.add_listener(item_added_func=self.collector) + queue.add('item-value') + sleep(5) + self.assertEqual(1, len(self.collector.events)) + + def test_replicated_map_smart_listener_local_only(self): + replicated_map = self.client.get_replicated_map(random_string()).blocking() + replicated_map.add_entry_listener(added_func=self.collector) + replicated_map.put('key', 'value') + sleep(5) + self.assertEqual(1, len(self.collector.events)) + + def test_set_smart_listener_local_only(self): + set = self.client.get_set(random_string()).blocking() + set.add_listener(item_added_func=self.collector) + set.add('item-value') + sleep(5) + self.assertEqual(1, len(self.collector.events)) + + def test_topic_smart_listener_local_only(self): + topic = self.client.get_topic(random_string()).blocking() + topic.add_listener(on_message=self.collector) + topic.publish('item-value') + sleep(5) + self.assertEqual(1, len(self.collector.events)) + + """ + def test_non_smart_listeners_terminate_random_node(self): + pass + cluster = self.create_cluster(self.rc) + for i in range(3): + cluster.start_member() + client_config = self.get_non_smart_client_config() + self.create_client(client_config) + self.listeners_terminate_random_node() + + def test_smart_listeners_terminate_random_node(self): + pass + cluster = self.create_cluster(self.rc) + for i in range(3): + cluster.start_member() + client_config = self.get_smart_client_config() + self.create_client(client_config) + self.listeners_terminate_random_node() + + def listeners_terminate_random_node(self): + pass + + # Helpers: + def set_up_listener(self): + pass + + def get_smart_client_config(self): + client_config = ClientConfig() + client_config.network_config.connection_attempt_limit = MAXSIZE + client_config.network_config.redo_operation = True # Isn't this dangerous for non-idempotent operations? + return client_config + + def get_non_smart_client_config(self): + client_config = self.get_smart_client_config() + client_config.network_config.smart_routing = False + return client_config + """ From d332ffb083ec984cac290768de512d3a20c88737 Mon Sep 17 00:00:00 2001 From: Yagmur Dulger Date: Tue, 30 Jul 2019 17:48:55 +0300 Subject: [PATCH 09/14] Fixes for connection related listener tests. --- hazelcast/listener.py | 58 +----------------------------------------- tests/listener_test.py | 58 ++++++++++-------------------------------- 2 files changed, 15 insertions(+), 101 deletions(-) diff --git a/hazelcast/listener.py b/hazelcast/listener.py index 378fb5f077..8928344037 100644 --- a/hazelcast/listener.py +++ b/hazelcast/listener.py @@ -159,60 +159,4 @@ def add_event_handler(self, correlation_id, event_handler): self._event_handlers[correlation_id] = event_handler def remove_event_handler(self, correlation_id): - self._event_handlers.pop(correlation_id, None) - - """ - def start_listening(self, request, event_handler, decode_add_listener, key=None): - if key: - partition_id = self._client.partition_service.get_partition_id(key) - invocation = ListenerInvocation(self, request, event_handler, decode_add_listener, - partition_id=partition_id) - else: - invocation = ListenerInvocation(self, request, event_handler, decode_add_listener) - - future = self._client.invoker.invoke(invocation) - registration_id = decode_add_listener(future.result()) - invocation.registration_id = registration_id # store the original registration id for future reference - self.registrations[registration_id] = (registration_id, request.get_correlation_id()) - return registration_id - - def stop_listening(self, registration_id, encode_remove_listener): - try: - actual_id, correlation_id = self.registrations.pop(registration_id) - self._client.invoker._remove_event_handler(correlation_id) - # TODO: should be invoked on same node as registration? - self._client.invoker.invoke_on_random_target(encode_remove_listener(actual_id)).result() - return True - except KeyError: - return False - """ - """ - # Gerekli mi? - def re_register_listener(self, invocation): - registration_id = invocation.registration_id - new_invocation = ListenerInvocation(self, invocation.request, invocation.event_handler, - invocation.response_decoder, partition_id=invocation.partition_id) - new_invocation.registration_id = registration_id - - # re-send the request - def callback(f): - if f.is_success(): - new_id = new_invocation.response_decoder(f.result()) - self.logger.debug("Re-registered listener with id %s and new_id %s for request %s", - registration_id, new_id, new_invocation.request, extra=self._logger_extras) - self.registrations[registration_id] = (new_id, new_invocation.request.get_correlation_id()) - else: - self.logger.warning("Re-registration for listener with id %s failed.", registration_id, exc_info=True, - extra=self._logger_extras) - - self.logger.debug("Re-registering listener %s for request %s", registration_id, new_invocation.request, - extra=self._logger_extras) - self._client.invoker.invoke(new_invocation).add_done_callback(callback) - """ - """ - # Gerekli mi? Bunun aynisi InvocationService'te de var. - def _init_invocation_timeout(self): - invocation_timeout = self._client.properties.get_seconds_positive_or_default( - self._client.properties.INVOCATION_TIMEOUT_SECONDS) - return invocation_timeout - """ \ No newline at end of file + self._event_handlers.pop(correlation_id, None) \ No newline at end of file diff --git a/tests/listener_test.py b/tests/listener_test.py index 180e05a9f4..6fffc35c5c 100644 --- a/tests/listener_test.py +++ b/tests/listener_test.py @@ -4,16 +4,6 @@ class ListenerTest(HazelcastTestCase): - # @classmethod - # def setUpClass(cls): - # configure_logging() - # cls.rc = cls.create_rc() - # cls.cluster = cls.create_cluster(cls.rc, None) - # - # @classmethod - # def tearDownClass(cls): - # cls.rc.exit() - def setUp(self): configure_logging() self.rc = self.create_rc() @@ -43,35 +33,26 @@ def assert_event(): self.assertEqual(1, len(self.collector.events)) self.assertTrueEventually(assert_event) - def test_non_smart_listener_remove_partition_owner_member(self): - self.client_config.network_config.smart_routing = False - client = self.create_client(self.client_config) - map = client.get_map(random_string()).blocking() - key_m1 = generate_key_owned_by_instance(client, self.m1.address) - map.put(key_m1, 'value1') - map.add_entry_listener(updated_func=self.collector) - self.m1.shutdown() - map.put(key_m1, 'value2') - event_count = len(self.collector.events) - - def assert_event(): - self.assertEqual(1, event_count) - self.assertTrueEventually(assert_event) - - def test_non_smart_listener_remove_connection_owner_member(self): + def test_non_smart_listener_remove_connected_member(self): self.client_config.network_config.smart_routing = False client = self.create_client(self.client_config) map = client.get_map(random_string()).blocking() - map.add_entry_listener(added_func=self.collector, updated_func=self.collector) + map.add_entry_listener(added_func=self.collector) owner_address = client.cluster.owner_connection_address - generated_key = generate_key_owned_by_instance(client, owner_address) # it might not be necessary to generate - # key for this test. should i remove it to simplify the code? - map.put(generated_key, 'value1') - for m in [self.m1, self.m2, self.m3]: + + # Test listener re-registers properly when owner connection is removed. + members = [self.m1, self.m2, self.m3] + for m in members: if m.address == owner_address: m.shutdown() - map.put(generated_key, 'value2') + members.remove(m) + + # There are 2 members left. We execute a put operation to each of their partitions + # to test that non-smart listener works in both local and non-local cases. + for m in members: + generated_key = generate_key_owned_by_instance(client, m.address) + map.put(generated_key, 'value') def assert_event(): self.assertEqual(2, len(self.collector.events)) @@ -102,15 +83,4 @@ def test_non_smart_listener_add_member(self): def assert_event(): self.assertEqual(1, len(self.collector.events)) - self.assertTrueEventually(assert_event) - - # ------------------------------------------------------------------ # - # Helpers: - # def remove_member_test(self, client): - # map = client.get_map(random_string()).blocking() - # key_m1 = generate_key_owned_by_instance(client, self.m1.address) - # map.put(key_m1, 'value1') - # map.add_entry_listener(updated_func=self.collector) - # self.m1.shutdown() - # map.put(key_m1, 'value2') - # return len(self.collector.events) \ No newline at end of file + self.assertTrueEventually(assert_event) \ No newline at end of file From ec250df1606fbbc618e0ce113e224bc53bfaf163 Mon Sep 17 00:00:00 2001 From: Yagmur Dulger Date: Wed, 31 Jul 2019 17:26:37 +0300 Subject: [PATCH 10/14] smart_listener cleanup --- hazelcast/invocation.py | 16 ++++++--------- hazelcast/listener.py | 37 ++++++++++++++++++----------------- hazelcast/proxy/map.py | 28 +++++++++++--------------- hazelcast/proxy/multi_map.py | 18 ++++++++--------- tests/listener_test.py | 2 +- tests/smart_listener_test.py | 38 ------------------------------------ 6 files changed, 45 insertions(+), 94 deletions(-) diff --git a/hazelcast/invocation.py b/hazelcast/invocation.py index e52cbe2282..31a597b510 100644 --- a/hazelcast/invocation.py +++ b/hazelcast/invocation.py @@ -66,7 +66,7 @@ def __init__(self, client): self._logger_extras = {"client_name": client.name, "group_name": client.config.group_config.name} self._event_queue = queue.Queue() self._is_redo_operation = client.config.network_config.redo_operation - self.invocation_retry_pause = self._init_invocation_retry_pause() #eskiden private'ti public yaptim. + self.invocation_retry_pause = self._init_invocation_retry_pause() self.invocation_timeout = self._init_invocation_timeout() self._listener_service = None @@ -82,7 +82,8 @@ def start(self): self._listener_service = self._client.listener def invoke_on_connection(self, message, connection, ignore_heartbeat=False, event_handler=None): - return self.invoke(Invocation(self, message, connection=connection, event_handler=event_handler), ignore_heartbeat) + return self.invoke(Invocation(self, message, connection=connection, event_handler=event_handler), + ignore_heartbeat) def invoke_on_partition(self, message, partition_id, invocation_timeout=None): invocation = Invocation(self, message, partition_id=partition_id) @@ -111,7 +112,8 @@ def invoke_smart(self, invocation, ignore_heartbeat=False): self._send_to_address(invocation, addr) elif invocation.has_address(): if not self._is_member(invocation.address): - self._handle_exception(invocation, TargetNotMemberError("Target '{}' is not a member.".format(invocation.address))) + self._handle_exception(invocation, TargetNotMemberError("Target '{}' is not a member.".format + (invocation.address))) else: self._send_to_address(invocation, invocation.address) else: # send to random address @@ -135,12 +137,6 @@ def cleanup_connection(self, connection, cause): if invocation.sent_connection == connection: self._handle_exception(invocation, cause) - # TODO - # if self._client.lifecycle.is_live: - # for correlation_id, invocation in six.iteritems(dict(self._event_handlers)): - # if invocation.sent_connection == connection and invocation.connection is None: - # self._client.listener.re_register_listener(invocation) - def _init_invocation_retry_pause(self): invocation_retry_pause = self._client.properties.get_seconds_positive_or_default( self._client.properties.INVOCATION_RETRY_PAUSE_MILLIS) @@ -196,7 +192,7 @@ def _send(self, invocation, connection, ignore_heartbeat): try: connection.send_message(message) except IOError as e: - self._listener_service.remove_event_handler(correlation_id) # fail ederse event handleri temizlemek icin + self._listener_service.remove_event_handler(correlation_id) self._handle_exception(invocation, e) def _handle_client_message(self, message): diff --git a/hazelcast/listener.py b/hazelcast/listener.py index 8928344037..425a5de0b0 100644 --- a/hazelcast/listener.py +++ b/hazelcast/listener.py @@ -12,7 +12,7 @@ def __init__(self, registration_request, decode_register_response, encode_deregi self.decode_register_response = decode_register_response self.encode_deregister_request = encode_deregister_request self.handler = handler - self.connection_registrations = {} #Map ) + self.connection_registrations = {} # Dict of Connection, EventRegistration class EventRegistration(object): @@ -29,7 +29,7 @@ def __init__(self, client): self._invocation_service = client.invoker self.is_smart = client.config.network_config.smart_routing self._logger_extras = {"client_name": client.name, "group_name": client.config.group_config.name} - self.active_registrations = {} # Map + self.active_registrations = {} # Dict of user_registration_id, ListenerRegistration self._registration_lock = threading.RLock() self._event_handlers = {} @@ -57,12 +57,10 @@ def time_out_or_sleep_before_next_try(self, start_millis, last_failed_member, la invocation_time_out_millis = self._invocation_service.invocation_timeout * 1000 timed_out = elapsed_millis > invocation_time_out_millis if timed_out: - raise OperationTimeoutError("Registering listeners is timed out." - + " Last failed member : " + str(last_failed_member) + ", " - + " Current time: " + str(now_in_millis) + ", " - + " Start time : " + str(start_millis) + ", " - + " Client invocation timeout : " + str(invocation_time_out_millis) + " ms, " - + " Elapsed time : " + str(elapsed_millis) + " ms. ", last_exception) + raise OperationTimeoutError\ + ("Registering listeners is timed out. Last failed member: %s, Current time: %s, Start time: %s, " + "Client invocation timeout: %s, Elapsed time: %s ms, Cause: %s", last_failed_member, now_in_millis, + start_millis, invocation_time_out_millis, elapsed_millis, last_exception.args[0]) else: self._invocation_service.invocation_retry_pause() # sleep before next try @@ -79,14 +77,15 @@ def register_listener(self, registration_request, decode_register_response, enco active_connections = self._client.connection_manager.connections for connection in active_connections.values(): try: - self.register_listener_on_connection(listener_registration, connection) + self.register_listener_on_connection_async(user_registration_id, listener_registration, connection)\ + .result() except: if connection.live(): self.deregister_listener(user_registration_id) - raise HazelcastError("Listener cannot be added ") # cause'unu include etmeli miyim nasil (,e) + raise HazelcastError("Listener cannot be added ") return user_registration_id - def register_listener_on_connection(self, listener_registration, connection): + def register_listener_on_connection_async(self, user_registration_id, listener_registration, connection): registration_map = listener_registration.connection_registrations if connection in registration_map: @@ -103,11 +102,13 @@ def callback(f): correlation_id = registration_request.get_correlation_id() registration = EventRegistration(server_registration_id, correlation_id) registration_map[connection] = registration - except: - # Gotta handle exception - pass + except Exception as e: + if connection.live(): + self.logger.warning("Listener %s can not be added to a new connection: %s, reason: %s", + user_registration_id, connection, e.args[0]) future.add_done_callback(callback) + return future def deregister_listener(self, user_registration_id): check_not_none(user_registration_id, "Null userRegistrationId is not allowed!") @@ -127,16 +128,16 @@ def deregister_listener(self, user_registration_id): except: if connection.live(): successful = False - self.logger.warning("Deregistration for listener with ID {} has failed to address {} ".format - (user_registration_id, "address"), exc_info=True, extra=self._logger_extras) + self.logger.warning("Deregistration for listener with ID %s has failed to address %s ", + user_registration_id, "address", exc_info=True, extra=self._logger_extras) if successful: self.active_registrations.pop(user_registration_id) return successful def connection_added(self, connection): with self._registration_lock: - for listener_registration in self.active_registrations.values(): - self.register_listener_on_connection(listener_registration, connection) + for user_reg_id, listener_registration in self.active_registrations.items(): + self.register_listener_on_connection_async(user_reg_id, listener_registration, connection) def connection_removed(self, connection, _): with self._registration_lock: diff --git a/hazelcast/proxy/map.py b/hazelcast/proxy/map.py index ff72d8cbd5..306404e3b1 100644 --- a/hazelcast/proxy/map.py +++ b/hazelcast/proxy/map.py @@ -85,17 +85,16 @@ def add_entry_listener(self, include_value=False, key=None, predicate=None, adde if key and predicate: key_data = self._to_data(key) predicate_data = self._to_data(predicate) - request = map_add_entry_listener_to_key_with_predicate_codec.encode_request(self.name, key_data, - predicate_data, include_value, - flags, self._is_smart) + request = map_add_entry_listener_to_key_with_predicate_codec.encode_request( + self.name, key_data, predicate_data, include_value, flags, self._is_smart) elif key and not predicate: key_data = self._to_data(key) - request = map_add_entry_listener_to_key_codec.encode_request(self.name, key_data, include_value, flags, - self._is_smart) + request = map_add_entry_listener_to_key_codec.encode_request( + self.name, key_data, include_value, flags, self._is_smart) elif not key and predicate: predicate = self._to_data(predicate) - request = map_add_entry_listener_with_predicate_codec.encode_request(self.name, predicate, include_value, - flags, self._is_smart) + request = map_add_entry_listener_with_predicate_codec.encode_request( + self.name, predicate, include_value, flags, self._is_smart) else: request = map_add_entry_listener_codec.encode_request(self.name, include_value, flags, self._is_smart) @@ -922,19 +921,14 @@ def _on_destroy(self): super(MapFeatNearCache, self)._on_destroy() def _add_near_cache_invalidation_listener(self): - def handle(message): - map_add_near_cache_entry_listener_codec.handle(message, self._handle_invalidation, self._handle_batch_invalidation) - - def add_decode(message): - return map_add_near_cache_entry_listener_codec.decode_response(message)['response'] - - def remove_encode(reg_id): - return map_remove_entry_listener_codec.encode_request(self.name, reg_id) - try: request = map_add_near_cache_entry_listener_codec.encode_request(self.name, EntryEventType.invalidation, self._is_smart) - self._invalidation_listener_id = self._register_listener(request, add_decode, remove_encode, handle) + self._invalidation_listener_id = self._register_listener( + request, lambda r: map_add_near_cache_entry_listener_codec.decode_response(r)['response'], + lambda reg_id: map_remove_entry_listener_codec.encode_request(self.name, reg_id), + lambda m: map_add_near_cache_entry_listener_codec.handle(m, self._handle_invalidation, + self._handle_batch_invalidation)) except: self.logger.severe("-----------------\n Near Cache is not initialized!!! \n-----------------") diff --git a/hazelcast/proxy/multi_map.py b/hazelcast/proxy/multi_map.py index 253ad1e1b4..bcbd6fdbed 100644 --- a/hazelcast/proxy/multi_map.py +++ b/hazelcast/proxy/multi_map.py @@ -30,12 +30,11 @@ def add_entry_listener(self, include_value=False, key=None, added_func=None, rem """ if key: key_data = self._to_data(key) - request = multi_map_add_entry_listener_to_key_codec.encode_request(name=self.name, key=key_data, - include_value=include_value, - local_only=False) + request = multi_map_add_entry_listener_to_key_codec.encode_request( + name=self.name, key=key_data, include_value=include_value, local_only=False) else: - request = multi_map_add_entry_listener_codec.encode_request(name=self.name, include_value=include_value, - local_only=False) + request = multi_map_add_entry_listener_codec.encode_request( + name=self.name, include_value=include_value, local_only=False) def handle_event_entry(**_kwargs): event = EntryEvent(self._to_object, **_kwargs) @@ -46,11 +45,10 @@ def handle_event_entry(**_kwargs): elif event.event_type == EntryEventType.clear_all and clear_all_func: clear_all_func(event) - return self._register_listener(request, - lambda r: multi_map_add_entry_listener_codec.decode_response(r)['response'], - lambda reg_id: multi_map_remove_entry_listener_codec.encode_request(self.name, - reg_id), - lambda m: multi_map_add_entry_listener_codec.handle(m, handle_event_entry)) + return self._register_listener( + request, lambda r: multi_map_add_entry_listener_codec.decode_response(r)['response'], + lambda reg_id: multi_map_remove_entry_listener_codec.encode_request(self.name, reg_id), + lambda m: multi_map_add_entry_listener_codec.handle(m, handle_event_entry)) def contains_key(self, key): """ diff --git a/tests/listener_test.py b/tests/listener_test.py index 6fffc35c5c..84609293b1 100644 --- a/tests/listener_test.py +++ b/tests/listener_test.py @@ -41,7 +41,7 @@ def test_non_smart_listener_remove_connected_member(self): owner_address = client.cluster.owner_connection_address - # Test listener re-registers properly when owner connection is removed. + # Test if listener re-registers properly when owner connection is removed. members = [self.m1, self.m2, self.m3] for m in members: if m.address == owner_address: diff --git a/tests/smart_listener_test.py b/tests/smart_listener_test.py index 018725d819..d4bd41c73a 100644 --- a/tests/smart_listener_test.py +++ b/tests/smart_listener_test.py @@ -76,41 +76,3 @@ def test_topic_smart_listener_local_only(self): topic.publish('item-value') sleep(5) self.assertEqual(1, len(self.collector.events)) - - """ - def test_non_smart_listeners_terminate_random_node(self): - pass - cluster = self.create_cluster(self.rc) - for i in range(3): - cluster.start_member() - client_config = self.get_non_smart_client_config() - self.create_client(client_config) - self.listeners_terminate_random_node() - - def test_smart_listeners_terminate_random_node(self): - pass - cluster = self.create_cluster(self.rc) - for i in range(3): - cluster.start_member() - client_config = self.get_smart_client_config() - self.create_client(client_config) - self.listeners_terminate_random_node() - - def listeners_terminate_random_node(self): - pass - - # Helpers: - def set_up_listener(self): - pass - - def get_smart_client_config(self): - client_config = ClientConfig() - client_config.network_config.connection_attempt_limit = MAXSIZE - client_config.network_config.redo_operation = True # Isn't this dangerous for non-idempotent operations? - return client_config - - def get_non_smart_client_config(self): - client_config = self.get_smart_client_config() - client_config.network_config.smart_routing = False - return client_config - """ From a97c669984f36b35ea1dd3e20b7506a2d16bffc8 Mon Sep 17 00:00:00 2001 From: Yagmur Dulger Date: Wed, 31 Jul 2019 17:43:02 +0300 Subject: [PATCH 11/14] Smart-listener cleanup --- hazelcast/listener.py | 2 +- hazelcast/proxy/topic.py | 9 ++++----- tests/proxy/map_test.py | 6 ++++++ 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/hazelcast/listener.py b/hazelcast/listener.py index 425a5de0b0..50048e0991 100644 --- a/hazelcast/listener.py +++ b/hazelcast/listener.py @@ -111,7 +111,7 @@ def callback(f): return future def deregister_listener(self, user_registration_id): - check_not_none(user_registration_id, "Null userRegistrationId is not allowed!") + check_not_none(user_registration_id, "None userRegistrationId is not allowed!") with self._registration_lock: listener_registration = self.active_registrations.get(user_registration_id) diff --git a/hazelcast/proxy/topic.py b/hazelcast/proxy/topic.py index b646f5a072..fb43e68935 100644 --- a/hazelcast/proxy/topic.py +++ b/hazelcast/proxy/topic.py @@ -31,11 +31,10 @@ def handle(item, publish_time, uuid): item_event = TopicMessage(self.name, item, publish_time, member, self._to_object) on_message(item_event) - return self._register_listener(request, - lambda r: topic_add_message_listener_codec.decode_response(r)['response'], - lambda reg_id: topic_remove_message_listener_codec.encode_request(self.name, - reg_id), - lambda m: topic_add_message_listener_codec.handle(m, handle)) + return self._register_listener( + request, lambda r: topic_add_message_listener_codec.decode_response(r)['response'], + lambda reg_id: topic_remove_message_listener_codec.encode_request(self.name, reg_id), + lambda m: topic_add_message_listener_codec.handle(m, handle)) def publish(self, message): """ diff --git a/tests/proxy/map_test.py b/tests/proxy/map_test.py index 971ecfa128..3b5fdcda96 100644 --- a/tests/proxy/map_test.py +++ b/tests/proxy/map_test.py @@ -408,6 +408,12 @@ def test_remove_entry_listener(self): time.sleep(1) self.assertEqual(len(collector.events), 1) + def test_remove_entry_listener_with_none_id(self): + with self.assertRaises(AssertionError) as cm: + self.map.remove_entry_listener(None) + e = cm.exception + self.assertEqual(e.args[0],"None userRegistrationId is not allowed!") + def test_replace(self): self.map.put("key", "value") From 7a5cb722fdcc095621d6aff645c88e1f25b5c481 Mon Sep 17 00:00:00 2001 From: Yagmur Dulger Date: Wed, 31 Jul 2019 17:54:53 +0300 Subject: [PATCH 12/14] cleanup --- hazelcast/proxy/replicated_map.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/hazelcast/proxy/replicated_map.py b/hazelcast/proxy/replicated_map.py index 1a040b1b0f..3c2f9db757 100644 --- a/hazelcast/proxy/replicated_map.py +++ b/hazelcast/proxy/replicated_map.py @@ -44,16 +44,15 @@ def add_entry_listener(self, key=None, predicate=None, added_func=None, removed_ if key and predicate: key_data = self._to_data(key) predicate_data = self._to_data(predicate) - request = replicated_map_add_entry_listener_to_key_with_predicate_codec.encode_request(self.name, key_data, - predicate_data, - self._is_smart) + request = replicated_map_add_entry_listener_to_key_with_predicate_codec.encode_request( + self.name, key_data, predicate_data, self._is_smart) elif key and not predicate: key_data = self._to_data(key) request = replicated_map_add_entry_listener_to_key_codec.encode_request(self.name, key_data, self._is_smart) elif not key and predicate: predicate = self._to_data(predicate) - request = replicated_map_add_entry_listener_with_predicate_codec.encode_request(self.name, predicate, - self._is_smart) + request = replicated_map_add_entry_listener_with_predicate_codec.encode_request( + self.name, predicate, self._is_smart) else: request = replicated_map_add_entry_listener_codec.encode_request(self.name, self._is_smart) @@ -70,11 +69,10 @@ def handle_event_entry(**_kwargs): elif event.event_type == EntryEventType.clear_all and clear_all_func: clear_all_func(event) - return self._register_listener(request, - lambda r: replicated_map_add_entry_listener_codec.decode_response(r)['response'], - lambda reg_id: replicated_map_remove_entry_listener_codec.encode_request( - self.name, reg_id), - lambda m: replicated_map_add_entry_listener_codec.handle(m, handle_event_entry)) + return self._register_listener( + request, lambda r: replicated_map_add_entry_listener_codec.decode_response(r)['response'], + lambda reg_id: replicated_map_remove_entry_listener_codec.encode_request(self.name, reg_id), + lambda m: replicated_map_add_entry_listener_codec.handle(m, handle_event_entry)) def clear(self): """ From 298df217534a7becac2ede0642f07e58c99dadf6 Mon Sep 17 00:00:00 2001 From: Yagmur Dulger Date: Thu, 1 Aug 2019 17:38:43 +0300 Subject: [PATCH 13/14] fixes --- hazelcast/invocation.py | 3 ++- hazelcast/listener.py | 22 ++++++++++++---------- tests/listener_test.py | 2 +- tests/reconnect_test.py | 9 +++++++-- 4 files changed, 22 insertions(+), 14 deletions(-) diff --git a/hazelcast/invocation.py b/hazelcast/invocation.py index 31a597b510..cc8e85e218 100644 --- a/hazelcast/invocation.py +++ b/hazelcast/invocation.py @@ -192,7 +192,8 @@ def _send(self, invocation, connection, ignore_heartbeat): try: connection.send_message(message) except IOError as e: - self._listener_service.remove_event_handler(correlation_id) + if invocation.event_handler is not None: + self._listener_service.remove_event_handler(correlation_id) self._handle_exception(invocation, e) def _handle_client_message(self, message): diff --git a/hazelcast/listener.py b/hazelcast/listener.py index 50048e0991..1813a0ec31 100644 --- a/hazelcast/listener.py +++ b/hazelcast/listener.py @@ -4,6 +4,7 @@ from hazelcast.exception import OperationTimeoutError, HazelcastError from hazelcast.util import current_time_in_millis, check_not_none +from time import sleep class ListenerRegistration(object): @@ -29,7 +30,7 @@ def __init__(self, client): self._invocation_service = client.invoker self.is_smart = client.config.network_config.smart_routing self._logger_extras = {"client_name": client.name, "group_name": client.config.group_config.name} - self.active_registrations = {} # Dict of user_registration_id, ListenerRegistration + self._active_registrations = {} # Dict of user_registration_id, ListenerRegistration self._registration_lock = threading.RLock() self._event_handlers = {} @@ -62,7 +63,7 @@ def time_out_or_sleep_before_next_try(self, start_millis, last_failed_member, la "Client invocation timeout: %s, Elapsed time: %s ms, Cause: %s", last_failed_member, now_in_millis, start_millis, invocation_time_out_millis, elapsed_millis, last_exception.args[0]) else: - self._invocation_service.invocation_retry_pause() # sleep before next try + sleep(self._invocation_service.invocation_retry_pause()) # sleep before next try def register_listener(self, registration_request, decode_register_response, encode_deregister_request, handler): if self.is_smart: @@ -72,7 +73,7 @@ def register_listener(self, registration_request, decode_register_response, enco user_registration_id = str(uuid4()) listener_registration = ListenerRegistration(registration_request, decode_register_response, encode_deregister_request, handler) - self.active_registrations[user_registration_id] = listener_registration + self._active_registrations[user_registration_id] = listener_registration active_connections = self._client.connection_manager.connections for connection in active_connections.values(): @@ -105,7 +106,7 @@ def callback(f): except Exception as e: if connection.live(): self.logger.warning("Listener %s can not be added to a new connection: %s, reason: %s", - user_registration_id, connection, e.args[0]) + user_registration_id, connection, e.args[0], extra=self._logger_extras) future.add_done_callback(callback) return future @@ -114,7 +115,7 @@ def deregister_listener(self, user_registration_id): check_not_none(user_registration_id, "None userRegistrationId is not allowed!") with self._registration_lock: - listener_registration = self.active_registrations.get(user_registration_id) + listener_registration = self._active_registrations.get(user_registration_id) if listener_registration is None: return False successful = True @@ -131,17 +132,17 @@ def deregister_listener(self, user_registration_id): self.logger.warning("Deregistration for listener with ID %s has failed to address %s ", user_registration_id, "address", exc_info=True, extra=self._logger_extras) if successful: - self.active_registrations.pop(user_registration_id) + self._active_registrations.pop(user_registration_id) return successful def connection_added(self, connection): with self._registration_lock: - for user_reg_id, listener_registration in self.active_registrations.items(): + for user_reg_id, listener_registration in self._active_registrations.items(): self.register_listener_on_connection_async(user_reg_id, listener_registration, connection) def connection_removed(self, connection, _): with self._registration_lock: - for listener_registration in self.active_registrations.values(): + for listener_registration in self._active_registrations.values(): event_registration = listener_registration.connection_registrations.pop(connection, None) if event_registration is not None: self.remove_event_handler(event_registration.correlation_id) @@ -153,8 +154,9 @@ def handle_client_message(self, message): correlation_id = message.get_correlation_id() if correlation_id not in self._event_handlers: self.logger.warning("Got event message with unknown correlation id: %s", message, extra=self._logger_extras) - event_handler = self._event_handlers.get(correlation_id) - event_handler(message) + else: + event_handler = self._event_handlers.get(correlation_id) + event_handler(message) def add_event_handler(self, correlation_id, event_handler): self._event_handlers[correlation_id] = event_handler diff --git a/tests/listener_test.py b/tests/listener_test.py index 84609293b1..3a9adb730e 100644 --- a/tests/listener_test.py +++ b/tests/listener_test.py @@ -15,8 +15,8 @@ def setUp(self): self.collector = event_collector() def tearDown(self): - self.rc.exit() self.shutdown_all_clients() + self.rc.exit() # -------------------------- test_remove_member ----------------------- # def test_smart_listener_remove_member(self): diff --git a/tests/reconnect_test.py b/tests/reconnect_test.py index 342c26b7fb..4edfb508c2 100644 --- a/tests/reconnect_test.py +++ b/tests/reconnect_test.py @@ -2,7 +2,7 @@ from time import sleep from hazelcast import ClientConfig -from hazelcast.exception import HazelcastError +from hazelcast.exception import HazelcastError, TargetDisconnectedError from hazelcast.lifecycle import LIFECYCLE_STATE_DISCONNECTED, LIFECYCLE_STATE_CONNECTED from hazelcast.util import AtomicInteger from tests.base import HazelcastTestCase @@ -76,7 +76,12 @@ def test_listener_re_register(self): def assert_events(): if client.lifecycle.is_live: - map.put("key-%d" % count.get_and_increment(), "value").result() + while True: + try: + map.put("key-%d" % count.get_and_increment(), "value").result() + break + except TargetDisconnectedError: + pass self.assertGreater(len(collector.events), 0) else: self.fail("Client disconnected...") From 3521c71efce6211f6b90f00abb2680bf68f03212 Mon Sep 17 00:00:00 2001 From: Yagmur Dulger Date: Fri, 2 Aug 2019 12:14:11 +0300 Subject: [PATCH 14/14] fixed usage of invocation_retry_pause --- hazelcast/listener.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hazelcast/listener.py b/hazelcast/listener.py index 1813a0ec31..1314d9e8b3 100644 --- a/hazelcast/listener.py +++ b/hazelcast/listener.py @@ -63,7 +63,7 @@ def time_out_or_sleep_before_next_try(self, start_millis, last_failed_member, la "Client invocation timeout: %s, Elapsed time: %s ms, Cause: %s", last_failed_member, now_in_millis, start_millis, invocation_time_out_millis, elapsed_millis, last_exception.args[0]) else: - sleep(self._invocation_service.invocation_retry_pause()) # sleep before next try + sleep(self._invocation_service.invocation_retry_pause) # sleep before next try def register_listener(self, registration_request, decode_register_response, encode_deregister_request, handler): if self.is_smart: