Skip to content

Commit

Permalink
Do not invoke urgent invocations that contain serialized data immedia…
Browse files Browse the repository at this point in the history
…tely (#617)

Although we believe it was a mistake, the urgent invocations on the
client-side are used for some user invocations like listener registrations,
apart from the actual urgent invocations like heartbeats, authentication, etc.

When the client reconnects to some cluster, it sends the local state
in some executor. So, there might be some time between the local state is
sent. However, during that time, urgent invocations are allowed to
go through from the client. That might violate our assumption that
the schema is received by the cluster before the data.

To solve this, we will not invoke urgent invocations that contain
serialized data if the client is not initialized on the cluster, and there
were some compact schemas sent to the previous clusters. Such
invocations are only related to user invocations so we will not delay
invoking actual urgent invocations like heartbeats.
  • Loading branch information
mdumandag committed Mar 24, 2023
1 parent 7923483 commit a76d653
Show file tree
Hide file tree
Showing 143 changed files with 275 additions and 150 deletions.
9 changes: 9 additions & 0 deletions hazelcast/compact.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def __init__(
self._cluster_service = cluster_service
self._reactor = reactor
self._invocation_retry_pause = config.invocation_retry_pause
self._has_replicated_schemas = False

def fetch_schema(self, schema_id: int) -> Future:
_logger.debug(
Expand Down Expand Up @@ -68,6 +69,7 @@ def send_schema_and_retry(
request = client_send_schema_codec.encode_request(schema)

def callback():
self._has_replicated_schemas = True
self._compact_serializer.register_schema_to_type(schema, clazz)
return func(*args, **kwargs)

Expand Down Expand Up @@ -150,3 +152,10 @@ def register_fetched_schema(self, schema_id: int, schema: typing.Optional["Schem
)

self._compact_serializer.register_schema_to_id(schema)

def has_replicated_schemas(self):
"""
Returns ``True`` is the client has replicated
any Compact schemas to the cluster.
"""
return self._has_replicated_schemas
7 changes: 7 additions & 0 deletions hazelcast/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,13 @@ def check_invocation_allowed(self):
else:
raise IOError("No connection found to cluster")

def initialized_on_cluster(self) -> bool:
"""
Returns ``True`` if the client is initialized on the cluster, by
sending its local state, if necessary.
"""
return self._client_state == _ClientState.INITIALIZED_ON_CLUSTER

def _get_or_connect_to_address(self, address):
for connection in list(self.active_connections.values()):
if connection.remote_address == address:
Expand Down
16 changes: 16 additions & 0 deletions hazelcast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,22 @@ class NotLeaderError(HazelcastError):
pass


class InvocationMightContainCompactDataError(HazelcastError):
"""
Signals that the invocation might contain Compact serialized data,
and it would not be safe to send that invocation now to make sure
that the invariant regarding not sending the data before the schemas
are hold while the client reconnects or retries urgent invocations.
"""

def __init__(self):
super(InvocationMightContainCompactDataError, self).__init__(
"The invocation might contain Compact serialized "
"data and it is not safe to invoke it when the client "
"is not yet initialized on the cluster."
)


# Error Codes
_UNDEFINED = 0
_ARRAY_INDEX_OUT_OF_BOUNDS = 1
Expand Down
38 changes: 36 additions & 2 deletions hazelcast/invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
EXCEPTION_MESSAGE_TYPE,
IndeterminateOperationStateError,
OperationTimeoutError,
InvocationMightContainCompactDataError,
)
from hazelcast.future import Future
from hazelcast.protocol.client_message import InboundMessage
Expand Down Expand Up @@ -175,7 +176,9 @@ def _invoke_on_random_connection(self, invocation):

def _invoke_smart(self, invocation):
try:
if not invocation.urgent:
if invocation.urgent:
self._check_urgent_invocation_allowed(invocation)
else:
self._check_invocation_allowed_fn()

connection = invocation.connection
Expand Down Expand Up @@ -204,7 +207,9 @@ def _invoke_smart(self, invocation):

def _invoke_non_smart(self, invocation):
try:
if not invocation.urgent:
if invocation.urgent:
self._check_urgent_invocation_allowed(invocation)
else:
self._check_invocation_allowed_fn()

connection = invocation.connection
Expand Down Expand Up @@ -309,6 +314,9 @@ def _retry_if_not_done(self, invocation):
self._do_invoke(invocation)

def _should_retry(self, invocation, error):
if isinstance(error, InvocationMightContainCompactDataError):
return True

if invocation.connection and isinstance(error, (IOError, TargetDisconnectedError)):
return False

Expand All @@ -325,6 +333,32 @@ def _should_retry(self, invocation, error):

return False

def _check_urgent_invocation_allowed(self, invocation: Invocation):
if self._connection_manager.initialized_on_cluster():
# If the client is initialized on the cluster, that means we
# have sent all the schemas to the cluster, even if we are
# reconnected to it
return

if not self._compact_schema_service.has_replicated_schemas():
# If there were no Compact schemas to begin with, we don't need
# to perform the check below. If the client didn't send a Compact
# schema up until this point, the retries or listener registrations
# could not send a schema, because if they were, we wouldn't hit
# this line.
return

# We are not yet initialized on cluster, so the Compact schemas might
# not be sent yet. This message contains some serialized data,
# and it is possible that it can also contain Compact serialized data.
# In that case, allowing this invocation to go through now could
# violate the invariant that the schema must come to cluster before
# the data. We will retry this invocation and wait until the client
# is initialized on the cluster, which means schemas are replicated
# in the cluster.
if invocation.request.contains_data:
raise InvocationMightContainCompactDataError()

def _register_backup_listener(self):
codec = client_local_backup_listener_codec
request = codec.encode_request()
Expand Down
7 changes: 4 additions & 3 deletions hazelcast/protocol/client_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,12 @@ def create_initial_buffer_custom(size, is_begin_frame=False):


class OutboundMessage:
__slots__ = ("buf", "retryable")
__slots__ = ("buf", "retryable", "contains_data")

def __init__(self, buf, retryable):
def __init__(self, buf, retryable, contains_data=False):
self.buf = buf
self.retryable = retryable
self.contains_data = contains_data

def set_correlation_id(self, correlation_id):
LE_LONG.pack_into(self.buf, _OUTBOUND_MESSAGE_CORRELATION_ID_OFFSET, correlation_id)
Expand All @@ -80,7 +81,7 @@ def set_partition_id(self, partition_id):
LE_INT.pack_into(self.buf, _OUTBOUND_MESSAGE_PARTITION_ID_OFFSET, partition_id)

def copy(self):
return OutboundMessage(bytearray(self.buf), self.retryable)
return OutboundMessage(bytearray(self.buf), self.retryable, self.contains_data)

def set_backup_aware_flag(self):
flags = LE_UINT16.unpack_from(self.buf, INT_SIZE_IN_BYTES)[0]
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/protocol/codec/atomic_long_alter_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def encode_request(group_id, name, function, return_value_type):
RaftGroupIdCodec.encode(buf, group_id)
StringCodec.encode(buf, name)
DataCodec.encode(buf, function, True)
return OutboundMessage(buf, False)
return OutboundMessage(buf, False, True)


def decode_response(msg):
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/protocol/codec/atomic_long_apply_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def encode_request(group_id, name, function):
RaftGroupIdCodec.encode(buf, group_id)
StringCodec.encode(buf, name)
DataCodec.encode(buf, function, True)
return OutboundMessage(buf, False)
return OutboundMessage(buf, False, True)


def decode_response(msg):
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/protocol/codec/atomic_ref_apply_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def encode_request(group_id, name, function, return_value_type, alter):
RaftGroupIdCodec.encode(buf, group_id)
StringCodec.encode(buf, name)
DataCodec.encode(buf, function, True)
return OutboundMessage(buf, False)
return OutboundMessage(buf, False, True)


def decode_response(msg):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def encode_request(group_id, name, old_value, new_value):
StringCodec.encode(buf, name)
CodecUtil.encode_nullable(buf, old_value, DataCodec.encode)
CodecUtil.encode_nullable(buf, new_value, DataCodec.encode, True)
return OutboundMessage(buf, False)
return OutboundMessage(buf, False, True)


def decode_response(msg):
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/protocol/codec/atomic_ref_contains_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def encode_request(group_id, name, value):
RaftGroupIdCodec.encode(buf, group_id)
StringCodec.encode(buf, name)
CodecUtil.encode_nullable(buf, value, DataCodec.encode, True)
return OutboundMessage(buf, True)
return OutboundMessage(buf, True, True)


def decode_response(msg):
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/protocol/codec/atomic_ref_set_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def encode_request(group_id, name, new_value, return_old_value):
RaftGroupIdCodec.encode(buf, group_id)
StringCodec.encode(buf, name)
CodecUtil.encode_nullable(buf, new_value, DataCodec.encode, True)
return OutboundMessage(buf, False)
return OutboundMessage(buf, False, True)


def decode_response(msg):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def encode_request(name, uuid, callable, member_uuid):
FixSizedTypesCodec.encode_uuid(buf, _REQUEST_MEMBER_UUID_OFFSET, member_uuid)
StringCodec.encode(buf, name)
DataCodec.encode(buf, callable, True)
return OutboundMessage(buf, False)
return OutboundMessage(buf, False, True)


def decode_response(msg):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def encode_request(name, uuid, callable):
FixSizedTypesCodec.encode_uuid(buf, _REQUEST_UUID_OFFSET, uuid)
StringCodec.encode(buf, name)
DataCodec.encode(buf, callable, True)
return OutboundMessage(buf, False)
return OutboundMessage(buf, False, True)


def decode_response(msg):
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/protocol/codec/list_add_all_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def encode_request(name, value_list):
buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
StringCodec.encode(buf, name)
ListMultiFrameCodec.encode(buf, value_list, DataCodec.encode, True)
return OutboundMessage(buf, False)
return OutboundMessage(buf, False, True)


def decode_response(msg):
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/protocol/codec/list_add_all_with_index_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def encode_request(name, index, value_list):
FixSizedTypesCodec.encode_int(buf, _REQUEST_INDEX_OFFSET, index)
StringCodec.encode(buf, name)
ListMultiFrameCodec.encode(buf, value_list, DataCodec.encode, True)
return OutboundMessage(buf, False)
return OutboundMessage(buf, False, True)


def decode_response(msg):
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/protocol/codec/list_add_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def encode_request(name, value):
buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
StringCodec.encode(buf, name)
DataCodec.encode(buf, value, True)
return OutboundMessage(buf, False)
return OutboundMessage(buf, False, True)


def decode_response(msg):
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/protocol/codec/list_add_with_index_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ def encode_request(name, index, value):
FixSizedTypesCodec.encode_int(buf, _REQUEST_INDEX_OFFSET, index)
StringCodec.encode(buf, name)
DataCodec.encode(buf, value, True)
return OutboundMessage(buf, False)
return OutboundMessage(buf, False, True)
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def encode_request(name, values):
buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
StringCodec.encode(buf, name)
ListMultiFrameCodec.encode(buf, values, DataCodec.encode, True)
return OutboundMessage(buf, False)
return OutboundMessage(buf, False, True)


def decode_response(msg):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def encode_request(name, values):
buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
StringCodec.encode(buf, name)
ListMultiFrameCodec.encode(buf, values, DataCodec.encode, True)
return OutboundMessage(buf, False)
return OutboundMessage(buf, False, True)


def decode_response(msg):
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/protocol/codec/list_contains_all_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def encode_request(name, values):
buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
StringCodec.encode(buf, name)
ListMultiFrameCodec.encode(buf, values, DataCodec.encode, True)
return OutboundMessage(buf, True)
return OutboundMessage(buf, True, True)


def decode_response(msg):
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/protocol/codec/list_contains_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def encode_request(name, value):
buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
StringCodec.encode(buf, name)
DataCodec.encode(buf, value, True)
return OutboundMessage(buf, True)
return OutboundMessage(buf, True, True)


def decode_response(msg):
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/protocol/codec/list_index_of_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def encode_request(name, value):
buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
StringCodec.encode(buf, name)
DataCodec.encode(buf, value, True)
return OutboundMessage(buf, True)
return OutboundMessage(buf, True, True)


def decode_response(msg):
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/protocol/codec/list_last_index_of_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def encode_request(name, value):
buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
StringCodec.encode(buf, name)
DataCodec.encode(buf, value, True)
return OutboundMessage(buf, True)
return OutboundMessage(buf, True, True)


def decode_response(msg):
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/protocol/codec/list_remove_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def encode_request(name, value):
buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
StringCodec.encode(buf, name)
DataCodec.encode(buf, value, True)
return OutboundMessage(buf, False)
return OutboundMessage(buf, False, True)


def decode_response(msg):
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/protocol/codec/list_set_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def encode_request(name, index, value):
FixSizedTypesCodec.encode_int(buf, _REQUEST_INDEX_OFFSET, index)
StringCodec.encode(buf, name)
DataCodec.encode(buf, value, True)
return OutboundMessage(buf, False)
return OutboundMessage(buf, False, True)


def decode_response(msg):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def encode_request(name, key, include_value, listener_flags, local_only):
FixSizedTypesCodec.encode_boolean(buf, _REQUEST_LOCAL_ONLY_OFFSET, local_only)
StringCodec.encode(buf, name)
DataCodec.encode(buf, key, True)
return OutboundMessage(buf, False)
return OutboundMessage(buf, False, True)


def decode_response(msg):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def encode_request(name, key, predicate, include_value, listener_flags, local_on
StringCodec.encode(buf, name)
DataCodec.encode(buf, key)
DataCodec.encode(buf, predicate, True)
return OutboundMessage(buf, False)
return OutboundMessage(buf, False, True)


def decode_response(msg):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def encode_request(name, predicate, include_value, listener_flags, local_only):
FixSizedTypesCodec.encode_boolean(buf, _REQUEST_LOCAL_ONLY_OFFSET, local_only)
StringCodec.encode(buf, name)
DataCodec.encode(buf, predicate, True)
return OutboundMessage(buf, False)
return OutboundMessage(buf, False, True)


def decode_response(msg):
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/protocol/codec/map_add_interceptor_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def encode_request(name, interceptor):
buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
StringCodec.encode(buf, name)
DataCodec.encode(buf, interceptor, True)
return OutboundMessage(buf, False)
return OutboundMessage(buf, False, True)


def decode_response(msg):
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/protocol/codec/map_aggregate_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def encode_request(name, aggregator):
buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
StringCodec.encode(buf, name)
DataCodec.encode(buf, aggregator, True)
return OutboundMessage(buf, True)
return OutboundMessage(buf, True, True)


def decode_response(msg):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def encode_request(name, aggregator, predicate):
StringCodec.encode(buf, name)
DataCodec.encode(buf, aggregator)
DataCodec.encode(buf, predicate, True)
return OutboundMessage(buf, True)
return OutboundMessage(buf, True, True)


def decode_response(msg):
Expand Down

0 comments on commit a76d653

Please sign in to comment.