Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,3 @@ target/
.directory

*.jar

# Git version info related
git_info.json
hazelcast/git_info.json
246 changes: 86 additions & 160 deletions README.md

Large diffs are not rendered by default.

16 changes: 14 additions & 2 deletions hazelcast/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
__version__ = "4.0.0"

# Set the default handler to "hazelcast" loggers
# to avoid "No handlers could be found" warnings.
import logging
try:
from logging import NullHandler
except ImportError:
class NullHandler(logging.Handler):
def emit(self, record):
pass

logging.getLogger(__name__).addHandler(NullHandler())

from hazelcast.client import HazelcastClient
from hazelcast.version import CLIENT_VERSION_INFO as __version_info__
from hazelcast.version import CLIENT_VERSION as __version__
49 changes: 16 additions & 33 deletions hazelcast/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import logging.config
import threading

from hazelcast import six
Expand All @@ -23,10 +22,12 @@
from hazelcast.serialization import SerializationServiceV1
from hazelcast.statistics import Statistics
from hazelcast.transaction import TWO_PHASE, TransactionManager
from hazelcast.util import AtomicInteger, DEFAULT_LOGGING, RoundRobinLB
from hazelcast.util import AtomicInteger, RoundRobinLB
from hazelcast.discovery import HazelcastCloudAddressProvider
from hazelcast.errors import IllegalStateError

_logger = logging.getLogger(__name__)


class HazelcastClient(object):
"""Hazelcast client instance to access access and manipulate
Expand Down Expand Up @@ -297,46 +298,40 @@ class SomeClassSerializer(StreamSerializer):
"""

_CLIENT_ID = AtomicInteger()
logger = logging.getLogger("HazelcastClient")

def __init__(self, **kwargs):
config = _Config.from_dict(kwargs)
self.config = config
self._context = _ClientContext()
client_id = HazelcastClient._CLIENT_ID.get_and_increment()
self.name = self._create_client_name(client_id)
self._init_logger()
self._logger_extras = {"client_name": self.name, "cluster_name": self.config.cluster_name}
self._reactor = AsyncoreReactor(self._logger_extras)
self._reactor = AsyncoreReactor()
self._serialization_service = SerializationServiceV1(config)
self._near_cache_manager = NearCacheManager(self, self._serialization_service)
self._internal_lifecycle_service = _InternalLifecycleService(self, self._logger_extras)
self._internal_lifecycle_service = _InternalLifecycleService(self)
self.lifecycle_service = LifecycleService(self._internal_lifecycle_service)
self._invocation_service = InvocationService(self, self._reactor, self._logger_extras)
self._invocation_service = InvocationService(self, self._reactor)
self._address_provider = self._create_address_provider()
self._internal_partition_service = _InternalPartitionService(self, self._logger_extras)
self._internal_partition_service = _InternalPartitionService(self)
self.partition_service = PartitionService(self._internal_partition_service, self._serialization_service)
self._internal_cluster_service = _InternalClusterService(self, self._logger_extras)
self._internal_cluster_service = _InternalClusterService(self)
self.cluster_service = ClusterService(self._internal_cluster_service)
self._connection_manager = ConnectionManager(self, self._reactor, self._address_provider,
self._internal_lifecycle_service,
self._internal_partition_service,
self._internal_cluster_service,
self._invocation_service,
self._near_cache_manager,
self._logger_extras)
self._near_cache_manager)
self._load_balancer = self._init_load_balancer(config)
self._listener_service = ListenerService(self, self._connection_manager,
self._invocation_service,
self._logger_extras)
self._invocation_service)
self._proxy_manager = ProxyManager(self._context)
self.cp_subsystem = CPSubsystem(self._context)
self._proxy_session_manager = ProxySessionManager(self._context)
self._transaction_manager = TransactionManager(self._context, self._logger_extras)
self._transaction_manager = TransactionManager(self._context)
self._lock_reference_id_generator = AtomicInteger(1)
self._statistics = Statistics(self, self._reactor, self._connection_manager,
self._invocation_service, self._near_cache_manager,
self._logger_extras)
self._invocation_service, self._near_cache_manager)
self._cluster_view_listener = ClusterViewListenerService(self, self._connection_manager,
self._internal_partition_service,
self._internal_cluster_service,
Expand All @@ -349,7 +344,7 @@ def _init_context(self):
self._context.init_context(self.config, self._invocation_service, self._internal_partition_service,
self._internal_cluster_service, self._connection_manager,
self._serialization_service, self._listener_service, self._proxy_manager,
self._near_cache_manager, self._lock_reference_id_generator, self._logger_extras,
self._near_cache_manager, self._lock_reference_id_generator,
self.name, self._proxy_session_manager, self._reactor)

def _start(self):
Expand All @@ -373,7 +368,7 @@ def _start(self):
except:
self.shutdown()
raise
self.logger.info("Client started.", extra=self._logger_extras)
_logger.info("Client started")

def get_executor(self, name):
"""Creates cluster-wide ExecutorService.
Expand Down Expand Up @@ -620,19 +615,10 @@ def _create_address_provider(self):

if cloud_enabled:
connection_timeout = self._get_connection_timeout(config)
return HazelcastCloudAddressProvider(cloud_discovery_token, connection_timeout, self._logger_extras)
return HazelcastCloudAddressProvider(cloud_discovery_token, connection_timeout)

return DefaultAddressProvider(cluster_members)

def _init_logger(self):
config = self.config
logging_config = config.logging_config
if logging_config:
logging.config.dictConfig(logging_config)
else:
logging.config.dictConfig(DEFAULT_LOGGING)
self.logger.setLevel(config.logging_level)

def _create_client_name(self, client_id):
client_name = self.config.client_name
if client_name:
Expand Down Expand Up @@ -668,16 +654,14 @@ def __init__(self):
self.proxy_manager = None
self.near_cache_manager = None
self.lock_reference_id_generator = None
self.logger_extras = None
self.name = None
self.proxy_session_manager = None
self.reactor = None

def init_context(self, config, invocation_service, partition_service,
cluster_service, connection_manager, serialization_service,
listener_service, proxy_manager, near_cache_manager,
lock_reference_id_generator, logger_extras, name,
proxy_session_manager, reactor):
lock_reference_id_generator, name, proxy_session_manager, reactor):
self.config = config
self.invocation_service = invocation_service
self.partition_service = partition_service
Expand All @@ -688,7 +672,6 @@ def init_context(self, config, invocation_service, partition_service,
self.proxy_manager = proxy_manager
self.near_cache_manager = near_cache_manager
self.lock_reference_id_generator = lock_reference_id_generator
self.logger_extras = logger_extras
self.name = name
self.proxy_session_manager = proxy_session_manager
self.reactor = reactor
21 changes: 10 additions & 11 deletions hazelcast/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from hazelcast.errors import TargetDisconnectedError, IllegalStateError
from hazelcast.util import check_not_none

_logger = logging.getLogger(__name__)


class _MemberListSnapshot(object):
__slots__ = ("version", "members")
Expand Down Expand Up @@ -101,12 +103,10 @@ def get_members(self, member_selector=None):


class _InternalClusterService(object):
logger = logging.getLogger("HazelcastClient.ClusterService")

def __init__(self, client, logger_extras):
def __init__(self, client):
self._client = client
self._connection_manager = None
self._logger_extras = logger_extras
config = client.config
self._labels = frozenset(config.labels)
self._listeners = {}
Expand Down Expand Up @@ -183,18 +183,17 @@ def wait_initial_member_list_fetched(self):
raise IllegalStateError("Could not get initial member list from cluster!")

def clear_member_list_version(self):
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug("Resetting the member list version", extra=self._logger_extras)
_logger.debug("Resetting the member list version")

current = self._member_list_snapshot
if current is not _EMPTY_SNAPSHOT:
self._member_list_snapshot = _MemberListSnapshot(0, current.members)

def handle_members_view_event(self, version, member_infos):
snapshot = self._create_snapshot(version, member_infos)
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug("Handling new snapshot with membership version: %s, member string: %s"
% (version, self._members_string(snapshot)), extra=self._logger_extras)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug("Handling new snapshot with membership version: %s, member string: %s",
version, self._members_string(snapshot))

current = self._member_list_snapshot
if version >= current.version:
Expand All @@ -214,15 +213,15 @@ def _apply_new_state_and_fire_events(self, current, snapshot):
try:
handler(removed_member)
except:
self.logger.exception("Exception in membership lister", extra=self._logger_extras)
_logger.exception("Exception in membership listener")

for added_member in additions:
for handler, _ in six.itervalues(self._listeners):
if handler:
try:
handler(added_member)
except:
self.logger.exception("Exception in membership lister", extra=self._logger_extras)
_logger.exception("Exception in membership listener")

def _detect_membership_events(self, old, new):
new_members = []
Expand All @@ -242,7 +241,7 @@ def _detect_membership_events(self, old, new):

if (len(new_members) + len(dead_members)) > 0:
if len(new.members) > 0:
self.logger.info(self._members_string(new), extra=self._logger_extras)
_logger.info(self._members_string(new))

return dead_members, new_members

Expand Down
35 changes: 2 additions & 33 deletions hazelcast/config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
"""
Hazelcast Client Configuration module contains configuration classes and various constants required to create a ClientConfig.

"""
import logging
import re

from hazelcast import six
Expand Down Expand Up @@ -457,9 +452,8 @@ class _Config(object):
"_membership_listeners", "_lifecycle_listeners", "_flake_id_generators",
"_labels", "_heartbeat_interval", "_heartbeat_timeout",
"_invocation_timeout", "_invocation_retry_pause", "_statistics_enabled",
"_statistics_period", "_shuffle_member_list", "_logging_config",
"_logging_level", "_backup_ack_to_client_enabled", "_operation_backup_timeout",
"_fail_on_indeterminate_operation_state")
"_statistics_period", "_shuffle_member_list", "_backup_ack_to_client_enabled",
"_operation_backup_timeout", "_fail_on_indeterminate_operation_state")

def __init__(self):
self._cluster_members = []
Expand Down Expand Up @@ -506,8 +500,6 @@ def __init__(self):
self._statistics_enabled = False
self._statistics_period = 3.0
self._shuffle_member_list = True
self._logging_config = None
self._logging_level = logging.INFO
self._backup_ack_to_client_enabled = True
self._operation_backup_timeout = 5.0
self._fail_on_indeterminate_operation_state = False
Expand Down Expand Up @@ -1112,29 +1104,6 @@ def shuffle_member_list(self, value):
else:
raise TypeError("shuffle_member_list must be a boolean")

@property
def logging_config(self):
return self._logging_config

@logging_config.setter
def logging_config(self, value):
if isinstance(value, dict):
self._logging_config = value
else:
raise TypeError("logging_config must be a dict")

@property
def logging_level(self):
return self._logging_level

@logging_level.setter
def logging_level(self, value):
if value in {logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL}:
self._logging_level = value
else:
raise TypeError("logging_level must be a valid value")

@property
def backup_ack_to_client_enabled(self):
return self._backup_ack_to_client_enabled
Expand Down
Loading