IGNITE-15102 Implement event handling and monitoring for python thin client - Fixes #46.
ivandasch committed Jul 20, 2021
1 parent 8fc14f8 commit 82f29e202ea4526b49721993e6ea356640ef66e6
Showing 20 changed files with 1,047 additions and 173 deletions.
@@ -31,3 +31,4 @@ of `pyignite`, intended for end users.
datatypes/parsers
datatypes/cache_props
Exceptions <source/pyignite.exceptions>
Monitoring and handling events <source/pyignite.monitoring>
@@ -0,0 +1,20 @@
.. Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
pyignite.connection.protocol_context package
=============================================

.. automodule:: pyignite.connection.protocol_context
:members:
@@ -20,3 +20,9 @@ pyignite.connection package
:members:
:undoc-members:
:show-inheritance:

Submodules
----------

.. toctree::
pyignite.connection.protocol_context
@@ -0,0 +1,21 @@
.. Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
pyignite.monitoring module
===========================

.. automodule:: pyignite.monitoring
:members:
:member-order: bysource
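
The monitoring module documented above is the user-facing entry point for the listener plumbing added throughout this commit. As a rough, hedged illustration only: the listener base class, hook names and event attributes below (ConnectionEventListener, on_handshake_success, on_connection_lost, event.host, event.port, event.error_msg) are assumptions inferred from the internal publish_* calls later in this diff; the actual names live in pyignite/monitoring.py, which is not shown in these hunks.

    from pyignite import Client, monitoring


    class LoggingConnectionListener(monitoring.ConnectionEventListener):  # assumed base class
        def on_handshake_success(self, event):
            # Attribute names mirror the publish_handshake_success(host, port, ...) arguments.
            print(f"Connected to {event.host}:{event.port}")

        def on_connection_lost(self, event):
            print(f"Connection lost to {event.host}:{event.port}: {event.error_msg}")


    # event_listeners is the new constructor argument introduced by this commit.
    client = Client(event_listeners=[LoggingConnectionListener()])
    client.connect('127.0.0.1', 10800)
    # ... cache operations ...
    client.close()
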
@@ -44,4 +44,5 @@ Submodules
pyignite.transaction
pyignite.cursors
pyignite.exceptions
pyignite.monitoring

@@ -16,7 +16,7 @@
import random
import sys
from itertools import chain
from typing import Iterable, Type, Union, Any, Dict, Optional
from typing import Iterable, Type, Union, Any, Dict, Optional, Sequence

from .aio_cluster import AioCluster
from .api import cache_get_node_partitions_async
@@ -60,7 +60,8 @@ class AioClient(BaseClient):
Asynchronous Client implementation.
"""

def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **kwargs):
def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
event_listeners: Optional[Sequence] = None, **kwargs):
"""
Initialize client.
@@ -71,9 +72,10 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **
https://ignite.apache.org/docs/latest/binary-client-protocol/data-format#schema
:param partition_aware: (optional) try to calculate the exact data
placement from the key before to issue the key operation to the
server node, `True` by default.
server node, `True` by default,
:param event_listeners: (optional) event listeners.
"""
super().__init__(compact_footer, partition_aware, **kwargs)
super().__init__(compact_footer, partition_aware, event_listeners, **kwargs)
self._registry_mux = asyncio.Lock()
self._affinity_query_mux = asyncio.Lock()

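The asynchronous client accepts the same new argument; a minimal sketch, reusing the hypothetical listener class from the example above and assuming the usual async-context-manager connect usage:

    import asyncio
    from pyignite import AioClient


    async def main():
        client = AioClient(event_listeners=[LoggingConnectionListener()])
        async with client.connect('127.0.0.1', 10800):
            pass  # cache operations go here


    asyncio.run(main())
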
@@ -99,9 +101,8 @@ async def _connect(self, nodes):

# do not try to open more nodes
self._current_node = i

except connection_errors:
conn.failed = True
pass

self._nodes.append(conn)

@@ -301,7 +302,7 @@ async def _get_affinity(self, conn: 'AioConnection', caches: Iterable[int]) -> D
"""
for _ in range(AFFINITY_RETRIES or 1):
result = await cache_get_node_partitions_async(conn, caches)
if result.status == 0 and result.value['partition_mapping']:
if result.status == 0:
break
await asyncio.sleep(AFFINITY_DELAY)

@@ -341,7 +342,7 @@ async def get_best_node(

asyncio.ensure_future(
asyncio.gather(
*[conn.reconnect() for conn in self._nodes if not conn.alive],
*[node.reconnect() for node in self._nodes if not node.alive],
return_exceptions=True
)
)
@@ -44,7 +44,7 @@
import random
import re
from itertools import chain
from typing import Iterable, Type, Union, Any, Dict, Optional
from typing import Iterable, Type, Union, Any, Dict, Optional, Sequence

from .api import cache_get_node_partitions
from .api.binary import get_binary_type, put_binary_type
@@ -66,6 +66,7 @@
get_field_by_id, unsigned
)
from .binary import GenericObjectMeta
from .monitoring import _EventListeners


__all__ = ['Client']
@@ -76,7 +77,8 @@ class BaseClient:
_identifier = re.compile(r'[^0-9a-zA-Z_.+$]', re.UNICODE)
_ident_start = re.compile(r'^[^a-zA-Z_]+', re.UNICODE)

def __init__(self, compact_footer: bool = None, partition_aware: bool = False, **kwargs):
def __init__(self, compact_footer: bool = None, partition_aware: bool = False,
event_listeners: Optional[Sequence] = None, **kwargs):
self._compact_footer = compact_footer
self._partition_aware = partition_aware
self._connection_args = kwargs
@@ -87,6 +89,7 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = False, *
self.affinity_version = (0, 0)
self._affinity = {'version': self.affinity_version, 'partition_mapping': defaultdict(dict)}
self._protocol_context = None
self._event_listeners = _EventListeners(event_listeners)

@property
def protocol_context(self):
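
_EventListeners itself is not part of these hunks. A minimal, hypothetical sketch of what such an aggregator could look like, inferred only from the enabled_connection_listener checks and publish_* calls in the connection code below; the real class in pyignite/monitoring.py may differ (for instance, it presumably wraps the arguments in event objects):

    class _EventListeners:
        """Hypothetical sketch: groups user listeners and fans connection events out to them."""

        def __init__(self, listeners):
            self._connection_listeners = list(listeners or [])

        @property
        def enabled_connection_listener(self):
            # Connection code checks this flag before publishing, so clients
            # without listeners pay no dispatch overhead.
            return bool(self._connection_listeners)

        def _publish(self, callback_name, *args):
            for listener in self._connection_listeners:
                callback = getattr(listener, callback_name, None)
                if callable(callback):
                    callback(*args)

        def publish_handshake_success(self, host, port, protocol_context, node_uuid):
            self._publish('on_handshake_success', host, port, protocol_context, node_uuid)

        def publish_connection_lost(self, host, port, node_uuid, error):
            self._publish('on_connection_lost', host, port, node_uuid, error)

        # publish_handshake_start, publish_handshake_fail, publish_authentication_fail
        # and publish_connection_closed would follow the same pattern.
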
@@ -338,7 +341,8 @@ class Client(BaseClient):
Synchronous Client implementation.
"""

def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **kwargs):
def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
event_listeners: Optional[Sequence] = None, **kwargs):
"""
Initialize client.
@@ -349,9 +353,10 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **
https://ignite.apache.org/docs/latest/binary-client-protocol/data-format#schema
:param partition_aware: (optional) try to calculate the exact data
placement from the key before to issue the key operation to the
server node, `True` by default.
server node, `True` by default,
:param event_listeners: (optional) event listeners.
"""
super().__init__(compact_footer, partition_aware, **kwargs)
super().__init__(compact_footer, partition_aware, event_listeners, **kwargs)

def connect(self, *args):
"""
@@ -382,7 +387,6 @@ def _connect(self, nodes):
self._current_node = i

except connection_errors:
conn.failed = True
if self.partition_aware:
# schedule the reconnection
conn.reconnect()
@@ -565,7 +569,7 @@ def _get_affinity(self, conn: 'Connection', caches: Iterable[int]) -> Dict:
"""
for _ in range(AFFINITY_RETRIES or 1):
result = cache_get_node_partitions(conn, caches)
if result.status == 0 and result.value['partition_mapping']:
if result.status == 0:
break
time.sleep(AFFINITY_DELAY)

@@ -608,9 +612,9 @@ def get_best_node(

self._update_affinity(full_affinity)

for conn in self._nodes:
if not conn.alive:
conn.reconnect()
for node in self._nodes:
if not node.alive:
node.reconnect()

c_id = cache.cache_id if isinstance(cache, BaseCache) else cache_id(cache)
parts = self._cache_partition_mapping(c_id).get('number_of_partitions')
@@ -190,10 +190,10 @@ async def _connect(self):
self._on_handshake_fail(e)
raise e
except Exception as e:
self._on_handshake_fail(e)
# restore undefined protocol version
if detecting_protocol:
self.client.protocol_context = None
self._on_handshake_fail(e)
raise e

self._on_handshake_success(result)
@@ -99,7 +99,9 @@ def _process_handshake_error(self, response):
def _on_handshake_start(self):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Connecting to node(address=%s, port=%d) with protocol context %s",
self.host, self.port, self.client.protocol_context)
self.host, self.port, self.protocol_context)
if self._enabled_connection_listener:
self._connection_listener.publish_handshake_start(self.host, self.port, self.protocol_context)

def _on_handshake_success(self, result):
features = BitmaskFeature.from_array(result.get('features', None))
@@ -109,24 +111,45 @@ def _on_handshake_success(self, result):

if logger.isEnabledFor(logging.DEBUG):
logger.debug("Connected to node(address=%s, port=%d, node_uuid=%s) with protocol context %s",
self.host, self.port, self.uuid, self.client.protocol_context)
self.host, self.port, self.uuid, self.protocol_context)
if self._enabled_connection_listener:
self._connection_listener.publish_handshake_success(self.host, self.port, self.protocol_context, self.uuid)

def _on_handshake_fail(self, err):
self.failed = True

if isinstance(err, AuthenticationError):
logger.error("Authentication failed while connecting to node(address=%s, port=%d): %s",
self.host, self.port, err)
if self._enabled_connection_listener:
self._connection_listener.publish_authentication_fail(self.host, self.port, self.protocol_context, err)
else:
logger.error("Failed to perform handshake, connection to node(address=%s, port=%d) "
"with protocol context %s failed: %s",
self.host, self.port, self.client.protocol_context, err, exc_info=True)
self.host, self.port, self.protocol_context, err, exc_info=True)
if self._enabled_connection_listener:
self._connection_listener.publish_handshake_fail(self.host, self.port, self.protocol_context, err)

def _on_connection_lost(self, err=None, expected=False):
if expected and logger.isEnabledFor(logging.DEBUG):
logger.debug("Connection closed to node(address=%s, port=%d, node_uuid=%s)",
self.host, self.port, self.uuid)
if expected:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Connection closed to node(address=%s, port=%d, node_uuid=%s)",
self.host, self.port, self.uuid)
if self._enabled_connection_listener:
self._connection_listener.publish_connection_closed(self.host, self.port, self.uuid)
else:
logger.info("Connection lost to node(address=%s, port=%d, node_uuid=%s): %s",
self.host, self.port, self.uuid, err)
if self._enabled_connection_listener:
self._connection_listener.publish_connection_lost(self.host, self.port, self.uuid, err)

@property
def _enabled_connection_listener(self):
return self.client._event_listeners and self.client._event_listeners.enabled_connection_listener

@property
def _connection_listener(self):
return self.client._event_listeners


class Connection(BaseConnection):
@@ -216,10 +239,10 @@ def connect(self):
self._on_handshake_fail(e)
raise e
except Exception as e:
self._on_handshake_fail(e)
# restore undefined protocol version
if detecting_protocol:
self.client.protocol_context = None
self._on_handshake_fail(e)
raise e

self._on_handshake_success(result)
@@ -260,7 +283,7 @@ def reconnect(self):
if self.alive:
return

self.close()
self.close(on_reconnect=True)

# connect and silence the connection errors
try:
@@ -352,7 +375,7 @@ def recv(self, flags=None, reconnect=True) -> bytearray:

return data

def close(self):
def close(self, on_reconnect=False):
"""
Try to mark socket closed, then unlink it. This is recommended but
not required, since sockets are automatically closed when
@@ -364,5 +387,6 @@ def close(self):
self._socket.close()
except connection_errors:
pass
self._on_connection_lost(expected=True)
if not on_reconnect and not self.failed:
self._on_connection_lost(expected=True)
self._socket = None
@@ -44,6 +44,9 @@ def _ensure_consistency(self):
if not self.is_feature_flags_supported():
self._features = None

def copy(self):
return ProtocolContext(self.version, self.features)

@property
def version(self):
return getattr(self, '_version', None)
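
The new ProtocolContext.copy() returns an independent instance built from the current version and features, presumably so the connection can hand a stable snapshot to listeners while negotiation may still adjust the live context. A trivial illustration of the semantics implied by the method body above (the constructor arguments are inferred from copy() itself; the plain version tuple stands in for the real version type):

    from pyignite.connection.protocol_context import ProtocolContext

    ctx = ProtocolContext((1, 7, 0), None)   # (version, features), as used in copy()
    snapshot = ctx.copy()
    assert snapshot is not ctx               # a distinct object ...
    assert snapshot.version == ctx.version   # ... carrying the same protocol version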
