diff --git a/hazelcast/client.py b/hazelcast/client.py index e1308a7b83..82b0378344 100644 --- a/hazelcast/client.py +++ b/hazelcast/client.py @@ -11,12 +11,26 @@ from hazelcast.listener import ListenerService, ClusterViewListenerService from hazelcast.lifecycle import LifecycleService, LifecycleState, _InternalLifecycleService from hazelcast.partition import PartitionService, _InternalPartitionService -from hazelcast.protocol.codec import client_get_distributed_objects_codec, \ - client_add_distributed_object_listener_codec, client_remove_distributed_object_listener_codec -from hazelcast.proxy import ProxyManager, MAP_SERVICE, QUEUE_SERVICE, LIST_SERVICE, SET_SERVICE, MULTI_MAP_SERVICE, \ - REPLICATED_MAP_SERVICE, RINGBUFFER_SERVICE, \ - TOPIC_SERVICE, RELIABLE_TOPIC_SERVICE, \ - EXECUTOR_SERVICE, PN_COUNTER_SERVICE, FLAKE_ID_GENERATOR_SERVICE +from hazelcast.protocol.codec import ( + client_get_distributed_objects_codec, + client_add_distributed_object_listener_codec, + client_remove_distributed_object_listener_codec, +) +from hazelcast.proxy import ( + ProxyManager, + MAP_SERVICE, + QUEUE_SERVICE, + LIST_SERVICE, + SET_SERVICE, + MULTI_MAP_SERVICE, + REPLICATED_MAP_SERVICE, + RINGBUFFER_SERVICE, + TOPIC_SERVICE, + RELIABLE_TOPIC_SERVICE, + EXECUTOR_SERVICE, + PN_COUNTER_SERVICE, + FLAKE_ID_GENERATOR_SERVICE, +) from hazelcast.near_cache import NearCacheManager from hazelcast.reactor import AsyncoreReactor from hazelcast.serialization import SerializationServiceV1 @@ -313,45 +327,71 @@ def __init__(self, **kwargs): self._invocation_service = InvocationService(self, self._reactor) self._address_provider = self._create_address_provider() self._internal_partition_service = _InternalPartitionService(self) - self.partition_service = PartitionService(self._internal_partition_service, self._serialization_service) + self.partition_service = PartitionService( + self._internal_partition_service, self._serialization_service + ) self._internal_cluster_service = _InternalClusterService(self) self.cluster_service = ClusterService(self._internal_cluster_service) - self._connection_manager = ConnectionManager(self, self._reactor, self._address_provider, - self._internal_lifecycle_service, - self._internal_partition_service, - self._internal_cluster_service, - self._invocation_service, - self._near_cache_manager) + self._connection_manager = ConnectionManager( + self, + self._reactor, + self._address_provider, + self._internal_lifecycle_service, + self._internal_partition_service, + self._internal_cluster_service, + self._invocation_service, + self._near_cache_manager, + ) self._load_balancer = self._init_load_balancer(config) - self._listener_service = ListenerService(self, self._connection_manager, - self._invocation_service) + self._listener_service = ListenerService( + self, self._connection_manager, self._invocation_service + ) self._proxy_manager = ProxyManager(self._context) self.cp_subsystem = CPSubsystem(self._context) self._proxy_session_manager = ProxySessionManager(self._context) self._transaction_manager = TransactionManager(self._context) self._lock_reference_id_generator = AtomicInteger(1) - self._statistics = Statistics(self, self._reactor, self._connection_manager, - self._invocation_service, self._near_cache_manager) - self._cluster_view_listener = ClusterViewListenerService(self, self._connection_manager, - self._internal_partition_service, - self._internal_cluster_service, - self._invocation_service) + self._statistics = Statistics( + self, + self._reactor, + self._connection_manager, + self._invocation_service, + self._near_cache_manager, + ) + self._cluster_view_listener = ClusterViewListenerService( + self, + self._connection_manager, + self._internal_partition_service, + self._internal_cluster_service, + self._invocation_service, + ) self._shutdown_lock = threading.RLock() self._init_context() self._start() def _init_context(self): - self._context.init_context(self.config, self._invocation_service, self._internal_partition_service, - self._internal_cluster_service, self._connection_manager, - self._serialization_service, self._listener_service, self._proxy_manager, - self._near_cache_manager, self._lock_reference_id_generator, - self.name, self._proxy_session_manager, self._reactor) + self._context.init_context( + self.config, + self._invocation_service, + self._internal_partition_service, + self._internal_cluster_service, + self._connection_manager, + self._serialization_service, + self._listener_service, + self._proxy_manager, + self._near_cache_manager, + self._lock_reference_id_generator, + self.name, + self._proxy_session_manager, + self._reactor, + ) def _start(self): self._reactor.start() try: - self._invocation_service.init(self._internal_partition_service, self._connection_manager, - self._listener_service) + self._invocation_service.init( + self._internal_partition_service, self._connection_manager, self._listener_service + ) self._internal_lifecycle_service.start() membership_listeners = self.config.membership_listeners self._internal_cluster_service.start(self._connection_manager, membership_listeners) @@ -540,9 +580,12 @@ def handle_distributed_object_event(name, service_name, event_type, source): def event_handler(client_message): return codec.handle(client_message, handle_distributed_object_event) - return self._listener_service.register_listener(request, codec.decode_response, - client_remove_distributed_object_listener_codec.encode_request, - event_handler) + return self._listener_service.register_listener( + request, + codec.decode_response, + client_remove_distributed_object_listener_codec.encode_request, + event_handler, + ) def remove_distributed_object_listener(self, registration_id): """Removes the specified distributed object listener. @@ -573,14 +616,20 @@ def get_distributed_objects(self): distributed_objects = self._proxy_manager.get_distributed_objects() local_distributed_object_infos = set() for dist_obj in distributed_objects: - local_distributed_object_infos.add(DistributedObjectInfo(dist_obj.service_name, dist_obj.name)) + local_distributed_object_infos.add( + DistributedObjectInfo(dist_obj.service_name, dist_obj.name) + ) for dist_obj_info in response: local_distributed_object_infos.discard(dist_obj_info) - self._proxy_manager.get_or_create(dist_obj_info.service_name, dist_obj_info.name, create_on_remote=False) + self._proxy_manager.get_or_create( + dist_obj_info.service_name, dist_obj_info.name, create_on_remote=False + ) for dist_obj_info in local_distributed_object_infos: - self._proxy_manager.destroy_proxy(dist_obj_info.service_name, dist_obj_info.name, destroy_on_remote=False) + self._proxy_manager.destroy_proxy( + dist_obj_info.service_name, dist_obj_info.name, destroy_on_remote=False + ) return self._proxy_manager.get_distributed_objects() @@ -605,9 +654,11 @@ def _create_address_provider(self): cloud_discovery_token = config.cloud_discovery_token cloud_enabled = cloud_discovery_token is not None if address_list_provided and cloud_enabled: - raise IllegalStateError("Only one discovery method can be enabled at a time. " - "Cluster members given explicitly: %s, Hazelcast Cloud enabled: %s" - % (address_list_provided, cloud_enabled)) + raise IllegalStateError( + "Only one discovery method can be enabled at a time. " + "Cluster members given explicitly: %s, Hazelcast Cloud enabled: %s" + % (address_list_provided, cloud_enabled) + ) if cloud_enabled: connection_timeout = self._get_connection_timeout(config) @@ -654,10 +705,22 @@ def __init__(self): self.proxy_session_manager = None self.reactor = None - def init_context(self, config, invocation_service, partition_service, - cluster_service, connection_manager, serialization_service, - listener_service, proxy_manager, near_cache_manager, - lock_reference_id_generator, name, proxy_session_manager, reactor): + def init_context( + self, + config, + invocation_service, + partition_service, + cluster_service, + connection_manager, + serialization_service, + listener_service, + proxy_manager, + near_cache_manager, + lock_reference_id_generator, + name, + proxy_session_manager, + reactor, + ): self.config = config self.invocation_service = invocation_service self.partition_service = partition_service diff --git a/hazelcast/cluster.py b/hazelcast/cluster.py index a15b901b28..bf933812d3 100644 --- a/hazelcast/cluster.py +++ b/hazelcast/cluster.py @@ -37,7 +37,12 @@ def __init__(self, client_uuid, address, name, labels): self.labels = labels def __repr__(self): - return "ClientInfo(uuid=%s, address=%s, name=%s, labels=%s)" % (self.uuid, self.address, self.name, self.labels) + return "ClientInfo(uuid=%s, address=%s, name=%s, labels=%s)" % ( + self.uuid, + self.address, + self.name, + self.labels, + ) _EMPTY_SNAPSHOT = _MemberListSnapshot(-1, OrderedDict()) @@ -103,7 +108,6 @@ def get_members(self, member_selector=None): class _InternalClusterService(object): - def __init__(self, client): self._client = client self._connection_manager = None @@ -150,7 +154,9 @@ def get_local_client(self): connection_manager = self._connection_manager connection = connection_manager.get_random_connection() local_address = None if not connection else connection.local_address - return ClientInfo(connection_manager.client_uuid, local_address, self._client.name, self._labels) + return ClientInfo( + connection_manager.client_uuid, local_address, self._client.name, self._labels + ) def add_listener(self, member_added=None, member_removed=None, fire_for_existing=False): registration_id = str(uuid.uuid4()) @@ -192,8 +198,11 @@ def clear_member_list_version(self): def handle_members_view_event(self, version, member_infos): snapshot = self._create_snapshot(version, member_infos) if _logger.isEnabledFor(logging.DEBUG): - _logger.debug("Handling new snapshot with membership version: %s, member string: %s", - version, self._members_string(snapshot)) + _logger.debug( + "Handling new snapshot with membership version: %s, member string: %s", + version, + self._members_string(snapshot), + ) current = self._member_list_snapshot if version >= current.version: @@ -235,9 +244,14 @@ def _detect_membership_events(self, old, new): for dead_member in dead_members: connection = self._connection_manager.get_connection(dead_member.uuid) if connection: - connection.close(None, TargetDisconnectedError("The client has closed the connection to this member, " - "after receiving a member left event from the cluster. " - "%s" % connection)) + connection.close( + None, + TargetDisconnectedError( + "The client has closed the connection to this member, " + "after receiving a member left event from the cluster. " + "%s" % connection + ), + ) if (len(new_members) + len(dead_members)) > 0: if len(new.members) > 0: @@ -261,7 +275,7 @@ def _create_snapshot(version, member_infos): class VectorClock(object): """Vector clock consisting of distinct replica logical clocks. - + The vector clock may be read from different thread but concurrent updates must be synchronized externally. There is no guarantee for concurrent updates. @@ -309,7 +323,7 @@ def entry_set(self): """Returns the entry set of the replica timestamps in a format of list of tuples. Each tuple contains the replica ID and the timestamp associated with it. - + Returns: list: List of tuples. """ @@ -317,7 +331,7 @@ def entry_set(self): def size(self): """Returns the number of timestamps that are in the replica timestamps dictionary. - + Returns: int: Number of timestamps in the replica timestamps. """ diff --git a/hazelcast/config.py b/hazelcast/config.py index 51dd88c1c6..dffeb96c3f 100644 --- a/hazelcast/config.py +++ b/hazelcast/config.py @@ -4,7 +4,13 @@ from hazelcast.errors import InvalidConfigurationError from hazelcast.serialization.api import StreamSerializer, IdentifiedDataSerializable, Portable from hazelcast.serialization.portable.classdef import ClassDefinition -from hazelcast.util import check_not_none, number_types, LoadBalancer, none_type, try_to_get_enum_value +from hazelcast.util import ( + check_not_none, + number_types, + LoadBalancer, + none_type, + try_to_get_enum_value, +) class IntType(object): @@ -221,7 +227,7 @@ def unique_key(self, value): @property def unique_key_transformation(self): return self._unique_key_transformation - + @unique_key_transformation.setter def unique_key_transformation(self, value): self._unique_key_transformation = try_to_get_enum_value(value, UniqueKeyTransformation) @@ -233,12 +239,16 @@ def from_dict(cls, d): try: options.__setattr__(k, v) except AttributeError: - raise InvalidConfigurationError("Unrecognized config option for the bitmap index options: %s" % k) + raise InvalidConfigurationError( + "Unrecognized config option for the bitmap index options: %s" % k + ) return options def __repr__(self): - return "BitmapIndexOptions(unique_key=%s, unique_key_transformation=%s)" \ - % (self.unique_key, self.unique_key_transformation) + return "BitmapIndexOptions(unique_key=%s, unique_key_transformation=%s)" % ( + self.unique_key, + self.unique_key_transformation, + ) class IndexConfig(object): @@ -319,12 +329,18 @@ def from_dict(cls, d): try: config.__setattr__(k, v) except AttributeError: - raise InvalidConfigurationError("Unrecognized config option for the index config: %s" % k) + raise InvalidConfigurationError( + "Unrecognized config option for the index config: %s" % k + ) return config def __repr__(self): - return "IndexConfig(name=%s, type=%s, attributes=%s, bitmap_index_options=%s)" \ - % (self.name, self.type, self.attributes, self.bitmap_index_options) + return "IndexConfig(name=%s, type=%s, attributes=%s, bitmap_index_options=%s)" % ( + self.name, + self.type, + self.attributes, + self.bitmap_index_options, + ) class IndexUtil(object): @@ -353,7 +369,9 @@ def validate_and_normalize(map_name, index_config): if len(original_attributes) > IndexUtil._MAX_ATTRIBUTES: raise ValueError( - "Index cannot have more than %s attributes %s" % (IndexUtil._MAX_ATTRIBUTES, index_config)) + "Index cannot have more than %s attributes %s" + % (IndexUtil._MAX_ATTRIBUTES, index_config) + ) if index_config.type == IndexType.BITMAP and len(original_attributes) > 1: raise ValueError("Composite bitmap indexes are not supported: %s" % index_config) @@ -372,12 +390,16 @@ def validate_and_normalize(map_name, index_config): else: duplicate_original_attribute = original_attributes[idx] if duplicate_original_attribute == original_attribute: - raise ValueError("Duplicate attribute name [attribute_name=%s, index_config=%s]" - % (original_attribute, index_config)) + raise ValueError( + "Duplicate attribute name [attribute_name=%s, index_config=%s]" + % (original_attribute, index_config) + ) else: - raise ValueError("Duplicate attribute names [attribute_name1=%s, attribute_name2=%s, " - "index_config=%s]" - % (duplicate_original_attribute, original_attribute, index_config)) + raise ValueError( + "Duplicate attribute names [attribute_name1=%s, attribute_name2=%s, " + "index_config=%s]" + % (duplicate_original_attribute, original_attribute, index_config) + ) normalized_attributes.append(normalized_attribute) @@ -385,15 +407,18 @@ def validate_and_normalize(map_name, index_config): if name and not name.strip(): name = None - normalized_config = IndexUtil.build_normalized_config(map_name, index_config.type, name, - normalized_attributes) + normalized_config = IndexUtil.build_normalized_config( + map_name, index_config.type, name, normalized_attributes + ) if index_config.type == IndexType.BITMAP: unique_key = index_config.bitmap_index_options.unique_key unique_key_transformation = index_config.bitmap_index_options.unique_key_transformation IndexUtil.validate_attribute(unique_key) unique_key = IndexUtil.canonicalize_attribute(unique_key) normalized_config.bitmap_index_options.unique_key = unique_key - normalized_config.bitmap_index_options.unique_key_transformation = unique_key_transformation + normalized_config.bitmap_index_options.unique_key_transformation = ( + unique_key_transformation + ) return normalized_config @@ -406,7 +431,11 @@ def build_normalized_config(map_name, index_type, index_name, normalized_attribu new_config = IndexConfig() new_config.type = index_type - name = map_name + "_" + IndexUtil._index_type_to_name(index_type) if index_name is None else None + name = ( + map_name + "_" + IndexUtil._index_type_to_name(index_type) + if index_name is None + else None + ) for normalized_attribute in normalized_attributes: new_config.add_attribute(normalized_attribute) if name: @@ -431,22 +460,55 @@ def _index_type_to_name(index_type): class _Config(object): - __slots__ = ("_cluster_members", "_cluster_name", "_client_name", - "_connection_timeout", "_socket_options", "_redo_operation", - "_smart_routing", "_ssl_enabled", "_ssl_cafile", - "_ssl_certfile", "_ssl_keyfile", "_ssl_password", - "_ssl_protocol", "_ssl_ciphers", "_cloud_discovery_token", - "_async_start", "_reconnect_mode", "_retry_initial_backoff", - "_retry_max_backoff", "_retry_jitter", "_retry_multiplier", - "_cluster_connect_timeout", "_portable_version", "_data_serializable_factories", - "_portable_factories", "_class_definitions", "_check_class_definition_errors", - "_is_big_endian", "_default_int_type", "_global_serializer", - "_custom_serializers", "_near_caches", "_load_balancer", - "_membership_listeners", "_lifecycle_listeners", "_flake_id_generators", - "_labels", "_heartbeat_interval", "_heartbeat_timeout", - "_invocation_timeout", "_invocation_retry_pause", "_statistics_enabled", - "_statistics_period", "_shuffle_member_list", "_backup_ack_to_client_enabled", - "_operation_backup_timeout", "_fail_on_indeterminate_operation_state") + __slots__ = ( + "_cluster_members", + "_cluster_name", + "_client_name", + "_connection_timeout", + "_socket_options", + "_redo_operation", + "_smart_routing", + "_ssl_enabled", + "_ssl_cafile", + "_ssl_certfile", + "_ssl_keyfile", + "_ssl_password", + "_ssl_protocol", + "_ssl_ciphers", + "_cloud_discovery_token", + "_async_start", + "_reconnect_mode", + "_retry_initial_backoff", + "_retry_max_backoff", + "_retry_jitter", + "_retry_multiplier", + "_cluster_connect_timeout", + "_portable_version", + "_data_serializable_factories", + "_portable_factories", + "_class_definitions", + "_check_class_definition_errors", + "_is_big_endian", + "_default_int_type", + "_global_serializer", + "_custom_serializers", + "_near_caches", + "_load_balancer", + "_membership_listeners", + "_lifecycle_listeners", + "_flake_id_generators", + "_labels", + "_heartbeat_interval", + "_heartbeat_timeout", + "_invocation_timeout", + "_invocation_retry_pause", + "_statistics_enabled", + "_statistics_period", + "_shuffle_member_list", + "_backup_ack_to_client_enabled", + "_operation_backup_timeout", + "_fail_on_indeterminate_operation_state", + ) def __init__(self): self._cluster_members = [] @@ -785,11 +847,17 @@ def data_serializable_factories(self, value): for class_id, clazz in six.iteritems(factory): if not isinstance(class_id, six.integer_types): - raise TypeError("Keys of factories of data_serializable_factories must be integers") - - if not (isinstance(clazz, type) and issubclass(clazz, IdentifiedDataSerializable)): - raise TypeError("Values of factories of data_serializable_factories must be " - "subclasses of IdentifiedDataSerializable") + raise TypeError( + "Keys of factories of data_serializable_factories must be integers" + ) + + if not ( + isinstance(clazz, type) and issubclass(clazz, IdentifiedDataSerializable) + ): + raise TypeError( + "Values of factories of data_serializable_factories must be " + "subclasses of IdentifiedDataSerializable" + ) self._data_serializable_factories = value else: @@ -814,8 +882,10 @@ def portable_factories(self, value): raise TypeError("Keys of factories of portable_factories must be integers") if not (isinstance(clazz, type) and issubclass(clazz, Portable)): - raise TypeError("Values of factories of portable_factories must be " - "subclasses of Portable") + raise TypeError( + "Values of factories of portable_factories must be " + "subclasses of Portable" + ) self._portable_factories = value else: @@ -830,7 +900,9 @@ def class_definitions(self, value): if isinstance(value, list): for cd in value: if not isinstance(cd, ClassDefinition): - raise TypeError("class_definitions must contain objects of type ClassDefinition") + raise TypeError( + "class_definitions must contain objects of type ClassDefinition" + ) self._class_definitions = value else: @@ -889,7 +961,9 @@ def custom_serializers(self, value): raise TypeError("Keys of custom_serializers must be types") if not (isinstance(serializer, type) and issubclass(serializer, StreamSerializer)): - raise TypeError("Values of custom_serializers must be subclasses of StreamSerializer") + raise TypeError( + "Values of custom_serializers must be subclasses of StreamSerializer" + ) self._custom_serializers = value else: @@ -939,10 +1013,14 @@ def membership_listeners(self, value): try: added, removed = item except TypeError: - raise TypeError("membership_listeners must contain tuples of length 2 as items") + raise TypeError( + "membership_listeners must contain tuples of length 2 as items" + ) if not (callable(added) or callable(removed)): - raise TypeError("At least one of the listeners in the tuple most be callable") + raise TypeError( + "At least one of the listeners in the tuple most be callable" + ) self._membership_listeners = value except ValueError: @@ -1137,9 +1215,16 @@ def from_dict(cls, d): class _NearCacheConfig(object): - __slots__ = ("_invalidate_on_change", "_in_memory_format", "_time_to_live", "_max_idle", - "_eviction_policy", "_eviction_max_size", "_eviction_sampling_count", - "_eviction_sampling_pool_size") + __slots__ = ( + "_invalidate_on_change", + "_in_memory_format", + "_time_to_live", + "_max_idle", + "_eviction_policy", + "_eviction_max_size", + "_eviction_sampling_count", + "_eviction_sampling_pool_size", + ) def __init__(self): self._invalidate_on_change = True @@ -1250,7 +1335,9 @@ def from_dict(cls, d): try: config.__setattr__(k, v) except AttributeError: - raise InvalidConfigurationError("Unrecognized config option for the near cache: %s" % k) + raise InvalidConfigurationError( + "Unrecognized config option for the near cache: %s" % k + ) return config @@ -1294,6 +1381,7 @@ def from_dict(cls, d): try: config.__setattr__(k, v) except AttributeError: - raise InvalidConfigurationError("Unrecognized config option for the flake id generator: %s" % k) + raise InvalidConfigurationError( + "Unrecognized config option for the flake id generator: %s" % k + ) return config - diff --git a/hazelcast/connection.py b/hazelcast/connection.py index be289030a0..6a2f043599 100644 --- a/hazelcast/connection.py +++ b/hazelcast/connection.py @@ -10,13 +10,24 @@ from hazelcast.config import ReconnectMode from hazelcast.core import AddressHelper, CLIENT_TYPE, SERIALIZATION_VERSION -from hazelcast.errors import AuthenticationError, TargetDisconnectedError, HazelcastClientNotActiveError, \ - InvalidConfigurationError, ClientNotAllowedInClusterError, IllegalStateError, ClientOfflineError +from hazelcast.errors import ( + AuthenticationError, + TargetDisconnectedError, + HazelcastClientNotActiveError, + InvalidConfigurationError, + ClientNotAllowedInClusterError, + IllegalStateError, + ClientOfflineError, +) from hazelcast.future import ImmediateFuture, ImmediateExceptionFuture from hazelcast.invocation import Invocation from hazelcast.lifecycle import LifecycleState -from hazelcast.protocol.client_message import SIZE_OF_FRAME_LENGTH_AND_FLAGS, Frame, InboundMessage, \ - ClientMessageBuilder +from hazelcast.protocol.client_message import ( + SIZE_OF_FRAME_LENGTH_AND_FLAGS, + Frame, + InboundMessage, + ClientMessageBuilder, +) from hazelcast.protocol.codec import client_authentication_codec, client_ping_codec from hazelcast.util import AtomicInteger, calculate_version, UNKNOWN_VERSION from hazelcast import six, __version__ @@ -25,8 +36,7 @@ class _WaitStrategy(object): - def __init__(self, initial_backoff, max_backoff, multiplier, - cluster_connect_timeout, jitter): + def __init__(self, initial_backoff, max_backoff, multiplier, cluster_connect_timeout, jitter): self._initial_backoff = initial_backoff self._max_backoff = max_backoff self._multiplier = multiplier @@ -46,16 +56,27 @@ def sleep(self): now = time.time() time_passed = now - self._cluster_connect_attempt_begin if time_passed > self._cluster_connect_timeout: - _logger.warning("Unable to get live cluster connection, cluster connect timeout (%d) is reached. " - "Attempt %d.", self._cluster_connect_timeout, self._attempt) + _logger.warning( + "Unable to get live cluster connection, cluster connect timeout (%d) is reached. " + "Attempt %d.", + self._cluster_connect_timeout, + self._attempt, + ) return False # random between (-jitter * current_backoff, jitter * current_backoff) - sleep_time = self._current_backoff + self._current_backoff * self._jitter * (2 * random.random() - 1) + sleep_time = self._current_backoff + self._current_backoff * self._jitter * ( + 2 * random.random() - 1 + ) sleep_time = min(sleep_time, self._cluster_connect_timeout - time_passed) - _logger.warning("Unable to get live cluster connection, retry in %ds, attempt: %d, " - "cluster connect timeout: %ds, max backoff: %ds", - sleep_time, self._attempt, self._cluster_connect_timeout, self._max_backoff) + _logger.warning( + "Unable to get live cluster connection, retry in %ds, attempt: %d, " + "cluster connect timeout: %ds, max backoff: %ds", + sleep_time, + self._attempt, + self._cluster_connect_timeout, + self._max_backoff, + ) time.sleep(sleep_time) self._current_backoff = min(self._current_backoff * self._multiplier, self._max_backoff) return True @@ -71,9 +92,17 @@ class _AuthenticationStatus(object): class ConnectionManager(object): """ConnectionManager is responsible for managing ``Connection`` objects.""" - def __init__(self, client, reactor, address_provider, lifecycle_service, - partition_service, cluster_service, invocation_service, - near_cache_manager): + def __init__( + self, + client, + reactor, + address_provider, + lifecycle_service, + partition_service, + cluster_service, + invocation_service, + near_cache_manager, + ): self.live = False self.active_connections = dict() # uuid to connection, must be modified under the _lock self.client_uuid = uuid.uuid4() @@ -96,7 +125,7 @@ def __init__(self, client, reactor, address_provider, lifecycle_service, self._async_start = config.async_start self._connect_to_cluster_thread_running = False self._pending_connections = dict() # must be modified under the _lock - self._addresses_to_connections = dict() # address to connection, must be modified under the _lock + self._addresses_to_connections = dict() # must be modified under the _lock self._shuffle_member_list = config.shuffle_member_list self._lock = threading.RLock() self._connection_id_generator = AtomicInteger() @@ -159,7 +188,9 @@ def shutdown(self): with self._lock: for connection_future in six.itervalues(self._pending_connections): - connection_future.set_exception(HazelcastClientNotActiveError("Hazelcast client is shutting down")) + connection_future.set_exception( + HazelcastClientNotActiveError("Hazelcast client is shutting down") + ) # Need to create copy of connection values to avoid modification errors on runtime for connection in list(six.itervalues(self.active_connections)): @@ -187,8 +218,11 @@ def on_connection_close(self, closed_connection, cause): remote_address = closed_connection.remote_address if not connected_address: - _logger.debug("Destroying %s, but it has no remote address, hence nothing is " - "removed from the connection dictionary", closed_connection) + _logger.debug( + "Destroying %s, but it has no remote address, hence nothing is " + "removed from the connection dictionary", + closed_connection, + ) with self._lock: pending = self._pending_connections.pop(connected_address, None) @@ -199,8 +233,12 @@ def on_connection_close(self, closed_connection, cause): pending.set_exception(cause) if connection: - _logger.info("Removed connection to %s:%s, connection: %s", - connected_address, remote_uuid, connection) + _logger.info( + "Removed connection to %s:%s, connection: %s", + connected_address, + remote_uuid, + connection, + ) if not self.active_connections: self._lifecycle_service.fire_lifecycle_event(LifecycleState.DISCONNECTED) self._trigger_cluster_reconnection() @@ -214,8 +252,11 @@ def on_connection_close(self, closed_connection, cause): _logger.exception("Exception in connection listener") else: if remote_uuid: - _logger.debug("Destroying %s, but there is no mapping for %s in the connection dictionary", - closed_connection, remote_uuid) + _logger.debug( + "Destroying %s, but there is no mapping for %s in the connection dictionary", + closed_connection, + remote_uuid, + ) def check_invocation_allowed(self): if self.active_connections: @@ -236,8 +277,13 @@ def _trigger_cluster_reconnection(self): self._start_connect_to_cluster_thread() def _init_wait_strategy(self, config): - return _WaitStrategy(config.retry_initial_backoff, config.retry_max_backoff, config.retry_multiplier, - config.cluster_connect_timeout, config.retry_jitter) + return _WaitStrategy( + config.retry_initial_backoff, + config.retry_max_backoff, + config.retry_multiplier, + config.cluster_connect_timeout, + config.retry_jitter, + ) def _start_connect_all_members_timer(self): connecting_addresses = set() @@ -249,7 +295,10 @@ def run(): for member in self._cluster_service.get_members(): address = member.address - if not self.get_connection_from_address(address) and address not in connecting_addresses: + if ( + not self.get_connection_from_address(address) + and address not in connecting_addresses + ): connecting_addresses.add(address) if not self._lifecycle_service.running: break @@ -291,7 +340,7 @@ def run(): _logger.exception("Could not connect to any cluster, shutting down the client") self._shutdown_client() - t = threading.Thread(target=run, name='hazelcast_async_connection') + t = threading.Thread(target=run, name="hazelcast_async_connection") t.daemon = True t.start() @@ -323,8 +372,12 @@ def _sync_connect_to_cluster(self): _logger.exception("Stopped trying on cluster %s", cluster_name) cluster_name = self._client.config.cluster_name - _logger.info("Unable to connect to any address from the cluster with name: %s. " - "The following addresses were tried: %s", cluster_name, tried_addresses) + _logger.info( + "Unable to connect to any address from the cluster with name: %s. " + "The following addresses were tried: %s", + cluster_name, + tried_addresses, + ) if self._lifecycle_service.running: msg = "Unable to connect to any cluster" else: @@ -359,18 +412,26 @@ def _get_or_connect(self, address): try: translated = self._address_provider.translate(address) if not translated: - return ImmediateExceptionFuture( - ValueError("Address translator could not translate address %s" % address)) + error = ValueError( + "Address translator could not translate address %s" % address + ) + return ImmediateExceptionFuture(error) factory = self._reactor.connection_factory - connection = factory(self, self._connection_id_generator.get_and_increment(), - translated, self._client.config, - self._invocation_service.handle_client_message) + connection = factory( + self, + self._connection_id_generator.get_and_increment(), + translated, + self._client.config, + self._invocation_service.handle_client_message, + ) except IOError: error = sys.exc_info() return ImmediateExceptionFuture(error[1], error[2]) - future = self._authenticate(connection).continue_with(self._on_auth, connection, address) + future = self._authenticate(connection).continue_with( + self._on_auth, connection, address + ) self._pending_connections[address] = future return future @@ -378,11 +439,21 @@ def _authenticate(self, connection): client = self._client cluster_name = client.config.cluster_name client_name = client.name - request = client_authentication_codec.encode_request(cluster_name, None, None, self.client_uuid, - CLIENT_TYPE, SERIALIZATION_VERSION, __version__, - client_name, self._labels) - - invocation = Invocation(request, connection=connection, urgent=True, response_handler=lambda m: m) + request = client_authentication_codec.encode_request( + cluster_name, + None, + None, + self.client_uuid, + CLIENT_TYPE, + SERIALIZATION_VERSION, + __version__, + client_name, + self._labels, + ) + + invocation = Invocation( + request, connection=connection, urgent=True, response_handler=lambda m: m + ) self._invocation_service.invoke(invocation) return invocation.future @@ -394,14 +465,18 @@ def _on_auth(self, response, connection, address): return self._handle_successful_auth(response, connection, address) if status == _AuthenticationStatus.CREDENTIALS_FAILED: - err = AuthenticationError("Authentication failed. The configured cluster name on " - "the client does not match the one configured in the cluster.") + err = AuthenticationError( + "Authentication failed. The configured cluster name on " + "the client does not match the one configured in the cluster." + ) elif status == _AuthenticationStatus.NOT_ALLOWED_IN_CLUSTER: err = ClientNotAllowedInClusterError("Client is not allowed in the cluster") elif status == _AuthenticationStatus.SERIALIZATION_VERSION_MISMATCH: err = IllegalStateError("Server serialization version does not match to client") else: - err = AuthenticationError("Authentication status code not supported. status: %s" % status) + err = AuthenticationError( + "Authentication status code not supported. status: %s" % status + ) connection.close("Failed to authenticate connection", err) raise err @@ -425,10 +500,17 @@ def _handle_successful_auth(self, response, connection, address): new_cluster_id = response["cluster_id"] is_initial_connection = not self.active_connections - changed_cluster = is_initial_connection and self._cluster_id is not None and self._cluster_id != new_cluster_id + changed_cluster = ( + is_initial_connection + and self._cluster_id is not None + and self._cluster_id != new_cluster_id + ) if changed_cluster: - _logger.warning("Switching from current cluster: %s to new cluster: %s", - self._cluster_id, new_cluster_id) + _logger.warning( + "Switching from current cluster: %s to new cluster: %s", + self._cluster_id, + new_cluster_id, + ) self._on_cluster_restart() with self._lock: @@ -440,8 +522,13 @@ def _handle_successful_auth(self, response, connection, address): self._cluster_id = new_cluster_id self._lifecycle_service.fire_lifecycle_event(LifecycleState.CONNECTED) - _logger.info("Authenticated with server %s:%s, server version: %s, local address: %s", - remote_address, remote_uuid, server_version_str, connection.local_address) + _logger.info( + "Authenticated with server %s:%s, server version: %s, local address: %s", + remote_address, + remote_uuid, + server_version_str, + connection.local_address, + ) for on_connection_opened, _ in self._connection_listeners: if on_connection_opened: @@ -461,17 +548,21 @@ def _on_cluster_restart(self): def _check_partition_count(self, partition_count): if not self._partition_service.check_and_set_partition_count(partition_count): - raise ClientNotAllowedInClusterError("Client can not work with this cluster because it has a " - "different partition count. Expected partition count: %d, " - "Member partition count: %d" - % (self._partition_service.partition_count, partition_count)) + raise ClientNotAllowedInClusterError( + "Client can not work with this cluster because it has a " + "different partition count. Expected partition count: %d, " + "Member partition count: %d" + % (self._partition_service.partition_count, partition_count) + ) def _check_client_active(self): if not self._lifecycle_service.running: raise HazelcastClientNotActiveError() def _get_possible_addresses(self): - member_addresses = list(map(lambda m: (m.address, None), self._cluster_service.get_members())) + member_addresses = list( + map(lambda m: (m.address, None), self._cluster_service.get_members()) + ) if self._shuffle_member_list: random.shuffle(member_addresses) @@ -529,8 +620,10 @@ def _check_connection(self, now, connection): if (now - connection.last_read_time) > self._heartbeat_timeout: _logger.warning("Heartbeat failed over the connection: %s", connection) - connection.close("Heartbeat timed out", - TargetDisconnectedError("Heartbeat timed out to connection %s" % connection)) + connection.close( + "Heartbeat timed out", + TargetDisconnectedError("Heartbeat timed out to connection %s" % connection), + ) return if (now - connection.last_write_time) > self._heartbeat_interval: @@ -539,7 +632,7 @@ def _check_connection(self, now, connection): self._invocation_service.invoke(invocation) -_frame_header = struct.Struct(' bracket_start_idx: - host = address[bracket_start_idx + 1: bracket_end_idx] + host = address[bracket_start_idx + 1 : bracket_end_idx] if last_colon_idx == (bracket_end_idx + 1): - port = int(address[last_colon_idx + 1:]) + port = int(address[last_colon_idx + 1 :]) else: host = address elif colon_idx > 0 and colon_idx == last_colon_idx: host = address[:colon_idx] - port = int(address[colon_idx + 1:]) + port = int(address[colon_idx + 1 :]) else: host = address return Address(host, port) @@ -183,14 +192,32 @@ def __init__(self, name, service_name, event_type, source): """ def __repr__(self): - return "DistributedObjectEvent(name=%s, service_name=%s, event_type=%s, source=%s)" \ - % (self.name, self.service_name, self.event_type, self.source) + return "DistributedObjectEvent(name=%s, service_name=%s, event_type=%s, source=%s)" % ( + self.name, + self.service_name, + self.event_type, + self.source, + ) class SimpleEntryView(object): """EntryView represents a readonly view of a map entry.""" - def __init__(self, key, value, cost, creation_time, expiration_time, hits, last_access_time, - last_stored_time, last_update_time, version, ttl, max_idle): + + def __init__( + self, + key, + value, + cost, + creation_time, + expiration_time, + hits, + last_access_time, + last_stored_time, + last_update_time, + version, + ttl, + max_idle, + ): self.key = key """ The key of the entry. @@ -252,12 +279,25 @@ def __init__(self, key, value, cost, creation_time, expiration_time, hits, last_ """ def __repr__(self): - return "SimpleEntryView(key=%s, value=%s, cost=%s, creation_time=%s, " \ - "expiration_time=%s, hits=%s, last_access_time=%s, last_stored_time=%s, " \ - "last_update_time=%s, version=%s, ttl=%s, max_idle=%s" \ - % (self.key, self.value, self.cost, self.creation_time, self.expiration_time, self.hits, - self.last_access_time, self.last_stored_time, self.last_update_time, self.version, - self.ttl, self.max_idle) + return ( + "SimpleEntryView(key=%s, value=%s, cost=%s, creation_time=%s, " + "expiration_time=%s, hits=%s, last_access_time=%s, last_stored_time=%s, " + "last_update_time=%s, version=%s, ttl=%s, max_idle=%s" + % ( + self.key, + self.value, + self.cost, + self.creation_time, + self.expiration_time, + self.hits, + self.last_access_time, + self.last_stored_time, + self.last_update_time, + self.version, + self.ttl, + self.max_idle, + ) + ) class HazelcastJsonValue(object): @@ -266,22 +306,22 @@ class HazelcastJsonValue(object): It is preferred to store HazelcastJsonValue instead of Strings for JSON formatted strings. Users can run predicates and use indexes on the attributes of the underlying JSON strings. - + HazelcastJsonValue is queried using Hazelcast's querying language. See `Distributed Query section `_. - + In terms of querying, numbers in JSON strings are treated as either Long or Double in the Java side. str, bool and None are treated as String, boolean and null respectively. - + HazelcastJsonValue keeps given string as it is. Strings are not checked for being valid. Ill-formatted JSON strings may cause false positive or false negative results in queries. - + HazelcastJsonValue can also be constructed from JSON serializable objects. In that case, objects are converted to JSON strings and stored as such. If an error occurs during the conversion, it is raised directly. - + None values are not allowed. """ @@ -294,7 +334,7 @@ def __init__(self, value): def to_string(self): """Returns unaltered string that was used to create this object. - + Returns: str: The original string. """ @@ -303,7 +343,7 @@ def to_string(self): def loads(self): """Deserializes the string that was used to create this object and returns as Python object. - + Returns: any: The Python object represented by the original string. """ diff --git a/hazelcast/cp.py b/hazelcast/cp.py index ad59d44082..e7541aed3b 100644 --- a/hazelcast/cp.py +++ b/hazelcast/cp.py @@ -2,12 +2,21 @@ from threading import RLock, Lock from hazelcast import six -from hazelcast.errors import SessionExpiredError, CPGroupDestroyedError, HazelcastClientNotActiveError +from hazelcast.errors import ( + SessionExpiredError, + CPGroupDestroyedError, + HazelcastClientNotActiveError, +) from hazelcast.future import ImmediateExceptionFuture, ImmediateFuture, combine_futures from hazelcast.invocation import Invocation -from hazelcast.protocol.codec import cp_group_create_cp_group_codec, cp_session_heartbeat_session_codec, \ - cp_session_create_session_codec, cp_session_close_session_codec, cp_session_generate_thread_id_codec, \ - semaphore_get_semaphore_type_codec +from hazelcast.protocol.codec import ( + cp_group_create_cp_group_codec, + cp_session_heartbeat_session_codec, + cp_session_create_session_codec, + cp_session_close_session_codec, + cp_session_generate_thread_id_codec, + semaphore_get_semaphore_type_codec, +) from hazelcast.proxy.cp.atomic_long import AtomicLong from hazelcast.proxy.cp.atomic_reference import AtomicReference from hazelcast.proxy.cp.count_down_latch import CountDownLatch @@ -155,7 +164,7 @@ def _without_default_group_name(name): return name check_true(name.find("@", idx + 1) == -1, "Custom group name must be specified at most once") - group_name = name[idx + 1:].strip() + group_name = name[idx + 1 :].strip() if group_name == _DEFAULT_GROUP_NAME: return name[:idx] return name @@ -166,7 +175,7 @@ def _get_object_name_for_proxy(name): if idx == -1: return name - group_name = name[idx + 1:].strip() + group_name = name[idx + 1 :].strip() check_true(len(group_name) > 0, "Custom CP group name cannot be empty string") object_name = name[:idx].strip() check_true(len(object_name) > 0, "Object name cannot be empty string") @@ -225,9 +234,13 @@ def _create_semaphore(self, group_id, proxy_name, object_name): invocation_service.invoke(invocation) jdk_compatible = invocation.future.result() if jdk_compatible: - return SessionlessSemaphore(self._context, group_id, SEMAPHORE_SERVICE, proxy_name, object_name) + return SessionlessSemaphore( + self._context, group_id, SEMAPHORE_SERVICE, proxy_name, object_name + ) else: - return SessionAwareSemaphore(self._context, group_id, SEMAPHORE_SERVICE, proxy_name, object_name) + return SessionAwareSemaphore( + self._context, group_id, SEMAPHORE_SERVICE, proxy_name, object_name + ) def _get_group_id(self, proxy_name): codec = cp_group_create_cp_group_codec @@ -294,7 +307,9 @@ def get_session_id(self, group_id): return _NO_SESSION_ID def acquire_session(self, group_id, count): - return self._get_or_create_session(group_id).continue_with(lambda state: state.result().acquire(count)) + return self._get_or_create_session(group_id).continue_with( + lambda state: state.result().acquire(count) + ) def release_session(self, group_id, session_id, count): session = self._sessions.get(group_id, None) @@ -319,7 +334,8 @@ def get_or_create_unique_thread_id(self, group_id): return ImmediateFuture(global_thread_id) return self._request_generate_thread_id(group_id).continue_with( - lambda t_id: self._thread_ids.setdefault(key, t_id.result())) + lambda t_id: self._thread_ids.setdefault(key, t_id.result()) + ) def shutdown(self): with self._lock: diff --git a/hazelcast/discovery.py b/hazelcast/discovery.py index 1a928094b2..e765c4aa38 100644 --- a/hazelcast/discovery.py +++ b/hazelcast/discovery.py @@ -20,7 +20,7 @@ def __init__(self, token, connection_timeout): def load_addresses(self): """Loads member addresses from Hazelcast Cloud endpoint. - + Returns: tuple[list[hazelcast.core.Address], list[hazelcast.core.Address]]: The possible member addresses as primary addresses to connect to. @@ -65,6 +65,7 @@ class HazelcastCloudDiscovery(object): """Discovery service that discover nodes via Hazelcast.cloud https://coordinator.hazelcast.cloud/cluster/discovery?token= """ + _CLOUD_URL_BASE = "coordinator.hazelcast.cloud" _CLOUD_URL_PATH = "/cluster/discovery?token=" _PRIVATE_ADDRESS_PROPERTY = "private-address" @@ -78,16 +79,18 @@ def __init__(self, token, connection_timeout): def discover_nodes(self): """Discovers nodes from Hazelcast.cloud. - + Returns: dict[hazelcast.core.Address, hazelcast.core.Address]: Dictionary that maps private addresses to public addresses. """ try: - https_connection = http_client.HTTPSConnection(host=self._CLOUD_URL_BASE, - timeout=self._connection_timeout, - context=self._ctx) - https_connection.request(method="GET", url=self._url, headers={"Accept-Charset": "UTF-8"}) + https_connection = http_client.HTTPSConnection( + host=self._CLOUD_URL_BASE, timeout=self._connection_timeout, context=self._ctx + ) + https_connection.request( + method="GET", url=self._url, headers={"Accept-Charset": "UTF-8"} + ) https_response = https_connection.getresponse() except ssl.SSLError as err: raise HazelcastCertificationError(str(err)) diff --git a/hazelcast/errors.py b/hazelcast/errors.py index 24d742150f..3c740349d0 100644 --- a/hazelcast/errors.py +++ b/hazelcast/errors.py @@ -10,6 +10,7 @@ def retryable(cls): class HazelcastError(Exception): """General HazelcastError class.""" + def __init__(self, message=None, cause=None): super(HazelcastError, self).__init__(message, cause) @@ -660,9 +661,16 @@ def _create_error(error_holders, idx): error_class = _ERROR_CODE_TO_ERROR.get(error_holder.error_code, None) stack_trace = "\n".join( - ["\tat %s.%s(%s:%s)" % (x.class_name, x.method_name, x.file_name, x.line_number) for x in - error_holder.stack_trace_elements]) - message = "Exception from server: %s: %s\n %s" % (error_holder.class_name, error_holder.message, stack_trace) + [ + "\tat %s.%s(%s:%s)" % (x.class_name, x.method_name, x.file_name, x.line_number) + for x in error_holder.stack_trace_elements + ] + ) + message = "Exception from server: %s: %s\n %s" % ( + error_holder.class_name, + error_holder.message, + stack_trace, + ) if error_class: return error_class(message, _create_error(error_holders, idx + 1)) else: @@ -670,4 +678,4 @@ def _create_error(error_holders, idx): def is_retryable_error(error): - return hasattr(error, 'retryable') + return hasattr(error, "retryable") diff --git a/hazelcast/future.py b/hazelcast/future.py index 5645361d39..740f9343d9 100644 --- a/hazelcast/future.py +++ b/hazelcast/future.py @@ -11,6 +11,7 @@ class Future(object): """Future is used for representing an asynchronous computation result.""" + _result = None _exception = None _traceback = None @@ -49,7 +50,7 @@ def set_exception(self, exception, traceback=None): def result(self): """Returns the result of the Future, which makes the call synchronous if the result has not been computed yet. - + Returns: Result of the Future. """ @@ -63,10 +64,11 @@ def result(self): return self._result def _reactor_check(self): - if not self.done() and hasattr(self._threading_locals, 'is_reactor_thread'): + if not self.done() and hasattr(self._threading_locals, "is_reactor_thread"): raise RuntimeError( - "Synchronous result for incomplete operation must not be called from Reactor thread. " - "Use add_done_callback instead.") + "Synchronous result for incomplete operation must not be called from Reactor thread. " + "Use add_done_callback instead." + ) def is_success(self): """Determines whether the result can be successfully computed or not.""" @@ -74,7 +76,7 @@ def is_success(self): def done(self): """Determines whether the result is computed or not. - + Returns: bool: ``True`` if the result is computed, ``False`` otherwise. """ @@ -82,7 +84,7 @@ def done(self): def running(self): """Determines whether the asynchronous call, the computation is still running or not. - + Returns: bool: ``True`` if the result is being computed, ``False`` otherwise. """ diff --git a/hazelcast/hash.py b/hazelcast/hash.py index 06b2167aa9..4c10939bf7 100644 --- a/hazelcast/hash.py +++ b/hazelcast/hash.py @@ -16,8 +16,8 @@ def murmur_hash3_x86_32(data): h1 = 0x01000193 - c1 = 0xcc9e2d51 - c2 = 0x1b873593 + c1 = 0xCC9E2D51 + c2 = 0x1B873593 # body for block_start in range(0, nblocks * 4, 4): @@ -30,7 +30,7 @@ def murmur_hash3_x86_32(data): h1 ^= k1 h1 = (h1 << 13 | h1 >> 19) & 0xFFFFFFFF # inlined _ROTL32 - h1 = (h1 * 5 + 0xe6546b64) & 0xFFFFFFFF + h1 = (h1 * 5 + 0xE6546B64) & 0xFFFFFFFF # tail tail_index = nblocks * 4 @@ -53,9 +53,9 @@ def murmur_hash3_x86_32(data): h1 ^= length h1 ^= h1 >> 16 - h1 = (h1 * 0x85ebca6b) & 0xFFFFFFFF + h1 = (h1 * 0x85EBCA6B) & 0xFFFFFFFF h1 ^= h1 >> 13 - h1 = (h1 * 0xc2b2ae35) & 0xFFFFFFFF + h1 = (h1 * 0xC2B2AE35) & 0xFFFFFFFF h1 ^= h1 >> 16 return -(h1 & 0x80000000) | (h1 & 0x7FFFFFFF) diff --git a/hazelcast/invocation.py b/hazelcast/invocation.py index 85a765f261..25f2260094 100644 --- a/hazelcast/invocation.py +++ b/hazelcast/invocation.py @@ -2,9 +2,17 @@ import time import functools -from hazelcast.errors import create_error_from_message, HazelcastInstanceNotActiveError, is_retryable_error, \ - TargetDisconnectedError, HazelcastClientNotActiveError, TargetNotMemberError, \ - EXCEPTION_MESSAGE_TYPE, IndeterminateOperationStateError, OperationTimeoutError +from hazelcast.errors import ( + create_error_from_message, + HazelcastInstanceNotActiveError, + is_retryable_error, + TargetDisconnectedError, + HazelcastClientNotActiveError, + TargetNotMemberError, + EXCEPTION_MESSAGE_TYPE, + IndeterminateOperationStateError, + OperationTimeoutError, +) from hazelcast.future import Future from hazelcast.protocol.codec import client_local_backup_listener_codec from hazelcast.util import AtomicInteger @@ -18,12 +26,34 @@ def _no_op_response_handler(_): class Invocation(object): - __slots__ = ("request", "timeout", "partition_id", "uuid", "connection", "event_handler", - "future", "sent_connection", "urgent", "response_handler", "backup_acks_received", - "backup_acks_expected", "pending_response", "pending_response_received_time") - - def __init__(self, request, partition_id=-1, uuid=None, connection=None, - event_handler=None, urgent=False, timeout=None, response_handler=_no_op_response_handler): + __slots__ = ( + "request", + "timeout", + "partition_id", + "uuid", + "connection", + "event_handler", + "future", + "sent_connection", + "urgent", + "response_handler", + "backup_acks_received", + "backup_acks_expected", + "pending_response", + "pending_response_received_time", + ) + + def __init__( + self, + request, + partition_id=-1, + uuid=None, + connection=None, + event_handler=None, + urgent=False, + timeout=None, + response_handler=_no_op_response_handler, + ): self.request = request self.partition_id = partition_id self.uuid = uuid @@ -159,7 +189,9 @@ def _invoke_smart(self, invocation): if connection: invoked = self._send(invocation, connection) if not invoked: - self._notify_error(invocation, IOError("Could not invoke on connection %s" % connection)) + self._notify_error( + invocation, IOError("Could not invoke on connection %s" % connection) + ) return if invocation.partition_id != -1: @@ -186,7 +218,9 @@ def _invoke_non_smart(self, invocation): if connection: invoked = self._send(invocation, connection) if not invoked: - self._notify_error(invocation, IOError("Could not invoke on connection %s" % connection)) + self._notify_error( + invocation, IOError("Could not invoke on connection %s" % connection) + ) return if not self._invoke_on_random_connection(invocation): @@ -229,8 +263,10 @@ def _notify_error(self, invocation, error): if invocation.timeout < time.time(): _logger.debug("Error will not be retried because invocation timed out: %s", error) - error = OperationTimeoutError("Request timed out because an error occurred " - "after invocation timeout: %s" % error) + error = OperationTimeoutError( + "Request timed out because an error occurred " + "after invocation timeout: %s" % error + ) self._complete_with_error(invocation, error) return @@ -244,7 +280,9 @@ def _should_retry(self, invocation, error): if invocation.uuid and isinstance(error, TargetNotMemberError): return False - if isinstance(error, (IOError, HazelcastInstanceNotActiveError)) or is_retryable_error(error): + if isinstance(error, (IOError, HazelcastInstanceNotActiveError)) or is_retryable_error( + error + ): return True if isinstance(error, TargetDisconnectedError): @@ -260,10 +298,12 @@ def _complete_with_error(self, invocation, error): def _register_backup_listener(self): codec = client_local_backup_listener_codec request = codec.encode_request() - self._listener_service.register_listener(request, - codec.decode_response, - lambda reg_id: None, - lambda m: codec.handle(m, self._backup_event_handler)).result() + self._listener_service.register_listener( + request, + codec.decode_response, + lambda reg_id: None, + lambda m: codec.handle(m, self._backup_event_handler), + ).result() def _backup_event_handler(self, correlation_id): invocation = self._pending.get(correlation_id, None) @@ -332,7 +372,9 @@ def _detect_and_handle_backup_timeout(self, invocation, now): return if self._fail_on_indeterminate_state: - error = IndeterminateOperationStateError("Invocation failed because the backup acks are missed") + error = IndeterminateOperationStateError( + "Invocation failed because the backup acks are missed" + ) self._complete_with_error(invocation, error) return diff --git a/hazelcast/listener.py b/hazelcast/listener.py index 4c76968346..b363cbc77a 100644 --- a/hazelcast/listener.py +++ b/hazelcast/listener.py @@ -13,10 +13,17 @@ class _ListenerRegistration(object): - __slots__ = ("registration_request", "decode_register_response", "encode_deregister_request", - "handler", "connection_registrations") - - def __init__(self, registration_request, decode_register_response, encode_deregister_request, handler): + __slots__ = ( + "registration_request", + "decode_register_response", + "encode_deregister_request", + "handler", + "connection_registrations", + ) + + def __init__( + self, registration_request, decode_register_response, encode_deregister_request, handler + ): self.registration_request = registration_request self.decode_register_response = decode_register_response self.encode_deregister_request = encode_deregister_request @@ -45,11 +52,14 @@ def __init__(self, client, connection_manager, invocation_service): def start(self): self._connection_manager.add_listener(self._connection_added, self._connection_removed) - def register_listener(self, registration_request, decode_register_response, encode_deregister_request, handler): + def register_listener( + self, registration_request, decode_register_response, encode_deregister_request, handler + ): with self._registration_lock: registration_id = str(uuid4()) - registration = _ListenerRegistration(registration_request, decode_register_response, - encode_deregister_request, handler) + registration = _ListenerRegistration( + registration_request, decode_register_response, encode_deregister_request, handler + ) self._active_registrations[registration_id] = registration futures = [] @@ -75,27 +85,38 @@ def deregister_listener(self, user_registration_id): if not listener_registration: return ImmediateFuture(False) - for connection, event_registration in six.iteritems(listener_registration.connection_registrations): + for connection, event_registration in six.iteritems( + listener_registration.connection_registrations + ): # Remove local handler self.remove_event_handler(event_registration.correlation_id) # The rest is for deleting the remote registration server_registration_id = event_registration.server_registration_id - deregister_request = listener_registration.encode_deregister_request(server_registration_id) + deregister_request = listener_registration.encode_deregister_request( + server_registration_id + ) if deregister_request is None: # None means no remote registration (e.g. for backup acks) continue - invocation = Invocation(deregister_request, connection=connection, timeout=six.MAXSIZE, urgent=True) + invocation = Invocation( + deregister_request, connection=connection, timeout=six.MAXSIZE, urgent=True + ) self._invocation_service.invoke(invocation) def handler(f, connection=connection): e = f.exception() if e: - if isinstance(e, (HazelcastClientNotActiveError, IOError, TargetDisconnectedError)): + if isinstance( + e, (HazelcastClientNotActiveError, IOError, TargetDisconnectedError) + ): return - _logger.warning("Deregistration of listener with ID %s has failed for address %s", - user_registration_id, connection.remote_address) + _logger.warning( + "Deregistration of listener with ID %s has failed for address %s", + user_registration_id, + connection.remote_address, + ) invocation.future.add_done_callback(handler) @@ -122,8 +143,13 @@ def _register_on_connection(self, user_registration_id, listener_registration, c return registration_request = listener_registration.registration_request.copy() - invocation = Invocation(registration_request, connection=connection, - event_handler=listener_registration.handler, response_handler=lambda m: m, urgent=True) + invocation = Invocation( + registration_request, + connection=connection, + event_handler=listener_registration.handler, + response_handler=lambda m: m, + urgent=True, + ) self._invocation_service.invoke(invocation) def callback(f): @@ -135,8 +161,11 @@ def callback(f): registration_map[connection] = registration except Exception as e: if connection.live: - _logger.exception("Listener %s can not be added to a new connection: %s", - user_registration_id, connection) + _logger.exception( + "Listener %s can not be added to a new connection: %s", + user_registration_id, + connection, + ) raise e return invocation.future.continue_with(callback) @@ -149,13 +178,17 @@ def _connection_added(self, connection): def _connection_removed(self, connection, _): with self._registration_lock: for listener_registration in six.itervalues(self._active_registrations): - event_registration = listener_registration.connection_registrations.pop(connection, None) + event_registration = listener_registration.connection_registrations.pop( + connection, None + ) if event_registration: self.remove_event_handler(event_registration.correlation_id) class ClusterViewListenerService(object): - def __init__(self, client, connection_manager, partition_service, cluster_service, invocation_service): + def __init__( + self, client, connection_manager, partition_service, cluster_service, invocation_service + ): self._client = client self._partition_service = partition_service self._connection_manager = connection_manager @@ -187,7 +220,9 @@ def _try_register(self, connection): self._cluster_service.clear_member_list_version() self._listener_added_connection = connection request = client_add_cluster_view_listener_codec.encode_request() - invocation = Invocation(request, connection=connection, event_handler=self._handler(connection), urgent=True) + invocation = Invocation( + request, connection=connection, event_handler=self._handler(connection), urgent=True + ) self._cluster_service.clear_member_list_version() self._invocation_service.invoke(invocation) @@ -207,7 +242,8 @@ def handle_members_view_event(member_list_version, member_infos): self._cluster_service.handle_members_view_event(member_list_version, member_infos) def inner(message): - client_add_cluster_view_listener_codec.handle(message, handle_members_view_event, - handle_partitions_view_event) + client_add_cluster_view_listener_codec.handle( + message, handle_members_view_event, handle_partitions_view_event + ) return inner diff --git a/hazelcast/near_cache.py b/hazelcast/near_cache.py index 209dec7c6a..b41bf66bbb 100644 --- a/hazelcast/near_cache.py +++ b/hazelcast/near_cache.py @@ -23,12 +23,13 @@ def _random_key_func(_): EvictionPolicy.NONE: None, EvictionPolicy.LRU: _lru_key_func, EvictionPolicy.LFU: _lfu_key_func, - EvictionPolicy.RANDOM: _random_key_func + EvictionPolicy.RANDOM: _random_key_func, } class DataRecord(object): """An expirable and evictable data object which represents a cache entry.""" + def __init__(self, key, value, create_time=None, ttl_seconds=None): self.key = key self.value = value @@ -48,19 +49,41 @@ def is_expired(self, max_idle_seconds): """ now = current_time() - return (self.expiration_time is not None and self.expiration_time < now) or \ - (max_idle_seconds is not None and self.last_access_time + max_idle_seconds < now) + return (self.expiration_time is not None and self.expiration_time < now) or ( + max_idle_seconds is not None and self.last_access_time + max_idle_seconds < now + ) def __repr__(self): - return "DataRecord(key=%s, value=%s, create_time=%s, expiration_time=%s, last_access_time=%s, access_hit=%s)" \ - % (self.key, self.value, self.create_time, self.expiration_time, self.last_access_time, self.access_hit) + return ( + "DataRecord(key=%s, value=%s, create_time=%s, " + "expiration_time=%s, last_access_time=%s, access_hit=%s)" + % ( + self.key, + self.value, + self.create_time, + self.expiration_time, + self.last_access_time, + self.access_hit, + ) + ) class NearCache(dict): """NearCache is a local cache used by :class:`~hazelcast.proxy.map.MapFeatNearCache`.""" - def __init__(self, name, serialization_service, in_memory_format, time_to_live, max_idle, invalidate_on_change, - eviction_policy, eviction_max_size, eviction_sampling_count=None, eviction_sampling_pool_size=None): + def __init__( + self, + name, + serialization_service, + in_memory_format, + time_to_live, + max_idle, + invalidate_on_change, + eviction_policy, + eviction_max_size, + eviction_sampling_count=None, + eviction_sampling_pool_size=None, + ): self.name = name self.serialization_service = serialization_service self.in_memory_format = in_memory_format @@ -143,8 +166,11 @@ def __getitem__(self, key): elif self.eviction_policy == EvictionPolicy.LFU: value_record.access_hit += 1 self._hits += 1 - return self.serialization_service.to_object(value_record.value) \ - if self.in_memory_format == InMemoryFormat.BINARY else value_record.value + return ( + self.serialization_service.to_object(value_record.value) + if self.in_memory_format == InMemoryFormat.BINARY + else value_record.value + ) def _do_eviction_if_required(self): if not self._is_eviction_required(): @@ -156,9 +182,11 @@ def _do_eviction_if_required(self): sorted_candidate_pool = sorted(new_eviction_samples_cleaned, key=self._key_func) min_size = min(self.eviction_sampling_pool_size, len(sorted_candidate_pool)) - self._eviction_candidates = sorted_candidate_pool[:min_size] # set new eviction candidate pool + # set new eviction candidate pool + self._eviction_candidates = sorted_candidate_pool[:min_size] - if len(new_eviction_samples) == len(new_eviction_samples_cleaned): # did any item expired or do we need to evict + # did any item expired or do we need to evict + if len(new_eviction_samples) == len(new_eviction_samples_cleaned): try: self.__delitem__(self._eviction_candidates[0].key) self._evictions += 1 @@ -175,7 +203,10 @@ def _find_new_random_samples(self): index = i if i < len(records) else i - len(records) if records[index].is_expired(self.max_idle): self._clean_expired_record(records[index].key) - elif self._is_better_than_worse_entry(records[index]) or len(new_sample_pool) < self.eviction_sampling_pool_size: + elif ( + self._is_better_than_worse_entry(records[index]) + or len(new_sample_pool) < self.eviction_sampling_pool_size + ): new_sample_pool.add(records[index]) return new_sample_pool @@ -192,11 +223,15 @@ def _random_index(self): return random.randint(0, self.eviction_max_size - 1) def _is_better_than_worse_entry(self, data_record): - return len(self._eviction_candidates) == 0 \ - or (self._key_func(data_record) - self._key_func(self._eviction_candidates[-1])) < 0 + return ( + len(self._eviction_candidates) == 0 + or (self._key_func(data_record) - self._key_func(self._eviction_candidates[-1])) < 0 + ) def _is_eviction_required(self): - return self.eviction_policy != EvictionPolicy.NONE and self.eviction_max_size <= self.__len__() + return ( + self.eviction_policy != EvictionPolicy.NONE and self.eviction_max_size <= self.__len__() + ) def _clean_expired_record(self, key): try: @@ -238,16 +273,18 @@ def get_or_create_near_cache(self, name): if not near_cache_config: raise ValueError("Cannot find a near cache configuration with the name '%s'" % name) - near_cache = NearCache(name, - self._serialization_service, - near_cache_config.in_memory_format, - near_cache_config.time_to_live, - near_cache_config.max_idle, - near_cache_config.invalidate_on_change, - near_cache_config.eviction_policy, - near_cache_config.eviction_max_size, - near_cache_config.eviction_sampling_count, - near_cache_config.eviction_sampling_pool_size) + near_cache = NearCache( + name, + self._serialization_service, + near_cache_config.in_memory_format, + near_cache_config.time_to_live, + near_cache_config.max_idle, + near_cache_config.invalidate_on_change, + near_cache_config.eviction_policy, + near_cache_config.eviction_max_size, + near_cache_config.eviction_sampling_count, + near_cache_config.eviction_sampling_pool_size, + ) self._caches[name] = near_cache diff --git a/hazelcast/partition.py b/hazelcast/partition.py index eb93faaeac..f3da2b88e4 100644 --- a/hazelcast/partition.py +++ b/hazelcast/partition.py @@ -105,20 +105,31 @@ def check_and_set_partition_count(self, partition_count): def _should_be_applied(self, connection, partitions, version, current): if not partitions: - _logger.debug("Partition view will not be applied since response is empty. " - "Sending connection: %s, version: %s, current table: %s", - connection, version, current) + _logger.debug( + "Partition view will not be applied since response is empty. " + "Sending connection: %s, version: %s, current table: %s", + connection, + version, + current, + ) return False if connection != current.connection: - _logger.debug("Partition view event coming from a new connection. Old: %s, new: %s", - current.connection, connection) + _logger.debug( + "Partition view event coming from a new connection. Old: %s, new: %s", + current.connection, + connection, + ) return True if version <= current.version: - _logger.debug("Partition view will not be applied since response state version is older. " - "Sending connection: %s, version: %s, current table: %s", - connection, version, current) + _logger.debug( + "Partition view will not be applied since response state version is older. " + "Sending connection: %s, version: %s, current table: %s", + connection, + version, + current, + ) return False return True @@ -136,7 +147,7 @@ def string_partition_strategy(key): if key is None: return None try: - index_of = key.index('@') - return key[index_of + 1:] + index_of = key.index("@") + return key[index_of + 1 :] except ValueError: return key diff --git a/hazelcast/predicate.py b/hazelcast/predicate.py index 8794230ba4..3166e84dc6 100644 --- a/hazelcast/predicate.py +++ b/hazelcast/predicate.py @@ -63,17 +63,19 @@ class Predicate(object): the stored attribute value. If no conversion matching the type exists, ``IllegalArgumentError`` is thrown. """ + pass class PagingPredicate(Predicate): """This class is a special Predicate which helps to get a page-by-page - result of a query. + result of a query. - It can be constructed with a page-size, an inner predicate for filtering, - and a comparator for sorting. This class is not thread-safe and stateless. - To be able to reuse for another query, one should call :func:`reset`. + It can be constructed with a page-size, an inner predicate for filtering, + and a comparator for sorting. This class is not thread-safe and stateless. + To be able to reuse for another query, one should call :func:`reset`. """ + def reset(self): """Resets the predicate for reuse.""" raise NotImplementedError("reset") @@ -182,7 +184,11 @@ def write_data(self, output): output.write_object(self.from_) def __repr__(self): - return "BetweenPredicate(attribute='%s', from=%s, to=%s)" % (self.attribute, self.from_, self.to) + return "BetweenPredicate(attribute='%s', from=%s, to=%s)" % ( + self.attribute, + self.from_, + self.to, + ) class _EqualPredicate(_AbstractPredicate): @@ -217,7 +223,11 @@ def write_data(self, output): def __repr__(self): return "GreaterLessPredicate(attribute='%s', value=%s, is_equal=%s, is_less=%s)" % ( - self.attribute, self.value, self.is_equal, self.is_less) + self.attribute, + self.value, + self.is_equal, + self.is_less, + ) class _LikePredicate(_AbstractPredicate): @@ -256,7 +266,10 @@ def write_data(self, output): output.write_object(value) def __repr__(self): - return "InPredicate(attribute='%s', %s)" % (self.attribute, ",".join([str(x) for x in self.values])) + return "InPredicate(attribute='%s', %s)" % ( + self.attribute, + ",".join([str(x) for x in self.values]), + ) class _InstanceOfPredicate(_AbstractPredicate): @@ -342,7 +355,7 @@ def __init__(self, predicate, page_size, comparator=None): raise TypeError("Nested paging predicate not supported.") if page_size <= 0: - raise ValueError('page_size should be greater than 0.') + raise ValueError("page_size should be greater than 0.") self._internal_predicate = predicate self._page_size = page_size @@ -352,8 +365,11 @@ def __init__(self, predicate, page_size, comparator=None): self.anchor_list = [] # List of pairs: (nearest page, (anchor key, anchor value)) def __repr__(self): - return "PagingPredicate(predicate=%s, page_size=%s, comparator=%s)" % (self._internal_predicate, - self.page_size, self.comparator) + return "PagingPredicate(predicate=%s, page_size=%s, comparator=%s)" % ( + self._internal_predicate, + self.page_size, + self.comparator, + ) def write_data(self, output): output.write_object(self._internal_predicate) diff --git a/hazelcast/reactor.py b/hazelcast/reactor.py index 6e59561296..a402a9776d 100644 --- a/hazelcast/reactor.py +++ b/hazelcast/reactor.py @@ -47,7 +47,7 @@ errno.EWOULDBLOCK, errno.EDEADLK, ssl.SSL_ERROR_WANT_WRITE, - ssl.SSL_ERROR_WANT_READ + ssl.SSL_ERROR_WANT_READ, ) @@ -112,7 +112,7 @@ def handle_read(self): pass def close(self): - _AbstractWaker.close(self) # Will close the reader + _AbstractWaker.close(self) # Will close the reader os.close(self._write_fd) @@ -250,7 +250,7 @@ def shutdown(self): class _WakeableLoop(_AbstractLoop): - _waker_class = _PipedWaker if os.name != 'nt' else _SocketedWaker + _waker_class = _PipedWaker if os.name != "nt" else _SocketedWaker def __init__(self, map): _AbstractLoop.__init__(self, map) @@ -334,10 +334,12 @@ def __init__(self): loop = _WakeableLoop(self.map) loop.check_loop() except: - _logger.exception("Failed to initialize the wakeable loop. " - "Using the basic loop instead. " - "When used in the blocking mode, client" - "may have sub-optimal performance.") + _logger.exception( + "Failed to initialize the wakeable loop. " + "Using the basic loop instead. " + "When used in the blocking mode, client" + "may have sub-optimal performance." + ) if loop: loop.shutdown() loop = _BasicLoop(self.map) @@ -355,8 +357,12 @@ def wake_loop(self): def shutdown(self): self._loop.shutdown() - def connection_factory(self, connection_manager, connection_id, address, network_config, message_callback): - return AsyncoreConnection(self, connection_manager, connection_id, address, network_config, message_callback) + def connection_factory( + self, connection_manager, connection_id, address, network_config, message_callback + ): + return AsyncoreConnection( + self, connection_manager, connection_id, address, network_config, message_callback + ) _BUFFER_SIZE = 128000 @@ -368,8 +374,9 @@ class AsyncoreConnection(Connection, asyncore.dispatcher): send_buffer_size = _BUFFER_SIZE _close_timer = None - def __init__(self, reactor, connection_manager, connection_id, address, - config, message_callback): + def __init__( + self, reactor, connection_manager, connection_id, address, config, message_callback + ): asyncore.dispatcher.__init__(self, map=reactor.map) Connection.__init__(self, connection_manager, connection_id, message_callback) @@ -545,7 +552,9 @@ def _wrap_as_ssl_socket(self, config): ssl_context.load_default_certs() if config.ssl_certfile: - ssl_context.load_cert_chain(config.ssl_certfile, config.ssl_keyfile, config.ssl_password) + ssl_context.load_cert_chain( + config.ssl_certfile, config.ssl_keyfile, config.ssl_password + ) if config.ssl_ciphers: ssl_context.set_ciphers(config.ssl_ciphers) @@ -553,7 +562,11 @@ def _wrap_as_ssl_socket(self, config): self.socket = ssl_context.wrap_socket(self.socket) def __repr__(self): - return "Connection(id=%s, live=%s, remote_address=%s)" % (self._id, self.live, self.remote_address) + return "Connection(id=%s, live=%s, remote_address=%s)" % ( + self._id, + self.live, + self.remote_address, + ) def __str__(self): return self.__repr__() diff --git a/hazelcast/six.py b/hazelcast/six.py index 241d95cee6..ad95b754ca 100644 --- a/hazelcast/six.py +++ b/hazelcast/six.py @@ -34,15 +34,15 @@ PY34 = sys.version_info[0:2] >= (3, 4) if PY3: - string_types = str, - integer_types = int, - class_types = type, + string_types = (str,) + integer_types = (int,) + class_types = (type,) text_type = str binary_type = bytes MAXSIZE = sys.maxsize else: - string_types = basestring, + string_types = (basestring,) integer_types = (int, long) class_types = (type, types.ClassType) text_type = unicode @@ -54,9 +54,9 @@ else: # It's possible to have sizeof(long) != sizeof(Py_ssize_t). class X(object): - def __len__(self): return 1 << 31 + try: len(X()) except OverflowError: @@ -80,7 +80,6 @@ def _import_module(name): class _LazyDescr(object): - def __init__(self, name): self.name = name @@ -97,7 +96,6 @@ def __get__(self, obj, tp): class MovedModule(_LazyDescr): - def __init__(self, name, old, new=None): super(MovedModule, self).__init__(name) if PY3: @@ -118,7 +116,6 @@ def __getattr__(self, attr): class _LazyModule(types.ModuleType): - def __init__(self, name): super(_LazyModule, self).__init__(name) self.__doc__ = self.__class__.__doc__ @@ -133,7 +130,6 @@ def __dir__(self): class MovedAttribute(_LazyDescr): - def __init__(self, name, old_mod, new_mod, old_attr=None, new_attr=None): super(MovedAttribute, self).__init__(name) if PY3: @@ -217,14 +213,17 @@ def get_code(self, fullname): Required, if is_package is implemented""" self.__get_module(fullname) # eventually raises ImportError return None + get_source = get_code # same as get_code + _importer = _SixMetaPathImporter(__name__) class _MovedItems(_LazyModule): """Lazy loading of moved objects""" + __path__ = [] # mark as package @@ -281,15 +280,12 @@ class _MovedItems(_LazyModule): MovedModule("tkinter_ttk", "ttk", "tkinter.ttk"), MovedModule("tkinter_constants", "Tkconstants", "tkinter.constants"), MovedModule("tkinter_dnd", "Tkdnd", "tkinter.dnd"), - MovedModule("tkinter_colorchooser", "tkColorChooser", - "tkinter.colorchooser"), - MovedModule("tkinter_commondialog", "tkCommonDialog", - "tkinter.commondialog"), + MovedModule("tkinter_colorchooser", "tkColorChooser", "tkinter.colorchooser"), + MovedModule("tkinter_commondialog", "tkCommonDialog", "tkinter.commondialog"), MovedModule("tkinter_tkfiledialog", "tkFileDialog", "tkinter.filedialog"), MovedModule("tkinter_font", "tkFont", "tkinter.font"), MovedModule("tkinter_messagebox", "tkMessageBox", "tkinter.messagebox"), - MovedModule("tkinter_tksimpledialog", "tkSimpleDialog", - "tkinter.simpledialog"), + MovedModule("tkinter_tksimpledialog", "tkSimpleDialog", "tkinter.simpledialog"), MovedModule("urllib_parse", __name__ + ".moves.urllib_parse", "urllib.parse"), MovedModule("urllib_error", __name__ + ".moves.urllib_error", "urllib.error"), MovedModule("urllib", __name__ + ".moves.urllib", __name__ + ".moves.urllib"), @@ -353,8 +349,11 @@ class Module_six_moves_urllib_parse(_LazyModule): Module_six_moves_urllib_parse._moved_attributes = _urllib_parse_moved_attributes -_importer._add_module(Module_six_moves_urllib_parse(__name__ + ".moves.urllib_parse"), - "moves.urllib_parse", "moves.urllib.parse") +_importer._add_module( + Module_six_moves_urllib_parse(__name__ + ".moves.urllib_parse"), + "moves.urllib_parse", + "moves.urllib.parse", +) class Module_six_moves_urllib_error(_LazyModule): @@ -373,8 +372,11 @@ class Module_six_moves_urllib_error(_LazyModule): Module_six_moves_urllib_error._moved_attributes = _urllib_error_moved_attributes -_importer._add_module(Module_six_moves_urllib_error(__name__ + ".moves.urllib.error"), - "moves.urllib_error", "moves.urllib.error") +_importer._add_module( + Module_six_moves_urllib_error(__name__ + ".moves.urllib.error"), + "moves.urllib_error", + "moves.urllib.error", +) class Module_six_moves_urllib_request(_LazyModule): @@ -425,8 +427,11 @@ class Module_six_moves_urllib_request(_LazyModule): Module_six_moves_urllib_request._moved_attributes = _urllib_request_moved_attributes -_importer._add_module(Module_six_moves_urllib_request(__name__ + ".moves.urllib.request"), - "moves.urllib_request", "moves.urllib.request") +_importer._add_module( + Module_six_moves_urllib_request(__name__ + ".moves.urllib.request"), + "moves.urllib_request", + "moves.urllib.request", +) class Module_six_moves_urllib_response(_LazyModule): @@ -446,8 +451,11 @@ class Module_six_moves_urllib_response(_LazyModule): Module_six_moves_urllib_response._moved_attributes = _urllib_response_moved_attributes -_importer._add_module(Module_six_moves_urllib_response(__name__ + ".moves.urllib.response"), - "moves.urllib_response", "moves.urllib.response") +_importer._add_module( + Module_six_moves_urllib_response(__name__ + ".moves.urllib.response"), + "moves.urllib_response", + "moves.urllib.response", +) class Module_six_moves_urllib_robotparser(_LazyModule): @@ -464,13 +472,17 @@ class Module_six_moves_urllib_robotparser(_LazyModule): Module_six_moves_urllib_robotparser._moved_attributes = _urllib_robotparser_moved_attributes -_importer._add_module(Module_six_moves_urllib_robotparser(__name__ + ".moves.urllib.robotparser"), - "moves.urllib_robotparser", "moves.urllib.robotparser") +_importer._add_module( + Module_six_moves_urllib_robotparser(__name__ + ".moves.urllib.robotparser"), + "moves.urllib_robotparser", + "moves.urllib.robotparser", +) class Module_six_moves_urllib(types.ModuleType): """Create a six.moves.urllib namespace that resembles the Python 3 namespace""" + __path__ = [] # mark as package parse = _importer._get_module("moves.urllib_parse") error = _importer._get_module("moves.urllib_error") @@ -479,10 +491,10 @@ class Module_six_moves_urllib(types.ModuleType): robotparser = _importer._get_module("moves.urllib_robotparser") def __dir__(self): - return ['parse', 'error', 'request', 'response', 'robotparser'] + return ["parse", "error", "request", "response", "robotparser"] + -_importer._add_module(Module_six_moves_urllib(__name__ + ".moves.urllib"), - "moves.urllib") +_importer._add_module(Module_six_moves_urllib(__name__ + ".moves.urllib"), "moves.urllib") def add_move(move): @@ -522,19 +534,24 @@ def remove_move(name): try: advance_iterator = next except NameError: + def advance_iterator(it): return it.next() + + next = advance_iterator try: callable = callable except NameError: + def callable(obj): return any("__call__" in klass.__dict__ for klass in type(obj).__mro__) if PY3: + def get_unbound_function(unbound): return unbound @@ -545,6 +562,7 @@ def create_unbound_method(func, cls): Iterator = object else: + def get_unbound_function(unbound): return unbound.im_func @@ -555,13 +573,11 @@ def create_unbound_method(func, cls): return types.MethodType(func, None, cls) class Iterator(object): - def next(self): return type(self).__next__(self) callable = callable -_add_doc(get_unbound_function, - """Get the function out of a possibly unbound function""") +_add_doc(get_unbound_function, """Get the function out of a possibly unbound function""") get_method_function = operator.attrgetter(_meth_func) @@ -573,6 +589,7 @@ def next(self): if PY3: + def iterkeys(d, **kw): return iter(d.keys(**kw)) @@ -591,6 +608,7 @@ def iterlists(d, **kw): viewitems = operator.methodcaller("items") else: + def iterkeys(d, **kw): return d.iterkeys(**kw) @@ -611,26 +629,28 @@ def iterlists(d, **kw): _add_doc(iterkeys, "Return an iterator over the keys of a dictionary.") _add_doc(itervalues, "Return an iterator over the values of a dictionary.") -_add_doc(iteritems, - "Return an iterator over the (key, value) pairs of a dictionary.") -_add_doc(iterlists, - "Return an iterator over the (key, [values]) pairs of a dictionary.") +_add_doc(iteritems, "Return an iterator over the (key, value) pairs of a dictionary.") +_add_doc(iterlists, "Return an iterator over the (key, [values]) pairs of a dictionary.") if PY3: + def b(s): return s.encode("latin-1") def u(s): return s + unichr = chr import struct + int2byte = struct.Struct(">B").pack del struct byte2int = operator.itemgetter(0) indexbytes = operator.getitem iterbytes = iter import io + StringIO = io.StringIO BytesIO = io.BytesIO _assertCountEqual = "assertCountEqual" @@ -641,12 +661,15 @@ def u(s): _assertRaisesRegex = "assertRaisesRegex" _assertRegex = "assertRegex" else: + def b(s): return s + # Workaround for standalone backslash def u(s): - return unicode(s.replace(r'\\', r'\\\\'), "unicode_escape") + return unicode(s.replace(r"\\", r"\\\\"), "unicode_escape") + unichr = unichr int2byte = chr @@ -655,8 +678,10 @@ def byte2int(bs): def indexbytes(buf, i): return ord(buf[i]) + iterbytes = functools.partial(itertools.imap, ord) import StringIO + StringIO = BytesIO = StringIO.StringIO _assertCountEqual = "assertItemsEqual" _assertRaisesRegex = "assertRaisesRegexp" @@ -691,7 +716,9 @@ def reraise(tp, value, tb=None): value = None tb = None + else: + def exec_(_code_, _globs_=None, _locs_=None): """Execute code in a namespace.""" if _globs_ is None: @@ -704,37 +731,45 @@ def exec_(_code_, _globs_=None, _locs_=None): _locs_ = _globs_ exec("""exec _code_ in _globs_, _locs_""") - exec_("""def reraise(tp, value, tb=None): + exec_( + """def reraise(tp, value, tb=None): try: raise tp, value, tb finally: tb = None -""") +""" + ) if sys.version_info[:2] == (3, 2): - exec_("""def raise_from(value, from_value): + exec_( + """def raise_from(value, from_value): try: if from_value is None: raise value raise value from from_value finally: value = None -""") +""" + ) elif sys.version_info[:2] > (3, 2): - exec_("""def raise_from(value, from_value): + exec_( + """def raise_from(value, from_value): try: raise value from from_value finally: value = None -""") +""" + ) else: + def raise_from(value, from_value): raise value print_ = getattr(moves.builtins, "print", None) if print_ is None: + def print_(*args, **kwargs): """The new-style print function for Python 2.4 and 2.5.""" fp = kwargs.pop("file", sys.stdout) @@ -745,14 +780,13 @@ def write(data): if not isinstance(data, basestring): data = str(data) # If the file has an encoding, encode unicode with it. - if (isinstance(fp, file) and - isinstance(data, unicode) and - fp.encoding is not None): + if isinstance(fp, file) and isinstance(data, unicode) and fp.encoding is not None: errors = getattr(fp, "errors", None) if errors is None: errors = "strict" data = data.encode(fp.encoding, errors) fp.write(data) + want_unicode = False sep = kwargs.pop("sep", None) if sep is not None: @@ -788,6 +822,8 @@ def write(data): write(sep) write(arg) write(end) + + if sys.version_info[:2] < (3, 3): _print = print_ @@ -798,16 +834,20 @@ def print_(*args, **kwargs): if flush and fp is not None: fp.flush() + _add_doc(reraise, """Reraise an exception.""") if sys.version_info[0:2] < (3, 4): - def wraps(wrapped, assigned=functools.WRAPPER_ASSIGNMENTS, - updated=functools.WRAPPER_UPDATES): + + def wraps(wrapped, assigned=functools.WRAPPER_ASSIGNMENTS, updated=functools.WRAPPER_UPDATES): def wrapper(f): f = functools.wraps(wrapped, assigned, updated)(f) f.__wrapped__ = wrapped return f + return wrapper + + else: wraps = functools.wraps @@ -818,33 +858,35 @@ def with_metaclass(meta, *bases): # metaclass for one level of class instantiation that replaces itself with # the actual metaclass. class metaclass(type): - def __new__(cls, name, this_bases, d): return meta(name, bases, d) @classmethod def __prepare__(cls, name, this_bases): return meta.__prepare__(name, bases) - return type.__new__(metaclass, 'temporary_class', (), {}) + + return type.__new__(metaclass, "temporary_class", (), {}) def add_metaclass(metaclass): """Class decorator for creating a class with a metaclass.""" + def wrapper(cls): orig_vars = cls.__dict__.copy() - slots = orig_vars.get('__slots__') + slots = orig_vars.get("__slots__") if slots is not None: if isinstance(slots, str): slots = [slots] for slots_var in slots: orig_vars.pop(slots_var) - orig_vars.pop('__dict__', None) - orig_vars.pop('__weakref__', None) + orig_vars.pop("__dict__", None) + orig_vars.pop("__weakref__", None) return metaclass(cls.__name__, cls.__bases__, orig_vars) + return wrapper -def ensure_binary(s, encoding='utf-8', errors='strict'): +def ensure_binary(s, encoding="utf-8", errors="strict"): """Coerce **s** to six.binary_type. For Python 2: @@ -863,7 +905,7 @@ def ensure_binary(s, encoding='utf-8', errors='strict'): raise TypeError("not expecting type '%s'" % type(s)) -def ensure_str(s, encoding='utf-8', errors='strict'): +def ensure_str(s, encoding="utf-8", errors="strict"): """Coerce *s* to `str`. For Python 2: @@ -883,7 +925,7 @@ def ensure_str(s, encoding='utf-8', errors='strict'): return s -def ensure_text(s, encoding='utf-8', errors='strict'): +def ensure_text(s, encoding="utf-8", errors="strict"): """Coerce *s* to six.text_type. For Python 2: @@ -911,12 +953,13 @@ def python_2_unicode_compatible(klass): returning text and apply this decorator to the class. """ if PY2: - if '__str__' not in klass.__dict__: - raise ValueError("@python_2_unicode_compatible cannot be applied " - "to %s because it doesn't define __str__()." % - klass.__name__) + if "__str__" not in klass.__dict__: + raise ValueError( + "@python_2_unicode_compatible cannot be applied " + "to %s because it doesn't define __str__()." % klass.__name__ + ) klass.__unicode__ = klass.__str__ - klass.__str__ = lambda self: self.__unicode__().encode('utf-8') + klass.__str__ = lambda self: self.__unicode__().encode("utf-8") return klass @@ -936,8 +979,7 @@ def python_2_unicode_compatible(klass): # be floating around. Therefore, we can't use isinstance() to check for # the six meta path importer, since the other six instance will have # inserted an importer with different class. - if (type(importer).__name__ == "_SixMetaPathImporter" and - importer.name == __name__): + if type(importer).__name__ == "_SixMetaPathImporter" and importer.name == __name__: del sys.meta_path[i] break del i, importer diff --git a/hazelcast/statistics.py b/hazelcast/statistics.py index 7d7ee7384e..099c4aa9ce 100644 --- a/hazelcast/statistics.py +++ b/hazelcast/statistics.py @@ -83,13 +83,19 @@ def _get_os_and_runtime_stats(self): psutil_stats = {} if PSUTIL_ENABLED: - if self._can_collect_stat("os.totalPhysicalMemorySize") \ - or self._can_collect_stat("os.freePhysicalMemorySize"): - self._collect_physical_memory_info(psutil_stats, "os.totalPhysicalMemorySize", - "os.freePhysicalMemorySize") - - if self._can_collect_stat("os.totalSwapSpaceSize") or self._can_collect_stat("os.freeSwapSpaceSize"): - self._collect_swap_memory_info(psutil_stats, "os.totalSwapSpaceSize", "os.freeSwapSpaceSize") + if self._can_collect_stat("os.totalPhysicalMemorySize") or self._can_collect_stat( + "os.freePhysicalMemorySize" + ): + self._collect_physical_memory_info( + psutil_stats, "os.totalPhysicalMemorySize", "os.freePhysicalMemorySize" + ) + + if self._can_collect_stat("os.totalSwapSpaceSize") or self._can_collect_stat( + "os.freeSwapSpaceSize" + ): + self._collect_swap_memory_info( + psutil_stats, "os.totalSwapSpaceSize", "os.freeSwapSpaceSize" + ) if self._can_collect_stat("os.systemLoadAverage"): self._collect_load_average(psutil_stats, "os.systemLoadAverage") @@ -102,16 +108,22 @@ def _get_os_and_runtime_stats(self): # With oneshot, process related information could be gathered # faster due to caching. - if self._can_collect_stat("os.committedVirtualMemorySize") \ - or self._can_collect_stat("runtime.usedMemory"): - self._collect_process_memory_info(psutil_stats, "os.committedVirtualMemorySize", - "runtime.usedMemory", process) + if self._can_collect_stat( + "os.committedVirtualMemorySize" + ) or self._can_collect_stat("runtime.usedMemory"): + self._collect_process_memory_info( + psutil_stats, "os.committedVirtualMemorySize", "runtime.usedMemory", process + ) if self._can_collect_stat("os.openFileDescriptorCount"): - self._collect_file_descriptor_count(psutil_stats, "os.openFileDescriptorCount", process) + self._collect_file_descriptor_count( + psutil_stats, "os.openFileDescriptorCount", process + ) if self._can_collect_stat("os.maxFileDescriptorCount"): - self._collect_max_file_descriptor_count(psutil_stats, "os.maxFileDescriptorCount", process) + self._collect_max_file_descriptor_count( + psutil_stats, "os.maxFileDescriptorCount", process + ) if self._can_collect_stat("os.processCpuTime"): self._collect_process_cpu_time(psutil_stats, "os.processCpuTime", process) @@ -140,15 +152,21 @@ def _add_near_cache_stats(self, stats): prefix = "".join(near_cache_name_with_prefix) near_cache_stats = near_cache.get_statistics() - self._add_stat(stats, "creationTime", to_millis(near_cache_stats["creation_time"]), prefix) + self._add_stat( + stats, "creationTime", to_millis(near_cache_stats["creation_time"]), prefix + ) self._add_stat(stats, "evictions", near_cache_stats["evictions"], prefix) self._add_stat(stats, "hits", near_cache_stats["hits"], prefix) self._add_stat(stats, "misses", near_cache_stats["misses"], prefix) self._add_stat(stats, "ownedEntryCount", near_cache_stats["owned_entry_count"], prefix) self._add_stat(stats, "expirations", near_cache_stats["expirations"], prefix) self._add_stat(stats, "invalidations", near_cache_stats["invalidations"], prefix) - self._add_stat(stats, "invalidationRequests", near_cache_stats["invalidation_requests"], prefix) - self._add_stat(stats, "ownedEntryMemoryCost", near_cache_stats["owned_entry_memory_cost"], prefix) + self._add_stat( + stats, "invalidationRequests", near_cache_stats["invalidation_requests"], prefix + ) + self._add_stat( + stats, "ownedEntryMemoryCost", near_cache_stats["owned_entry_memory_cost"], prefix + ) def _add_stat(self, stats, name, value, key_prefix=None): if len(stats) != 0: @@ -168,7 +186,9 @@ def _get_name_with_prefix(self, name): return [Statistics._NEAR_CACHE_CATEGORY_PREFIX, self._escape_special_characters(name)] def _escape_special_characters(self, name): - escaped_name = name.replace("\\", "\\\\").replace(",", "\\,").replace(".", "\\.").replace("=", "\\=") + escaped_name = ( + name.replace("\\", "\\\\").replace(",", "\\,").replace(".", "\\.").replace("=", "\\=") + ) return escaped_name[1:] if name[0] == "/" else escaped_name def _can_collect_stat(self, name): @@ -180,8 +200,11 @@ def safe_wrapper(self, psutil_stats, probe_name, *args): try: stat = func(self, psutil_stats, probe_name, *args) except AttributeError as ae: - _logger.debug("Unable to register psutil method used for the probe %s. " - "Cause: %s", probe_name, ae) + _logger.debug( + "Unable to register psutil method used for the probe %s. " "Cause: %s", + probe_name, + ae, + ) self._failed_gauges.add(probe_name) return except Exception as ex: diff --git a/hazelcast/transaction.py b/hazelcast/transaction.py index 7a8ee524fa..22f9529207 100644 --- a/hazelcast/transaction.py +++ b/hazelcast/transaction.py @@ -4,7 +4,11 @@ from hazelcast.errors import TransactionError, IllegalStateError from hazelcast.future import make_blocking from hazelcast.invocation import Invocation -from hazelcast.protocol.codec import transaction_create_codec, transaction_commit_codec, transaction_rollback_codec +from hazelcast.protocol.codec import ( + transaction_create_codec, + transaction_commit_codec, + transaction_rollback_codec, +) from hazelcast.proxy.transactional_list import TransactionalList from hazelcast.proxy.transactional_map import TransactionalMap from hazelcast.proxy.transactional_multi_map import TransactionalMultiMap @@ -54,8 +58,12 @@ def _connect(self): if connection: return connection - _logger.debug("Could not get a connection for the transaction. Attempt %d of %d", count, - RETRY_COUNT, exc_info=True) + _logger.debug( + "Could not get a connection for the transaction. Attempt %d of %d", + count, + RETRY_COUNT, + exc_info=True, + ) if count + 1 == RETRY_COUNT: raise IllegalStateError("No active connection is found") @@ -80,6 +88,7 @@ class Transaction(object): """Provides transactional operations: beginning/committing transactions, but also retrieving transactional data-structures like the TransactionalMap. """ + state = _STATE_NOT_STARTED id = None start_time = None @@ -96,7 +105,7 @@ def __init__(self, context, connection, timeout, durability, transaction_type): def begin(self): """Begins this transaction.""" - if hasattr(self._locals, 'transaction_exists') and self._locals.transaction_exists: + if hasattr(self._locals, "transaction_exists") and self._locals.transaction_exists: raise TransactionError("Nested transactions are not allowed.") if self.state != _STATE_NOT_STARTED: raise TransactionError("Transaction has already been started.") @@ -104,11 +113,15 @@ def begin(self): self.start_time = time.time() self.thread_id = thread_id() try: - request = transaction_create_codec.encode_request(timeout=int(self.timeout * 1000), - durability=self.durability, - transaction_type=self.transaction_type, - thread_id=self.thread_id) - invocation = Invocation(request, connection=self.connection, response_handler=lambda m: m) + request = transaction_create_codec.encode_request( + timeout=int(self.timeout * 1000), + durability=self.durability, + transaction_type=self.transaction_type, + thread_id=self.thread_id, + ) + invocation = Invocation( + request, connection=self.connection, response_handler=lambda m: m + ) invocation_service = self._context.invocation_service invocation_service.invoke(invocation) response = invocation.future.result() diff --git a/hazelcast/util.py b/hazelcast/util.py index 72c977be8c..732759fd92 100644 --- a/hazelcast/util.py +++ b/hazelcast/util.py @@ -83,7 +83,7 @@ def __init__(self, initial=0): def get_and_increment(self): """Returns the current value and increment it. - + Returns: int: Current value of AtomicInteger. """ @@ -152,6 +152,7 @@ def __repr__(self): # Serialization Utilities + def get_portable_version(portable, default_version): try: version = portable.get_class_version() @@ -180,7 +181,9 @@ def calculate_version(version_str): major_coeff = int(tokens[0]) minor_coeff = int(tokens[1]) - calculated_version = major_coeff * MAJOR_VERSION_MULTIPLIER + minor_coeff * MINOR_VERSION_MULTIPLIER + calculated_version = ( + major_coeff * MAJOR_VERSION_MULTIPLIER + minor_coeff * MINOR_VERSION_MULTIPLIER + ) if len(tokens) > 2: patch_coeff = int(tokens[2]) @@ -230,8 +233,10 @@ def try_to_get_enum_value(value, enum_class): if enum_value is not None: return enum_value else: - raise TypeError("%s must be equal to one of the values or " - "names of the members of the %s" % (value, enum_class.__name__)) + raise TypeError( + "%s must be equal to one of the values or " + "names of the members of the %s" % (value, enum_class.__name__) + ) number_types = (six.integer_types, float) @@ -241,10 +246,11 @@ def try_to_get_enum_value(value, enum_class): class LoadBalancer(object): """Load balancer allows you to send operations to one of a number of endpoints (Members). It is up to the implementation to use different load balancing policies. - + If the client is configured with smart routing, only the operations that are not key based will be routed to the endpoint """ + def init(self, cluster_service): """Initializes the load balancer. @@ -263,7 +269,6 @@ def next(self): class _AbstractLoadBalancer(LoadBalancer): - def __init__(self): self._cluster_service = None self._members = [] @@ -279,7 +284,7 @@ def _listener(self, _): class RoundRobinLB(_AbstractLoadBalancer): """A load balancer implementation that relies on using round robin to a next member to send a request to. - + Round robin is done based on best effort basis, the order of members for concurrent calls to the next() is not guaranteed. """ diff --git a/pyproject.toml b/pyproject.toml index 8a368ba5ca..658cae8f2d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,23 +1,20 @@ [tool.black] line-length = 100 exclude = ''' -( - /( - \.eggs - | \.git - | \.hg - | \.mypy_cache - | \.tox - | \.venv - | _build - | buck-out - | build - | dist +/( + \.eggs + | \.git + | \.hg + | \.mypy_cache + | \.tox + | \.venv + | _build + | buck-out + | build + | dist - | benchmarks - | hazelcast/protocol/codec - | tests - )/ - | /hazelcast/(?!(__init__))\w+\.py -) + | benchmarks + | hazelcast/protocol/codec + | tests +)/ '''