Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@
* [7.5. Distributed Events](#75-distributed-events)
* [7.5.1. Cluster Events](#751-cluster-events)
* [7.5.1.1. Listening for Member Events](#7511-listening-for-member-events)
* [7.5.1.2. Listening for Lifecycle Events](#7512-listening-for-lifecycle-events)
* [7.5.1.2. Listenring for Distributed Object Events](#7512-listening-for-distributed-object-events)
* [7.5.1.3. Listening for Lifecycle Events](#7513-listening-for-lifecycle-events)
* [7.5.2. Distributed Data Structure Events](#752-distributed-data-structure-events)
* [7.5.2.1. Listening for Map Events](#7521-listening-for-map-events)
* [7.6. Distributed Computing](#76-distributed-computing)
Expand Down Expand Up @@ -1737,7 +1738,42 @@ The following is a membership listener registration by using the `add_listener()
client.cluster.add_listener(member_added=lambda m: print("Member Added: The address is {}".format(m.address)))
```

#### 7.5.1.2. Listening for Lifecycle Events
#### 7.5.1.2. Listening for Distributed Object Events

The events for distributed objects are invoked when they are created and destroyed in the cluster. When an event
is received, listener function will be called. The parameter passed into the listener function will be of the type
``DistributedObjectEvent``. A ``DistributedObjectEvent`` contains the following fields:
* ``name``: Name of the distributed object.
* ``service_name``: Service name of the distributed object.
* ``event_type``: Type of the invoked event. It is either ``CREATED`` or ``DESTROYED``.

The following is example of adding a distributed object listener to a client.

```python
def distributed_object_listener(event):
print("Distributed object event >>>", event.name, event.service_name, event.event_type)

client.add_distributed_object_listener(listener_func=distributed_object_listener)

map_name = "test_map"

# This call causes a CREATED event
test_map = client.get_map(map_name)

# This causes no event because map was already created
test_map2 = client.get_map(map_name)

# This causes a DESTROYED event
test_map.destroy()
```

**Output**
```
Distributed object event >>> test_map hz:impl:mapService CREATED
Distributed object event >>> test_map hz:impl:mapService DESTROYED
```

#### 7.5.1.3. Listening for Lifecycle Events

The `Lifecycle Listener` notifies for the following events:

Expand Down
Empty file.
28 changes: 28 additions & 0 deletions examples/monitoring/distributed_object_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import hazelcast


def distributed_object_listener(event):
print("Distributed object event >>>", event.name, event.service_name, event.event_type)


if __name__ == "__main__":
client = hazelcast.HazelcastClient()

# Register the listener
reg_id = client.add_distributed_object_listener(distributed_object_listener)

map_name = "test_map"

# This call causes a CREATED event
test_map = client.get_map(map_name)

# This causes no event because map was already created
test_map2 = client.get_map(map_name)

# This causes a DESTROYED event
test_map.destroy()

# Deregister the listener
client.remove_distributed_object_listener(reg_id)

client.shutdown()
44 changes: 44 additions & 0 deletions hazelcast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
from hazelcast.cluster import ClusterService, RandomLoadBalancer
from hazelcast.config import ClientConfig, ClientProperties
from hazelcast.connection import ConnectionManager, Heartbeat, DefaultAddressProvider, DefaultAddressTranslator
from hazelcast.core import DistributedObjectInfo
from hazelcast.invocation import InvocationService
from hazelcast.listener import ListenerService
from hazelcast.lifecycle import LifecycleService, LIFECYCLE_STATE_SHUTTING_DOWN, LIFECYCLE_STATE_SHUTDOWN
from hazelcast.partition import PartitionService
from hazelcast.protocol.codec import client_get_distributed_objects_codec
from hazelcast.proxy import ProxyManager, MAP_SERVICE, QUEUE_SERVICE, LIST_SERVICE, SET_SERVICE, MULTI_MAP_SERVICE, \
REPLICATED_MAP_SERVICE, ATOMIC_LONG_SERVICE, ATOMIC_REFERENCE_SERVICE, RINGBUFFER_SERVICE, COUNT_DOWN_LATCH_SERVICE, \
TOPIC_SERVICE, RELIABLE_TOPIC_SERVICE, SEMAPHORE_SERVICE, LOCK_SERVICE, ID_GENERATOR_SERVICE, \
Expand Down Expand Up @@ -251,6 +253,48 @@ def new_transaction(self, timeout=120, durability=1, type=TWO_PHASE):
"""
return self.transaction_manager.new_transaction(timeout, durability, type)

def add_distributed_object_listener(self, listener_func):
"""
Adds a listener which will be notified when a
new distributed object is created or destroyed.
:param listener_func: Function to be called when a distributed object is created or destroyed.
:return: (str), a registration id which is used as a key to remove the listener.
"""
return self.proxy.add_distributed_object_listener(listener_func)

def remove_distributed_object_listener(self, registration_id):
"""
Removes the specified distributed object listener. Returns silently if there is no such listener added before.
:param registration_id: (str), id of registered listener.
:return: (bool), ``true`` if registration is removed, ``false`` otherwise.
"""
return self.proxy.remove_distributed_object_listener(registration_id)

def get_distributed_objects(self):
"""
Returns all distributed objects such as; queue, map, set, list, topic, lock, multimap.
Also, as a side effect, it clears the local instances of the destroyed proxies.
:return:(Sequence), List of instances created by Hazelcast.
"""
request = client_get_distributed_objects_codec.encode_request()
to_object = self.serialization_service.to_object
future = self.invoker.invoke_on_random_target(request)
response = client_get_distributed_objects_codec.decode_response(future.result(), to_object)["response"]

distributed_objects = self.proxy.get_distributed_objects()
local_distributed_object_infos = set()
for dist_obj in distributed_objects:
local_distributed_object_infos.add(DistributedObjectInfo(dist_obj.name, dist_obj.service_name))

for dist_obj_info in response:
local_distributed_object_infos.discard(dist_obj_info)
self.proxy.get_or_create(dist_obj_info.service_name, dist_obj_info.name, create_on_remote=False)

for dist_obj_info in local_distributed_object_infos:
self.proxy.destroy_proxy(dist_obj_info.service_name, dist_obj_info.name, destroy_on_remote=False)

return self.proxy.get_distributed_objects()

def shutdown(self):
"""
Shuts down this HazelcastClient.
Expand Down
34 changes: 34 additions & 0 deletions hazelcast/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from hazelcast import six
from hazelcast import util
from hazelcast.util import enum


class Member(object):
Expand Down Expand Up @@ -55,6 +56,39 @@ def __init__(self, name, service_name):
def __repr__(self):
return "DistributedObjectInfo(name={}, serviceName={})".format(self.name, self.service_name)

def __hash__(self):
return hash((self.name, self.service_name))

def __eq__(self, other):
if isinstance(other, DistributedObjectInfo):
return self.name == other.name and self.service_name == other.service_name
return False


DistributedObjectEventType = enum(CREATED="CREATED", DESTROYED="DESTROYED")
"""
Type of the distributed object event.

* CREATED : DistributedObject is created.
* DESTROYED: DistributedObject is destroyed.
"""


class DistributedObjectEvent(object):
"""
Distributed Object Event
"""

def __init__(self, name, service_name, event_type):
self.name = name
self.service_name = service_name
self.event_type = DistributedObjectEventType.reverse.get(event_type, None)

def __repr__(self):
return "DistributedObjectEvent[name={}, " \
"service_name={}, " \
"event_type={}]".format(self.name, self.service_name, self.event_type)


class EntryView(object):
"""
Expand Down
52 changes: 42 additions & 10 deletions hazelcast/proxy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from hazelcast.protocol.codec import client_create_proxy_codec, client_destroy_proxy_codec
from hazelcast.core import DistributedObjectEvent
from hazelcast.protocol.codec import client_create_proxy_codec, client_destroy_proxy_codec, \
client_add_distributed_object_listener_codec, client_remove_distributed_object_listener_codec
from hazelcast.proxy.atomic_long import AtomicLong
from hazelcast.proxy.atomic_reference import AtomicReference
from hazelcast.proxy.count_down_latch import CountDownLatch
Expand All @@ -17,6 +19,7 @@
from hazelcast.proxy.topic import Topic
from hazelcast.proxy.pn_counter import PNCounter
from hazelcast.proxy.flake_id_generator import FlakeIdGenerator
from hazelcast.util import to_list

ATOMIC_LONG_SERVICE = "hz:impl:atomicLongService"
ATOMIC_REFERENCE_SERVICE = "hz:impl:atomicReferenceService"
Expand Down Expand Up @@ -66,31 +69,60 @@ def __init__(self, client):
self._client = client
self._proxies = {}

def get_or_create(self, service_name, name, **kwargs):
def get_or_create(self, service_name, name, create_on_remote=True, **kwargs):
ns = (service_name, name)
if ns in self._proxies:
return self._proxies[ns]

proxy = self.create_proxy(service_name, name, **kwargs)
proxy = self.create_proxy(service_name, name, create_on_remote, **kwargs)
self._proxies[ns] = proxy
return proxy

def create_proxy(self, service_name, name, **kwargs):
message = client_create_proxy_codec.encode_request(name=name, service_name=service_name,
target=self._find_next_proxy_address())
self._client.invoker.invoke_on_random_target(message).result()
def create_proxy(self, service_name, name, create_on_remote, **kwargs):
if create_on_remote:
message = client_create_proxy_codec.encode_request(name=name, service_name=service_name,
target=self._find_next_proxy_address())
self._client.invoker.invoke_on_random_target(message).result()

return _proxy_init[service_name](client=self._client, service_name=service_name, name=name, **kwargs)

def destroy_proxy(self, service_name, name):
def destroy_proxy(self, service_name, name, destroy_on_remote=True):
ns = (service_name, name)
try:
self._proxies.pop(ns)
message = client_destroy_proxy_codec.encode_request(name=name, service_name=service_name)
self._client.invoker.invoke_on_random_target(message).result()
if destroy_on_remote:
message = client_destroy_proxy_codec.encode_request(name=name, service_name=service_name)
self._client.invoker.invoke_on_random_target(message).result()
return True
except KeyError:
return False

def get_distributed_objects(self):
return to_list(self._proxies.values())

def add_distributed_object_listener(self, listener_func):
is_smart = self._client.config.network_config.smart_routing
request = client_add_distributed_object_listener_codec.encode_request(is_smart)

def handle_distributed_object_event(**kwargs):
event = DistributedObjectEvent(**kwargs)
listener_func(event)

def event_handler(client_message):
return client_add_distributed_object_listener_codec.handle(client_message, handle_distributed_object_event)

def decode_add_listener(response):
return client_add_distributed_object_listener_codec.decode_response(response)["response"]

def encode_remove_listener(registration_id):
return client_remove_distributed_object_listener_codec.encode_request(registration_id)

return self._client.listener.register_listener(request, decode_add_listener,
encode_remove_listener, event_handler)

def remove_distributed_object_listener(self, registration_id):
return self._client.listener.deregister_listener(registration_id)

def _find_next_proxy_address(self):
# TODO: filter out lite members
return self._client.load_balancer.next_address()
4 changes: 4 additions & 0 deletions hazelcast/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,3 +341,7 @@ def create_git_info():
if GIT_COMMIT_ID:
return "(" + GIT_COMMIT_ID + ") "
return ""


def to_list(*args, **kwargs):
return list(*args, **kwargs)
5 changes: 5 additions & 0 deletions tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ def assertEntryEvent(self, event, event_type, key=None, value=None, old_value=No
self.assertEqual(event.old_value, old_value)
self.assertEqual(event.number_of_affected_entries, number_of_affected_entries)

def assertDistributedObjectEvent(self, event, name, service_name, event_type):
self.assertEqual(name, event.name)
self.assertEqual(service_name, event.service_name)
self.assertEqual(event_type, event.event_type)

def set_logging_level(self, level):
logging.getLogger().setLevel(level)

Expand Down
Loading