Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1766,7 +1766,7 @@ A Topic usage example is shown below.
print("Got message:", topic_message.message)

# Get a Topic called "my-distributed-topic"
topic = client.get_topic("my-distributed-topic")
topic = client.get_topic("my-distributed-topic").blocking()

# Add a Listener to the Topic
topic.add_listener(print_on_message)
Expand Down Expand Up @@ -2342,15 +2342,15 @@ client.

client.add_distributed_object_listener(
listener_func=distributed_object_listener
)
).result()

map_name = "test_map"

# This call causes a CREATED event
test_map = client.get_map(map_name)
test_map = client.get_map(map_name).blocking()

# This causes no event because map was already created
test_map2 = client.get_map(map_name)
test_map2 = client.get_map(map_name).blocking()

# This causes a DESTROYED event
test_map.destroy()
Expand Down Expand Up @@ -2483,7 +2483,7 @@ See the following example.


customer_map.add_entry_listener(include_value=True, clear_all_func=cleared)
customer_map.clear().result()
customer_map.clear()

7.6. Distributed Computing
--------------------------
Expand Down
4 changes: 2 additions & 2 deletions examples/monitoring/distributed_object_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def distributed_object_listener(event):
client = hazelcast.HazelcastClient()

# Register the listener
reg_id = client.add_distributed_object_listener(distributed_object_listener)
reg_id = client.add_distributed_object_listener(distributed_object_listener).result()

map_name = "test_map"

Expand All @@ -22,6 +22,6 @@ def distributed_object_listener(event):
test_map.destroy()

# De-register the listener
client.remove_distributed_object_listener(reg_id)
client.remove_distributed_object_listener(reg_id).result()

client.shutdown()
2 changes: 1 addition & 1 deletion examples/org-website/topic_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def print_on_message(topic_message):
# Start the Hazelcast Client and connect to an already running Hazelcast Cluster on 127.0.0.1
hz = hazelcast.HazelcastClient()
# Get a Topic called "my-distributed-topic"
topic = hz.get_topic("my-distributed-topic")
topic = hz.get_topic("my-distributed-topic").blocking()
# Add a Listener to the Topic
topic.add_listener(print_on_message)
# Publish a message to the Topic
Expand Down
2 changes: 1 addition & 1 deletion examples/topic/topic_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def on_message(event):

client = hazelcast.HazelcastClient()

topic = client.get_topic("topic")
topic = client.get_topic("topic").blocking()
topic.add_listener(on_message)

for i in range(10):
Expand Down
4 changes: 2 additions & 2 deletions hazelcast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ def add_distributed_object_listener(self, listener_func):
listener_func (function): Function to be called when a distributed object is created or destroyed.

Returns:
str: A registration id which is used as a key to remove the listener.
hazelcast.future.Future[str]: A registration id which is used as a key to remove the listener.
"""
is_smart = self.config.smart_routing
request = client_add_distributed_object_listener_codec.encode_request(is_smart)
Expand Down Expand Up @@ -552,7 +552,7 @@ def remove_distributed_object_listener(self, registration_id):
registration_id (str): The id of registered listener.

Returns:
bool: ``True`` if registration is removed, ``False`` otherwise.
hazelcast.future.Future[bool]: ``True`` if registration is removed, ``False`` otherwise.
"""
return self._listener_service.deregister_listener(registration_id)

Expand Down
2 changes: 1 addition & 1 deletion hazelcast/invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def _register_backup_listener(self):
self._listener_service.register_listener(request,
codec.decode_response,
lambda reg_id: None,
lambda m: codec.handle(m, self._backup_event_handler))
lambda m: codec.handle(m, self._backup_event_handler)).result()

def _backup_event_handler(self, correlation_id):
invocation = self._pending.get(correlation_id, None)
Expand Down
76 changes: 42 additions & 34 deletions hazelcast/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from uuid import uuid4

from hazelcast import six
from hazelcast.errors import HazelcastError
from hazelcast.future import combine_futures
from hazelcast.errors import HazelcastError, HazelcastClientNotActiveError, TargetDisconnectedError
from hazelcast.future import combine_futures, ImmediateFuture
from hazelcast.invocation import Invocation
from hazelcast.protocol.codec import client_add_cluster_view_listener_codec
from hazelcast.util import check_not_none
Expand Down Expand Up @@ -54,68 +54,76 @@ def register_listener(self, registration_request, decode_register_response, enco

futures = []
for connection in list(six.itervalues(self._connection_manager.active_connections)):
future = self._register_on_connection_async(registration_id, registration, connection)
future = self._register_on_connection(registration_id, registration, connection)
futures.append(future)

try:
combine_futures(futures).result()
except:
self.deregister_listener(registration_id)
raise HazelcastError("Listener cannot be added")
def handler(f):
try:
f.result()
return registration_id
except:
self.deregister_listener(registration_id)
raise HazelcastError("Listener cannot be added")

return registration_id
return combine_futures(futures).continue_with(handler)

def deregister_listener(self, user_registration_id):
check_not_none(user_registration_id, "None user_registration_id is not allowed!")

with self._registration_lock:
listener_registration = self._active_registrations.get(user_registration_id)
listener_registration = self._active_registrations.pop(user_registration_id, None)
if not listener_registration:
return False
return ImmediateFuture(False)

successful = True
# Need to copy items to avoid getting runtime modification errors
for connection, event_registration in list(six.iteritems(listener_registration.connection_registrations)):
try:
server_registration_id = event_registration.server_registration_id
deregister_request = listener_registration.encode_deregister_request(server_registration_id)
invocation = Invocation(deregister_request, connection=connection)
self._invocation_service.invoke(invocation)
invocation.future.result()
self.remove_event_handler(event_registration.correlation_id)
listener_registration.connection_registrations.pop(connection)
except:
if connection.live:
successful = False
_logger.warning("Deregistration for listener with ID %s has failed to address %s ",
user_registration_id, "address", exc_info=True)
if successful:
self._active_registrations.pop(user_registration_id)
for connection, event_registration in six.iteritems(listener_registration.connection_registrations):
# Remove local handler
self.remove_event_handler(event_registration.correlation_id)
# The rest is for deleting the remote registration
server_registration_id = event_registration.server_registration_id
deregister_request = listener_registration.encode_deregister_request(server_registration_id)
if deregister_request is None:
# None means no remote registration (e.g. for backup acks)
continue

invocation = Invocation(deregister_request, connection=connection, timeout=six.MAXSIZE, urgent=True)
self._invocation_service.invoke(invocation)

def handler(f):
e = f.exception()
if e:
if isinstance(e, (HazelcastClientNotActiveError, IOError, TargetDisconnectedError)):
return

_logger.warning("Deregistration of listener with ID %s has failed for address %s",
user_registration_id, connection.remote_address)

invocation.future.add_done_callback(handler)

return successful
listener_registration.connection_registrations.clear()
return ImmediateFuture(True)

def handle_client_message(self, message, correlation_id):
handler = self._event_handlers.get(correlation_id, None)
if handler:
handler(message)
else:
_logger.warning("Got event message with unknown correlation id: %s", message)
_logger.debug("Got event message with unknown correlation id: %s", message)

def add_event_handler(self, correlation_id, event_handler):
self._event_handlers[correlation_id] = event_handler

def remove_event_handler(self, correlation_id):
self._event_handlers.pop(correlation_id, None)

def _register_on_connection_async(self, user_registration_id, listener_registration, connection):
def _register_on_connection(self, user_registration_id, listener_registration, connection):
registration_map = listener_registration.connection_registrations

if connection in registration_map:
return

registration_request = listener_registration.registration_request.copy()
invocation = Invocation(registration_request, connection=connection,
event_handler=listener_registration.handler, response_handler=lambda m: m)
event_handler=listener_registration.handler, response_handler=lambda m: m, urgent=True)
self._invocation_service.invoke(invocation)

def callback(f):
Expand All @@ -136,7 +144,7 @@ def callback(f):
def _connection_added(self, connection):
with self._registration_lock:
for user_reg_id, listener_registration in six.iteritems(self._active_registrations):
self._register_on_connection_async(user_reg_id, listener_registration, connection)
self._register_on_connection(user_reg_id, listener_registration, connection)

def _connection_removed(self, connection, _):
with self._registration_lock:
Expand Down
5 changes: 5 additions & 0 deletions hazelcast/protocol/client_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ def drop_fragmentation_frame(self):
self.start_frame = self.start_frame.next
self._next_frame = self.start_frame

def __repr__(self):
message_type = self.get_message_type()
correlation_id = self.get_correlation_id()
return "InboundMessage(message_type=%s, correlation_id=%s)" % (message_type, correlation_id)


NULL_FRAME_BUF = bytearray(SIZE_OF_FRAME_LENGTH_AND_FLAGS)
LE_INT.pack_into(NULL_FRAME_BUF, 0, SIZE_OF_FRAME_LENGTH_AND_FLAGS)
Expand Down
4 changes: 2 additions & 2 deletions hazelcast/proxy/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def add_listener(self, include_value=False, item_added_func=None, item_removed_f
item_removed_func (function): To be called when an item is deleted from this list.

Returns:
str: A registration id which is used as a key to remove the listener.
hazelcast.future.Future[str]: A registration id which is used as a key to remove the listener.
"""
request = list_add_listener_codec.encode_request(self.name, include_value, self._is_smart)

Expand Down Expand Up @@ -341,7 +341,7 @@ def remove_listener(self, registration_id):
registration_id (str): Id of the listener to be deleted.

Returns:
bool: ``True`` if the item listener is removed, ``False`` otherwise.
hazelcast.future.Future[bool]: ``True`` if the item listener is removed, ``False`` otherwise.
"""
return self._deregister_listener(registration_id)

Expand Down
19 changes: 8 additions & 11 deletions hazelcast/proxy/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def add_entry_listener(self, include_value=False, key=None, predicate=None, adde
loaded_func (function): Function to be called when an entry is loaded from a map loader.

Returns:
str: A registration id which is used as a key to remove the listener.
hazelcast.future.Future[str]: A registration id which is used as a key to remove the listener.
"""
flags = get_entry_listener_flags(ADDED=added_func, REMOVED=removed_func, UPDATED=updated_func,
EVICTED=evicted_func, EXPIRED=expired_func, EVICT_ALL=evict_all_func,
Expand Down Expand Up @@ -850,7 +850,7 @@ def remove_entry_listener(self, registration_id):
registration_id (str): Id of registered listener.

Returns:
bool: ``True`` if registration is removed, ``False`` otherwise.
hazelcast.future.Future[bool]: ``True`` if registration is removed, ``False`` otherwise.
"""
return self._deregister_listener(registration_id)

Expand Down Expand Up @@ -1246,15 +1246,12 @@ def _on_destroy(self):
super(MapFeatNearCache, self)._on_destroy()

def _add_near_cache_invalidation_listener(self):
try:
codec = map_add_near_cache_invalidation_listener_codec
request = codec.encode_request(self.name, EntryEventType.INVALIDATION, self._is_smart)
self._invalidation_listener_id = self._register_listener(
request, lambda r: codec.decode_response(r),
lambda reg_id: map_remove_entry_listener_codec.encode_request(self.name, reg_id),
lambda m: codec.handle(m, self._handle_invalidation, self._handle_batch_invalidation))
except:
pass
codec = map_add_near_cache_invalidation_listener_codec
request = codec.encode_request(self.name, EntryEventType.INVALIDATION, self._is_smart)
self._invalidation_listener_id = self._register_listener(
request, lambda r: codec.decode_response(r),
lambda reg_id: map_remove_entry_listener_codec.encode_request(self.name, reg_id),
lambda m: codec.handle(m, self._handle_invalidation, self._handle_batch_invalidation)).result()

def _remove_near_cache_invalidation_listener(self):
if self._invalidation_listener_id:
Expand Down
4 changes: 2 additions & 2 deletions hazelcast/proxy/multi_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def add_entry_listener(self, include_value=False, key=None, added_func=None, rem
clear_all_func (function): Function to be called when entries are cleared from map.

Returns:
str: A registration id which is used as a key to remove the listener.
hazelcast.future.Future[str]: A registration id which is used as a key to remove the listener.
"""
if key:
codec = multi_map_add_entry_listener_to_key_codec
Expand Down Expand Up @@ -318,7 +318,7 @@ def remove_entry_listener(self, registration_id):
registration_id (str): Id of registered listener.

Returns:
bool: ``True`` if registration is removed, ``False`` otherwise.
hazelcast.future.Future[bool]: ``True`` if registration is removed, ``False`` otherwise.
"""
return self._deregister_listener(registration_id)

Expand Down
4 changes: 2 additions & 2 deletions hazelcast/proxy/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def add_listener(self, include_value=False, item_added_func=None, item_removed_f
item_removed_func (function): Function to be called when an item is deleted from this set.

Returns:
str: A registration id which is used as a key to remove the listener.
hazelcast.future.Future[str]: A registration id which is used as a key to remove the listener.
"""
codec = queue_add_listener_codec
request = codec.encode_request(self.name, include_value, self._is_smart)
Expand Down Expand Up @@ -306,7 +306,7 @@ def remove_listener(self, registration_id):
registration_id (str): Id of the listener to be deleted.

Returns:
bool: ``True`` if the item listener is removed, ``False`` otherwise.
hazelcast.future.Future[bool]: ``True`` if the item listener is removed, ``False`` otherwise.
"""
return self._deregister_listener(registration_id)

Expand Down
4 changes: 2 additions & 2 deletions hazelcast/proxy/replicated_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def add_entry_listener(self, key=None, predicate=None, added_func=None, removed_
clear_all_func (function): Function to be called when entries are cleared from map.

Returns:
str: A registration id which is used as a key to remove the listener.
hazelcast.future.Future[str]: A registration id which is used as a key to remove the listener.
"""
if key and predicate:
codec = replicated_map_add_entry_listener_to_key_with_predicate_codec
Expand Down Expand Up @@ -268,7 +268,7 @@ def remove_entry_listener(self, registration_id):
registration_id (str): Id of registered listener.

Returns:
bool: ``True`` if registration is removed, ``False`` otherwise.
hazelcast.future.Future[bool]: ``True`` if registration is removed, ``False`` otherwise.
"""
return self._deregister_listener(registration_id)

Expand Down
4 changes: 2 additions & 2 deletions hazelcast/proxy/set.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def add_listener(self, include_value=False, item_added_func=None, item_removed_f
item_removed_func (function): Function to be called when an item is deleted from this set.

Returns:
str: A registration id which is used as a key to remove the listener.
hazelcast.future.Future[str]: A registration id which is used as a key to remove the listener.
"""
request = set_add_listener_codec.encode_request(self.name, include_value, self._is_smart)

Expand Down Expand Up @@ -187,7 +187,7 @@ def remove_listener(self, registration_id):
registration_id (str): Id of the listener to be deleted.

Returns:
bool: ``True`` if the item listener is removed, ``False`` otherwise.
hazelcast.future.Future[bool]: ``True`` if the item listener is removed, ``False`` otherwise.
"""
return self._deregister_listener(registration_id)

Expand Down
4 changes: 2 additions & 2 deletions hazelcast/proxy/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def add_listener(self, on_message=None):
on_message (function): Function to be called when a message is published.

Returns:
str: A registration id which is used as a key to remove the listener.
hazelcast.future.Future[str]: A registration id which is used as a key to remove the listener.
"""
codec = topic_add_message_listener_codec
request = codec.encode_request(self.name, self._is_smart)
Expand Down Expand Up @@ -62,6 +62,6 @@ def remove_listener(self, registration_id):
registration_id (str): Registration id of the listener to be removed.

Returns:
bool: ``True`` if the listener is removed, ``False`` otherwise.
hazelcast.future.Future[bool]: ``True`` if the listener is removed, ``False`` otherwise.
"""
return self._deregister_listener(registration_id)
Loading