Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion hazelcast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from hazelcast.cluster import ClusterService, RandomLoadBalancer
from hazelcast.config import ClientConfig, ClientProperties
from hazelcast.connection import ConnectionManager, Heartbeat, DefaultAddressProvider, DefaultAddressTranslator
from hazelcast.invocation import InvocationService, ListenerService
from hazelcast.invocation import InvocationService
from hazelcast.listener import ListenerService
from hazelcast.lifecycle import LifecycleService, LIFECYCLE_STATE_SHUTTING_DOWN, LIFECYCLE_STATE_SHUTDOWN
from hazelcast.partition import PartitionService
from hazelcast.proxy import ProxyManager, MAP_SERVICE, QUEUE_SERVICE, LIST_SERVICE, SET_SERVICE, MULTI_MAP_SERVICE, \
Expand Down Expand Up @@ -61,8 +62,10 @@ def __init__(self, config=None):
def _start(self):
self.reactor.start()
try:
self.invoker.start()
self.cluster.start()
self.heartbeat.start()
self.listener.start()
self.partition_service.start()
self.statistics.start()
except:
Expand Down
4 changes: 1 addition & 3 deletions hazelcast/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import uuid

from hazelcast.exception import HazelcastError, AuthenticationError, TargetDisconnectedError
from hazelcast.invocation import ListenerInvocation
from hazelcast.lifecycle import LIFECYCLE_STATE_CONNECTED, LIFECYCLE_STATE_DISCONNECTED
from hazelcast.protocol.codec import client_add_membership_listener_codec, client_authentication_codec
from hazelcast.util import get_possible_addresses, get_provider_addresses, calculate_version
Expand Down Expand Up @@ -181,8 +180,7 @@ def _init_membership_listener(self, connection):
def handler(m):
client_add_membership_listener_codec.handle(m, self._handle_member, self._handle_member_list)

response = self._client.invoker.invoke(
ListenerInvocation(self._client.listener, request, handler, connection=connection)).result()
response = self._client.invoker.invoke_on_connection(request, connection, True, handler).result()
registration_id = client_add_membership_listener_codec.decode_response(response)["response"]
self.logger.debug("Registered membership listener with ID " + registration_id, extra=self._logger_extras)
self._initial_list_fetched.wait()
Expand Down
114 changes: 19 additions & 95 deletions hazelcast/invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Invocation(object):
sent_connection = None
timer = None

def __init__(self, invocation_service, request, partition_id=-1, address=None, connection=None):
def __init__(self, invocation_service, request, partition_id=-1, address=None, connection=None, event_handler=None):
self._event = threading.Event()
self._invocation_timeout = invocation_service.invocation_timeout
self.timeout = self._invocation_timeout + time.time()
Expand All @@ -27,6 +27,7 @@ def __init__(self, invocation_service, request, partition_id=-1, address=None, c
self.partition_id = partition_id
self.request = request
self.future = Future()
self.event_handler = event_handler

def has_connection(self):
return self.connection is not None
Expand Down Expand Up @@ -55,90 +56,19 @@ def on_timeout(self):
self.set_exception(TimeoutError("Request timed out after %d seconds." % self._invocation_timeout))


class ListenerInvocation(Invocation):
# used for storing the original registration id across re-registrations
registration_id = None

def __init__(self, invocation_service, request, event_handler, response_decoder=None, **kwargs):
Invocation.__init__(self, invocation_service, request, **kwargs)
self.event_handler = event_handler
self.response_decoder = response_decoder


class ListenerService(object):
logger = logging.getLogger("HazelcastClient.ListenerService")

def __init__(self, client):
self._client = client
self._logger_extras = {"client_name": client.name, "group_name": client.config.group_config.name}
self.registrations = {}
self.invocation_timeout = self._init_invocation_timeout()
pass

def start_listening(self, request, event_handler, decode_add_listener, key=None):
if key:
partition_id = self._client.partition_service.get_partition_id(key)
invocation = ListenerInvocation(self, request, event_handler, decode_add_listener,
partition_id=partition_id)
else:
invocation = ListenerInvocation(self, request, event_handler, decode_add_listener)

future = self._client.invoker.invoke(invocation)
registration_id = decode_add_listener(future.result())
invocation.registration_id = registration_id # store the original registration id for future reference
self.registrations[registration_id] = (registration_id, request.get_correlation_id())
return registration_id

def stop_listening(self, registration_id, encode_remove_listener):
try:
actual_id, correlation_id = self.registrations.pop(registration_id)
self._client.invoker._remove_event_handler(correlation_id)
# TODO: should be invoked on same node as registration?
self._client.invoker.invoke_on_random_target(encode_remove_listener(actual_id)).result()
return True
except KeyError:
return False

def re_register_listener(self, invocation):
registration_id = invocation.registration_id
new_invocation = ListenerInvocation(self, invocation.request, invocation.event_handler,
invocation.response_decoder, partition_id=invocation.partition_id)
new_invocation.registration_id = registration_id

# re-send the request
def callback(f):
if f.is_success():
new_id = new_invocation.response_decoder(f.result())
self.logger.debug("Re-registered listener with id %s and new_id %s for request %s",
registration_id, new_id, new_invocation.request, extra=self._logger_extras)
self.registrations[registration_id] = (new_id, new_invocation.request.get_correlation_id())
else:
self.logger.warning("Re-registration for listener with id %s failed.", registration_id, exc_info=True,
extra=self._logger_extras)

self.logger.debug("Re-registering listener %s for request %s", registration_id, new_invocation.request,
extra=self._logger_extras)
self._client.invoker.invoke(new_invocation).add_done_callback(callback)

def _init_invocation_timeout(self):
invocation_timeout = self._client.properties.get_seconds_positive_or_default(
self._client.properties.INVOCATION_TIMEOUT_SECONDS)
return invocation_timeout


class InvocationService(object):
logger = logging.getLogger("HazelcastClient.InvocationService")

def __init__(self, client):
self._pending = {}
self._event_handlers = {}
self._next_correlation_id = AtomicInteger(1)
self._client = client
self._logger_extras = {"client_name": client.name, "group_name": client.config.group_config.name}
self._event_queue = queue.Queue()
self._is_redo_operation = client.config.network_config.redo_operation
self._invocation_retry_pause = self._init_invocation_retry_pause()
self.invocation_retry_pause = self._init_invocation_retry_pause()
self.invocation_timeout = self._init_invocation_timeout()
self._listener_service = None

if client.config.network_config.smart_routing:
self.invoke = self.invoke_smart
Expand All @@ -148,8 +78,12 @@ def __init__(self, client):
self._client.connection_manager.add_listener(on_connection_closed=self.cleanup_connection)
client.heartbeat.add_listener(on_heartbeat_stopped=self._heartbeat_stopped)

def invoke_on_connection(self, message, connection, ignore_heartbeat=False):
return self.invoke(Invocation(self, message, connection=connection), ignore_heartbeat)
def start(self):
self._listener_service = self._client.listener

def invoke_on_connection(self, message, connection, ignore_heartbeat=False, event_handler=None):
return self.invoke(Invocation(self, message, connection=connection, event_handler=event_handler),
ignore_heartbeat)

def invoke_on_partition(self, message, partition_id, invocation_timeout=None):
invocation = Invocation(self, message, partition_id=partition_id)
Expand Down Expand Up @@ -178,7 +112,8 @@ def invoke_smart(self, invocation, ignore_heartbeat=False):
self._send_to_address(invocation, addr)
elif invocation.has_address():
if not self._is_member(invocation.address):
self._handle_exception(invocation, TargetNotMemberError("Target '{}' is not a member.".format(invocation.address)))
self._handle_exception(invocation, TargetNotMemberError("Target '{}' is not a member.".format
(invocation.address)))
else:
self._send_to_address(invocation, invocation.address)
else: # send to random address
Expand All @@ -202,11 +137,6 @@ def cleanup_connection(self, connection, cause):
if invocation.sent_connection == connection:
self._handle_exception(invocation, cause)

if self._client.lifecycle.is_live:
for correlation_id, invocation in six.iteritems(dict(self._event_handlers)):
if invocation.sent_connection == connection and invocation.connection is None:
self._client.listener.re_register_listener(invocation)

def _init_invocation_retry_pause(self):
invocation_retry_pause = self._client.properties.get_seconds_positive_or_default(
self._client.properties.INVOCATION_RETRY_PAUSE_MILLIS)
Expand All @@ -223,9 +153,6 @@ def _heartbeat_stopped(self, connection):
self._handle_exception(invocation,
TargetDisconnectedError("%s has stopped heart beating." % connection))

def _remove_event_handler(self, correlation_id):
self._event_handlers.pop(correlation_id)

def _send_to_address(self, invocation, address, ignore_heartbeat=False):
try:
conn = self._client.connection_manager.connections[address]
Expand All @@ -252,8 +179,8 @@ def _send(self, invocation, connection, ignore_heartbeat):
if not invocation.timer:
invocation.timer = self._client.reactor.add_timer_absolute(invocation.timeout, invocation.on_timeout)

if isinstance(invocation, ListenerInvocation):
self._event_handlers[correlation_id] = invocation
if invocation.event_handler is not None:
self._listener_service.add_event_handler(correlation_id, invocation.event_handler)

self.logger.debug("Sending %s to %s", message, connection, extra=self._logger_extras)

Expand All @@ -265,17 +192,14 @@ def _send(self, invocation, connection, ignore_heartbeat):
try:
connection.send_message(message)
except IOError as e:
if invocation.event_handler is not None:
self._listener_service.remove_event_handler(correlation_id)
self._handle_exception(invocation, e)

def _handle_client_message(self, message):
correlation_id = message.get_correlation_id()
if message.has_flags(LISTENER_FLAG):
if correlation_id not in self._event_handlers:
self.logger.warning("Got event message with unknown correlation id: %s", message,
extra=self._logger_extras)
return
invocation = self._event_handlers[correlation_id]
self._handle_event(invocation, message)
self._listener_service.handle_client_message(message)
return
if correlation_id not in self._pending:
self.logger.warning("Got message with unknown correlation id: %s", message, extra=self._logger_extras)
Expand Down Expand Up @@ -320,8 +244,8 @@ def _try_retry(self, invocation):

invoke_func = functools.partial(self.invoke, invocation)
self.logger.debug("Rescheduling request %s to be retried in %s seconds", invocation.request,
self._invocation_retry_pause, extra=self._logger_extras)
self._client.reactor.add_timer(self._invocation_retry_pause, invoke_func)
self.invocation_retry_pause, extra=self._logger_extras)
self._client.reactor.add_timer(self.invocation_retry_pause, invoke_func)
return True

def _is_member(self, address):
Expand Down
Loading