Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions hazelcast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,25 +315,26 @@ class SomeClassSerializer(StreamSerializer):

def __init__(self, **kwargs):
config = _Config.from_dict(kwargs)
self.config = config
self._config = config
self._context = _ClientContext()
client_id = HazelcastClient._CLIENT_ID.get_and_increment()
self.name = self._create_client_name(client_id)
self._reactor = AsyncoreReactor()
self._serialization_service = SerializationServiceV1(config)
self._near_cache_manager = NearCacheManager(self, self._serialization_service)
self._internal_lifecycle_service = _InternalLifecycleService(self)
self._near_cache_manager = NearCacheManager(config, self._serialization_service)
self._internal_lifecycle_service = _InternalLifecycleService(config)
self.lifecycle_service = LifecycleService(self._internal_lifecycle_service)
self._invocation_service = InvocationService(self, self._reactor)
self._invocation_service = InvocationService(self, config, self._reactor)
self._address_provider = self._create_address_provider()
self._internal_partition_service = _InternalPartitionService(self)
self.partition_service = PartitionService(
self._internal_partition_service, self._serialization_service
)
self._internal_cluster_service = _InternalClusterService(self)
self._internal_cluster_service = _InternalClusterService(self, config)
self.cluster_service = ClusterService(self._internal_cluster_service)
self._connection_manager = ConnectionManager(
self,
config,
self._reactor,
self._address_provider,
self._internal_lifecycle_service,
Expand All @@ -344,7 +345,7 @@ def __init__(self, **kwargs):
)
self._load_balancer = self._init_load_balancer(config)
self._listener_service = ListenerService(
self, self._connection_manager, self._invocation_service
self, config, self._connection_manager, self._invocation_service
)
self._proxy_manager = ProxyManager(self._context)
self.cp_subsystem = CPSubsystem(self._context)
Expand All @@ -353,6 +354,7 @@ def __init__(self, **kwargs):
self._lock_reference_id_generator = AtomicInteger(1)
self._statistics = Statistics(
self,
config,
self._reactor,
self._connection_manager,
self._invocation_service,
Expand All @@ -371,7 +373,7 @@ def __init__(self, **kwargs):

def _init_context(self):
self._context.init_context(
self.config,
self._config,
self._invocation_service,
self._internal_partition_service,
self._internal_cluster_service,
Expand All @@ -393,11 +395,11 @@ def _start(self):
self._internal_partition_service, self._connection_manager, self._listener_service
)
self._internal_lifecycle_service.start()
membership_listeners = self.config.membership_listeners
membership_listeners = self._config.membership_listeners
self._internal_cluster_service.start(self._connection_manager, membership_listeners)
self._cluster_view_listener.start()
self._connection_manager.start(self._load_balancer)
if not self.config.async_start:
if not self._config.async_start:
self._internal_cluster_service.wait_initial_member_list_fetched()
self._connection_manager.connect_to_all_cluster_members()

Expand Down Expand Up @@ -569,7 +571,7 @@ def add_distributed_object_listener(self, listener_func):
Returns:
hazelcast.future.Future[str]: A registration id which is used as a key to remove the listener.
"""
is_smart = self.config.smart_routing
is_smart = self._config.smart_routing
codec = client_add_distributed_object_listener_codec
request = codec.encode_request(is_smart)

Expand Down Expand Up @@ -648,7 +650,7 @@ def shutdown(self):
self._internal_lifecycle_service.fire_lifecycle_event(LifecycleState.SHUTDOWN)

def _create_address_provider(self):
config = self.config
config = self._config
cluster_members = config.cluster_members
address_list_provided = len(cluster_members) > 0
cloud_discovery_token = config.cloud_discovery_token
Expand All @@ -667,7 +669,7 @@ def _create_address_provider(self):
return DefaultAddressProvider(cluster_members)

def _create_client_name(self, client_id):
client_name = self.config.client_name
client_name = self._config.client_name
if client_name:
return client_name
return "hz.client_%s" % client_id
Expand Down
3 changes: 1 addition & 2 deletions hazelcast/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,9 @@ def get_members(self, member_selector=None):


class _InternalClusterService(object):
def __init__(self, client):
def __init__(self, client, config):
self._client = client
self._connection_manager = None
config = client.config
self._labels = frozenset(config.labels)
self._listeners = {}
self._member_list_snapshot = _EMPTY_SNAPSHOT
Expand Down
18 changes: 10 additions & 8 deletions hazelcast/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class ConnectionManager(object):
def __init__(
self,
client,
config,
reactor,
address_provider,
lifecycle_service,
Expand All @@ -108,18 +109,20 @@ def __init__(
self.client_uuid = uuid.uuid4()

self._client = client
self._config = config
self._reactor = reactor
self._address_provider = address_provider
self._lifecycle_service = lifecycle_service
self._partition_service = partition_service
self._cluster_service = cluster_service
self._invocation_service = invocation_service
self._near_cache_manager = near_cache_manager
config = self._client.config
self._smart_routing_enabled = config.smart_routing
self._wait_strategy = self._init_wait_strategy(config)
self._reconnect_mode = config.reconnect_mode
self._heartbeat_manager = _HeartbeatManager(self, self._client, reactor, invocation_service)
self._heartbeat_manager = _HeartbeatManager(
self, self._client, config, reactor, invocation_service
)
self._connection_listeners = []
self._connect_all_members_timer = None
self._async_start = config.async_start
Expand Down Expand Up @@ -368,10 +371,10 @@ def _sync_connect_to_cluster(self):
if not self._wait_strategy.sleep():
break
except (ClientNotAllowedInClusterError, InvalidConfigurationError):
cluster_name = self._client.config.cluster_name
cluster_name = self._config.cluster_name
_logger.exception("Stopped trying on cluster %s", cluster_name)

cluster_name = self._client.config.cluster_name
cluster_name = self._config.cluster_name
_logger.info(
"Unable to connect to any address from the cluster with name: %s. "
"The following addresses were tried: %s",
Expand Down Expand Up @@ -422,7 +425,7 @@ def _get_or_connect(self, address):
self,
self._connection_id_generator.get_and_increment(),
translated,
self._client.config,
self._config,
self._invocation_service.handle_client_message,
)
except IOError:
Expand All @@ -437,7 +440,7 @@ def _get_or_connect(self, address):

def _authenticate(self, connection):
client = self._client
cluster_name = client.config.cluster_name
cluster_name = self._config.cluster_name
client_name = client.name
request = client_authentication_codec.encode_request(
cluster_name,
Expand Down Expand Up @@ -585,12 +588,11 @@ def _get_possible_addresses(self):
class _HeartbeatManager(object):
_heartbeat_timer = None

def __init__(self, connection_manager, client, reactor, invocation_service):
def __init__(self, connection_manager, client, config, reactor, invocation_service):
self._connection_manager = connection_manager
self._client = client
self._reactor = reactor
self._invocation_service = invocation_service
config = client.config
self._heartbeat_timeout = config.heartbeat_timeout
self._heartbeat_interval = config.heartbeat_interval

Expand Down
3 changes: 1 addition & 2 deletions hazelcast/invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ def set_exception(self, exception, traceback=None):
class InvocationService(object):
_CLEAN_RESOURCES_PERIOD = 0.1

def __init__(self, client, reactor):
config = client.config
def __init__(self, client, config, reactor):
smart_routing = config.smart_routing
if smart_routing:
self._do_invoke = self._invoke_smart
Expand Down
5 changes: 2 additions & 3 deletions hazelcast/lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,11 @@ def remove_listener(self, registration_id):


class _InternalLifecycleService(object):
def __init__(self, client):
self._client = client
def __init__(self, config):
self.running = False
self._listeners = {}

lifecycle_listeners = client.config.lifecycle_listeners
lifecycle_listeners = config.lifecycle_listeners
if lifecycle_listeners:
for listener in lifecycle_listeners:
self.add_listener(listener)
Expand Down
4 changes: 2 additions & 2 deletions hazelcast/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ def __init__(self, server_registration_id, correlation_id):


class ListenerService(object):
def __init__(self, client, connection_manager, invocation_service):
def __init__(self, client, config, connection_manager, invocation_service):
self._client = client
self._connection_manager = connection_manager
self._invocation_service = invocation_service
self._is_smart = client.config.smart_routing
self._is_smart = config.smart_routing
self._active_registrations = {} # Dict of user_registration_id, ListenerRegistration
self._registration_lock = threading.RLock()
self._event_handlers = {}
Expand Down
6 changes: 3 additions & 3 deletions hazelcast/near_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,15 @@ def __repr__(self):


class NearCacheManager(object):
def __init__(self, client, serialization_service):
self._client = client
def __init__(self, config, serialization_service):
self._config = config
self._serialization_service = serialization_service
self._caches = {}

def get_or_create_near_cache(self, name):
near_cache = self._caches.get(name, None)
if not near_cache:
near_cache_config = self._client.config.near_caches.get(name, None)
near_cache_config = self._config.near_caches.get(name, None)
if not near_cache_config:
raise ValueError("Cannot find a near cache configuration with the name '%s'" % name)

Expand Down
5 changes: 3 additions & 2 deletions hazelcast/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ class Statistics(object):

_DEFAULT_PROBE_VALUE = 0

def __init__(self, client, reactor, connection_manager, invocation_service, near_cache_manager):
def __init__(
self, client, config, reactor, connection_manager, invocation_service, near_cache_manager
):
self._client = client
self._reactor = reactor
self._connection_manager = connection_manager
self._invocation_service = invocation_service
self._near_cache_manager = near_cache_manager
config = client.config
self._enabled = config.statistics_enabled
self._period = config.statistics_period
self._statistics_timer = None
Expand Down
4 changes: 2 additions & 2 deletions tests/invocation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ def test_backup_handler_when_all_acks_are_not_received_and_reached_timeout_with_
)

def _start_service(self, config=_Config()):
c = MagicMock(config=config)
invocation_service = InvocationService(c, c._reactor)
c = MagicMock()
invocation_service = InvocationService(c, config, c._reactor)
self.service = invocation_service
invocation_service.init(
c._internal_partition_service, c._connection_manager, c._listener_service
Expand Down
2 changes: 1 addition & 1 deletion tests/proxy/map_nearcache_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def configure_client(cls, config):
return config

def setUp(self):
name = list(self.client.config.near_caches.keys())[0]
name = list(self.client._config.near_caches.keys())[0]
self.map = self.client.get_map(name).blocking()

def tearDown(self):
Expand Down
2 changes: 1 addition & 1 deletion tests/statistics_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def test_statistics_content(self):
# in different platforms. So, first try to get these statistics and then check the
# response content

s = Statistics(client, None, None, None, None)
s = Statistics(client, client._config, None, None, None, None)
psutil_stats = s._get_os_and_runtime_stats()
for stat_name in psutil_stats:
self.assertEqual(1, result.count(stat_name))
Expand Down