Skip to content

Commit

Permalink
Send client state to cluster in reconnections (#613)
Browse files Browse the repository at this point in the history
In split-brain scenarios, the cluster id stays the same for
different halves of the split.

When the client disconnects from the first half it was connected
to and reconnects to the other half, from the point of view of
the client, it connects to the same cluster.

However, it might very well be the case that, the client has sent
some state to the first half of the cluster, and that state must
be replicated to the other half when the client reconnects to it.

This is especially needed for Compact schemas.

In this PR, we are sending the client state to the cluster after
reconnections, regardless it is connected back to possibly the same
cluster with the same id or not.

We couldn't add proper split brain tests, as the remote controller
does not have that ability yet. However, I have added a simple
test that verifies the current behavior.
  • Loading branch information
mdumandag committed Mar 10, 2023
1 parent b2196d4 commit b728a54
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 5 deletions.
20 changes: 18 additions & 2 deletions hazelcast/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ def __init__(
self._near_cache_manager = near_cache_manager
self._send_state_to_cluster_fn = send_state_to_cluster_fn
self._client_state = _ClientState.INITIAL # must be modified under the _lock
self._established_initial_cluster_connection = False # must be modified under the _lock
self._smart_routing_enabled = config.smart_routing
self._wait_strategy = self._init_wait_strategy(config)
self._reconnect_mode = config.reconnect_mode
Expand Down Expand Up @@ -667,15 +668,30 @@ def _handle_successful_auth(self, response, connection):

is_initial_connection = not self.active_connections
self.active_connections[remote_uuid] = connection
fire_connected_lifecycle_event = False
if is_initial_connection:
self._cluster_id = new_cluster_id
if changed_cluster:
# In split brain, the client might connect to the one half
# of the cluster, and then later might reconnect to the
# other half, after the half it was connected to is
# completely dead. Since the cluster id is preserved in
# split brain scenarios, it is impossible to distinguish
# reconnection to the same cluster vs reconnection to the
# other half of the split brain. However, in the latter,
# we might need to send some state to the other half of
# the split brain (like Compact schemas). That forces us
# to send the client state to the cluster after the first
# cluster connection, regardless the cluster id is
# changed or not.
if self._established_initial_cluster_connection:
self._client_state = _ClientState.CONNECTED_TO_CLUSTER
self._initialize_on_cluster(new_cluster_id)
else:
fire_connected_lifecycle_event = True
self._established_initial_cluster_connection = True
self._client_state = _ClientState.INITIALIZED_ON_CLUSTER

if is_initial_connection and not changed_cluster:
if fire_connected_lifecycle_event:
self._lifecycle_service.fire_lifecycle_event(LifecycleState.CONNECTED)

_logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def write(self, writer: CompactWriter, obj: SomeFields) -> None:
def get_type_name(self) -> str:
return SomeFields.__name__

def get_class(self):
def get_class(self) -> typing.Type[SomeFields]:
return SomeFields

self._write_then_read0(all_fields, Serializer(list(all_fields.keys())))
Expand Down Expand Up @@ -803,7 +803,7 @@ def write(self, writer: CompactWriter, obj: Nested) -> None:
def get_type_name(self) -> str:
return Nested.__name__

def get_class(self) -> Nested:
def get_class(self) -> typing.Type[Nested]:
return Nested


Expand Down Expand Up @@ -851,7 +851,7 @@ def write(self, writer: CompactWriter, obj: SomeFields) -> None:
def get_type_name(self) -> str:
return SomeFields.__name__

def get_class(self) -> SomeFields:
def get_class(self) -> typing.Type[SomeFields]:
return SomeFields

@staticmethod
Expand Down
38 changes: 38 additions & 0 deletions tests/integration/connection_manager_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import threading
import uuid

from mock import patch
Expand All @@ -6,6 +7,7 @@
from hazelcast.core import Address, MemberInfo, MemberVersion, EndpointQualifier, ProtocolType
from hazelcast.errors import IllegalStateError, TargetDisconnectedError
from hazelcast.future import ImmediateFuture, ImmediateExceptionFuture
from hazelcast.lifecycle import LifecycleState
from hazelcast.util import AtomicInteger
from tests.base import HazelcastTestCase, SingleMemberTestCase
from tests.util import random_string
Expand Down Expand Up @@ -180,10 +182,46 @@ def send_state_to_cluster_fn():

self.assertEqual(5, counter.get())

def test_client_state_is_sent_on_reconnection_when_the_cluster_id_is_same(self):
disconnected = threading.Event()
reconnected = threading.Event()

def listener(state):
if state == LifecycleState.DISCONNECTED:
disconnected.set()
elif disconnected.is_set() and state == LifecycleState.CONNECTED:
reconnected.set()

self.client.lifecycle_service.add_listener(listener)

conn_manager = self.client._connection_manager
counter = AtomicInteger()

def send_state_to_cluster_fn():
counter.add(1)
return ImmediateFuture(None)

conn_manager._send_state_to_cluster_fn = send_state_to_cluster_fn

# Keep the cluster alive, but close the connection
# to simulate re-connection to a cluster with
# the same cluster id.
connection = conn_manager.get_random_connection()
connection.close_connection("expected", None)

disconnected.wait()
reconnected.wait()

self._wait_until_state_is_sent()

self.assertEqual(1, counter.get())

def _restart_cluster(self):
self.rc.terminateMember(self.cluster.id, self.member.uuid)
ConnectionManagerOnClusterRestartTest.member = self.cluster.start_member()
self._wait_until_state_is_sent()

def _wait_until_state_is_sent(self):
# Perform an invocation to wait until the client state is sent
m = self.client.get_map(random_string()).blocking()
m.set(1, 1)
Expand Down

0 comments on commit b728a54

Please sign in to comment.