Make sure the Compact schema is replicated on all known cluster members [API-1564] (#615)

* Make sure the Compact schema is replicated on all known cluster members

This was the last task necessary to ensure the correctness of the
client-side implementation under split-brain scenarios.

The client might be connected to both halves of a split-brain cluster
without knowing it. Eventually, a member list update will arrive and
the client will close its connections to one half. Until then, though,
the client might replicate the schema in one half and put the data to
the other, breaking our promise that no member will receive data
before the schema.

To solve this, we have decided to return the UUIDs of the members that
the schema is replicated to, and check them on the client side. If
there is at least one member that is present in the client-side member
list but missing from the schema replication response, we wait for a
while and retry, up to a fixed number of attempts.
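
A minimal sketch of that client-side check (standalone, with hypothetical names; the real logic lives in `CompactSchemaService._replicate_schema` below):

```python
import typing
import uuid


def is_schema_fully_replicated(
    local_member_uuids: typing.Iterable[uuid.UUID],
    replicated_member_uuids: typing.Set[uuid.UUID],
) -> bool:
    # The schema counts as replicated only when every member the client
    # currently knows about appears in the replication response.
    return all(m in replicated_member_uuids for m in local_member_uuids)
```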

This PR also updates some names of the fields used in the protocol
codecs, to comply with their definitions in the protocol repository.

* fix mypy error

* ignore imports added for type checking from the coverage report

* add newline

* add a test about the invocation failure

* address review comments
mdumandag committed Mar 23, 2023
1 parent cd9a317 commit 7923483
Showing 11 changed files with 315 additions and 48 deletions.
4 changes: 4 additions & 0 deletions .coveragerc
@@ -0,0 +1,4 @@
[report]
exclude_lines =
pragma: no cover
if TYPE_CHECKING:
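
For reference, a minimal sketch of the pattern this exclusion targets (ClusterService is one of the names guarded this way in hazelcast/compact.py below):

```python
import typing

if typing.TYPE_CHECKING:
    # Evaluated by mypy only, never at runtime, so these lines would
    # otherwise show up as uncovered in the coverage report.
    from hazelcast.cluster import ClusterService


def member_count(cluster_service: "ClusterService") -> int:
    # The quoted annotation keeps the runtime free of the import.
    return len(cluster_service.get_members())
```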
7 changes: 5 additions & 2 deletions hazelcast/client.py
@@ -123,10 +123,15 @@ def __init__(self, config: Config = None, **kwargs):
self._near_cache_manager = NearCacheManager(config, self._serialization_service)
self._internal_lifecycle_service = _InternalLifecycleService(config)
self._lifecycle_service = LifecycleService(self._internal_lifecycle_service)
self._internal_cluster_service = _InternalClusterService(self, config)
self._cluster_service = ClusterService(self._internal_cluster_service)
self._invocation_service = InvocationService(self, config, self._reactor)
self._compact_schema_service = CompactSchemaService(
self._serialization_service.compact_stream_serializer,
self._invocation_service,
self._cluster_service,
self._reactor,
self._config,
)
self._address_provider = self._create_address_provider()
self._internal_partition_service = _InternalPartitionService(self)
@@ -135,8 +140,6 @@ def __init__(self, config: Config = None, **kwargs):
self._serialization_service,
self._compact_schema_service.send_schema_and_retry,
)
self._internal_cluster_service = _InternalClusterService(self, config)
self._cluster_service = ClusterService(self._internal_cluster_service)
self._connection_manager = ConnectionManager(
self,
config,
99 changes: 84 additions & 15 deletions hazelcast/compact.py
@@ -1,31 +1,46 @@
import logging
import typing

from hazelcast.errors import HazelcastSerializationError
from hazelcast.errors import HazelcastSerializationError, IllegalStateError
from hazelcast.future import Future, ImmediateFuture
from hazelcast.invocation import InvocationService, Invocation
from hazelcast.invocation import Invocation
from hazelcast.protocol.codec import (
client_fetch_schema_codec,
client_send_schema_codec,
client_send_all_schemas_codec,
)
from hazelcast.serialization.compact import (
CompactStreamSerializer,
Schema,
SchemaNotReplicatedError,
)

if typing.TYPE_CHECKING:
from hazelcast.cluster import ClusterService
from hazelcast.config import Config
from hazelcast.invocation import InvocationService
from hazelcast.protocol.client_message import OutboundMessage
from hazelcast.reactor import AsyncoreReactor
from hazelcast.serialization.compact import (
CompactStreamSerializer,
Schema,
SchemaNotReplicatedError,
)

_logger = logging.getLogger(__name__)


class CompactSchemaService:
_SEND_SCHEMA_RETRY_COUNT = 100

def __init__(
self,
compact_serializer: CompactStreamSerializer,
invocation_service: InvocationService,
compact_serializer: "CompactStreamSerializer",
invocation_service: "InvocationService",
cluster_service: "ClusterService",
reactor: "AsyncoreReactor",
config: "Config",
):
self._compact_serializer = compact_serializer
self._invocation_service = invocation_service
self._cluster_service = cluster_service
self._reactor = reactor
self._invocation_retry_pause = config.invocation_retry_pause

def fetch_schema(self, schema_id: int) -> Future:
_logger.debug(
@@ -43,23 +58,77 @@ def fetch_schema(self, schema_id: int) -> Future:

def send_schema_and_retry(
self,
error: SchemaNotReplicatedError,
error: "SchemaNotReplicatedError",
func: typing.Callable[..., Future],
*args: typing.Any,
**kwargs: typing.Any,
) -> Future:
schema = error.schema
clazz = error.clazz
request = client_send_schema_codec.encode_request(schema)
invocation = Invocation(request)

def continuation(future):
future.result()
def callback():
self._compact_serializer.register_schema_to_type(schema, clazz)
return func(*args, **kwargs)

return self._replicate_schema(
schema, request, CompactSchemaService._SEND_SCHEMA_RETRY_COUNT, callback
)

def _replicate_schema(
self,
schema: "Schema",
request: "OutboundMessage",
remaining_retries: int,
callback: typing.Callable[..., Future],
) -> Future:
def continuation(future: Future):
replicated_members = future.result()
members = self._cluster_service.get_members()
for member in members:
if member.uuid not in replicated_members:
break
else:
# Loop completed normally: all members in our member
# list are known to have the schema.
return callback()

# There is a member in our member list to which the schema
# is not known to be replicated yet. We should retry
# sending it to a random member.
if remaining_retries <= 1:
# We tried to send it a couple of times, but our local
# member list and the member list returned by the initiator
# node never matched.
raise IllegalStateError(
f"The schema {schema} cannot be replicated in the cluster, "
f"after {CompactSchemaService._SEND_SCHEMA_RETRY_COUNT} retries. "
f"It might be the case that the client is connected to the two "
f"halves of the cluster that is experiencing a split-brain, "
f"and continue putting the data associated with that schema "
f"might result in data loss. It might be possible to replicate "
f"the schema after some time, when the cluster is healed."
)

delayed_future: Future = Future()
self._reactor.add_timer(
self._invocation_retry_pause,
lambda: delayed_future.set_result(None),
)

def retry(_):
return self._replicate_schema(
schema, request.copy(), remaining_retries - 1, callback
)

return delayed_future.continue_with(retry)

return self._send_schema_replication_request(request).continue_with(continuation)

def _send_schema_replication_request(self, request: "OutboundMessage") -> Future:
invocation = Invocation(request, response_handler=client_send_schema_codec.decode_response)
self._invocation_service.invoke(invocation)
return invocation.future.continue_with(continuation)
return invocation.future

def send_all_schemas(self) -> Future:
schemas = self._compact_serializer.get_schemas()
@@ -74,7 +143,7 @@ def send_all_schemas(self) -> Future:
self._invocation_service.invoke(invocation)
return invocation.future

def register_fetched_schema(self, schema_id: int, schema: typing.Optional[Schema]) -> None:
def register_fetched_schema(self, schema_id: int, schema: typing.Optional["Schema"]) -> None:
if not schema:
raise HazelcastSerializationError(
f"The schema with the id {schema_id} can not be found in the cluster."
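For context, a hedged sketch of how a data-structure proxy is expected to use `send_schema_and_retry` (the proxy, codec plumbing, and `_put_internal` here are hypothetical; the real wiring passes this method into the serialization service, as shown in hazelcast/client.py above):

```python
from hazelcast.future import Future
from hazelcast.serialization.compact import SchemaNotReplicatedError


class MapProxy:
    """Hypothetical, trimmed-down proxy illustrating the callback pattern."""

    def __init__(self, serialization_service, compact_schema_service):
        self._to_data = serialization_service.to_data
        self._send_schema_and_retry = compact_schema_service.send_schema_and_retry

    def put(self, key, value) -> Future:
        try:
            key_data = self._to_data(key)
            value_data = self._to_data(value)
        except SchemaNotReplicatedError as e:
            # Replicate the schema first, then re-run this put. The retried
            # operation only runs once the schema is known to be replicated
            # on every member in the local member list.
            return self._send_schema_and_retry(e, self.put, key, value)
        return self._put_internal(key_data, value_data)  # hypothetical
```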
29 changes: 29 additions & 0 deletions hazelcast/protocol/builtin.py
@@ -1,3 +1,4 @@
import typing
import uuid
from datetime import date, time, datetime, timedelta, timezone
from decimal import Decimal
@@ -34,6 +35,9 @@
from hazelcast.serialization.data import Data
from hazelcast.util import int_from_bytes

if typing.TYPE_CHECKING:
from hazelcast.protocol.client_message import InboundMessage

_LOCAL_DATE_SIZE_IN_BYTES = INT_SIZE_IN_BYTES + BYTE_SIZE_IN_BYTES * 2
_LOCAL_TIME_SIZE_IN_BYTES = BYTE_SIZE_IN_BYTES * 3 + INT_SIZE_IN_BYTES
_LOCAL_DATE_TIME_SIZE_IN_BYTES = _LOCAL_DATE_SIZE_IN_BYTES + _LOCAL_TIME_SIZE_IN_BYTES
@@ -481,6 +485,31 @@ def decode(msg):
return result


class SetUUIDCodec:
@staticmethod
def encode(buf: bytearray, s: typing.Set[uuid.UUID], is_final=False):
n = len(s)
size = SIZE_OF_FRAME_LENGTH_AND_FLAGS + n * UUID_SIZE_IN_BYTES
b = bytearray(size)
LE_INT.pack_into(b, 0, size)
if is_final:
LE_UINT16.pack_into(b, INT_SIZE_IN_BYTES, _IS_FINAL_FLAG)
for i, u in enumerate(s):
FixSizedTypesCodec.encode_uuid(
b, SIZE_OF_FRAME_LENGTH_AND_FLAGS + i * UUID_SIZE_IN_BYTES, u
)
buf.extend(b)

@staticmethod
def decode(msg: "InboundMessage") -> typing.Set[uuid.UUID]:
b = msg.next_frame().buf
n = len(b) // UUID_SIZE_IN_BYTES
result = set()
for i in range(n):
result.add(FixSizedTypesCodec.decode_uuid(b, i * UUID_SIZE_IN_BYTES))
return result


class LongArrayCodec:
@staticmethod
def encode(buf, arr, is_final=False):
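A simplified, self-contained illustration of the framing idea behind SetUUIDCodec (the layout here is assumed for demonstration only: a little-endian frame-length prefix followed by raw 16-byte UUIDs; the real codec also writes flag bits and encodes UUIDs through FixSizedTypesCodec):

```python
import struct
import typing
import uuid

# Assumed: length prefix only; the real protocol header also carries flags.
_HEADER_SIZE = 4


def encode_uuid_set(s: typing.Set[uuid.UUID]) -> bytes:
    size = _HEADER_SIZE + 16 * len(s)
    buf = bytearray(struct.pack("<i", size))
    for u in s:
        buf += u.bytes  # 16 big-endian bytes per RFC 4122
    return bytes(buf)


def decode_uuid_set(frame: bytes) -> typing.Set[uuid.UUID]:
    body = frame[_HEADER_SIZE:]
    return {uuid.UUID(bytes=body[i : i + 16]) for i in range(0, len(body), 16)}


s = {uuid.uuid4(), uuid.uuid4()}
assert decode_uuid_set(encode_uuid_set(s)) == s
```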
6 changes: 6 additions & 0 deletions hazelcast/protocol/codec/client_send_schema_codec.py
@@ -1,5 +1,6 @@
from hazelcast.protocol.client_message import OutboundMessage, REQUEST_HEADER_SIZE, create_initial_buffer
from hazelcast.protocol.codec.custom.schema_codec import SchemaCodec
from hazelcast.protocol.builtin import SetUUIDCodec

# hex: 0x001300
_REQUEST_MESSAGE_TYPE = 4864
@@ -13,3 +14,8 @@ def encode_request(schema):
buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
SchemaCodec.encode(buf, schema, True)
return OutboundMessage(buf, True)


def decode_response(msg):
msg.next_frame()
return SetUUIDCodec.decode(msg)
6 changes: 3 additions & 3 deletions hazelcast/protocol/codec/custom/field_descriptor_codec.py
@@ -15,7 +15,7 @@ def encode(buf, field_descriptor, is_final=False):
initial_frame_buf = create_initial_buffer_custom(_INITIAL_FRAME_SIZE)
FixSizedTypesCodec.encode_int(initial_frame_buf, _KIND_ENCODE_OFFSET, field_descriptor.kind)
buf.extend(initial_frame_buf)
StringCodec.encode(buf, field_descriptor.name)
StringCodec.encode(buf, field_descriptor.field_name)
if is_final:
buf.extend(END_FINAL_FRAME_BUF)
else:
@@ -26,6 +26,6 @@ def decode(msg):
msg.next_frame()
initial_frame = msg.next_frame()
kind = FixSizedTypesCodec.decode_int(initial_frame.buf, _KIND_DECODE_OFFSET)
name = StringCodec.decode(msg)
field_name = StringCodec.decode(msg)
CodecUtil.fast_forward_to_end_frame(msg)
return FieldDescriptor(name, kind)
return FieldDescriptor(field_name, kind)
6 changes: 3 additions & 3 deletions hazelcast/protocol/codec/custom/schema_codec.py
@@ -11,7 +11,7 @@ class SchemaCodec:
def encode(buf, schema, is_final=False):
buf.extend(BEGIN_FRAME_BUF)
StringCodec.encode(buf, schema.type_name)
ListMultiFrameCodec.encode(buf, schema.fields_list, FieldDescriptorCodec.encode)
ListMultiFrameCodec.encode(buf, schema.fields, FieldDescriptorCodec.encode)
if is_final:
buf.extend(END_FINAL_FRAME_BUF)
else:
@@ -21,6 +21,6 @@ def encode(buf, schema, is_final=False):
def decode(msg):
msg.next_frame()
type_name = StringCodec.decode(msg)
fields_list = ListMultiFrameCodec.decode(msg, FieldDescriptorCodec.decode)
fields = ListMultiFrameCodec.decode(msg, FieldDescriptorCodec.decode)
CodecUtil.fast_forward_to_end_frame(msg)
return Schema(type_name, fields_list)
return Schema(type_name, fields)
33 changes: 18 additions & 15 deletions hazelcast/serialization/compact.py
@@ -482,7 +482,7 @@ def _write_positions(self, data_length: int, positions: typing.List[int]):
self._out.write_int(position)

def _get_field(self, field_name: str, field_kind: "FieldKind") -> "FieldDescriptor":
field = self._schema.fields.get(field_name, None)
field = self._schema.fields_dict.get(field_name, None)
if not field:
raise HazelcastSerializationError(
f"No field with the name '{field_name}' can found in the {self._schema}"
@@ -636,7 +636,7 @@ def __init__(
inp.set_position(end_position)

def get_field_kind(self, field_name) -> "FieldKind":
field = self._schema.fields.get(field_name)
field = self._schema.fields_dict.get(field_name)
if not field:
return FieldKind.NOT_AVAILABLE
return field.kind
@@ -1353,7 +1353,7 @@ def read_array_of_compact_or_default(
return self.read_array_of_compact(field_name)

def _get_field(self, field_name: str) -> "FieldDescriptor":
field = self._schema.fields.get(field_name)
field = self._schema.fields_dict.get(field_name)
if not field:
raise HazelcastSerializationError(
f"No field with the name '{field_name}' can be found in the {self._schema}"
@@ -1424,7 +1424,7 @@ def _read_var_sized_field_position(self, field: "FieldDescriptor") -> int:
return position + self._data_start_position

def _is_field_exists(self, field_name: str, field_kind: "FieldKind") -> bool:
field = self._schema.fields.get(field_name)
field = self._schema.fields_dict.get(field_name)
if not field:
return False

@@ -1769,13 +1769,13 @@ def _add_field(self, name: str, kind: "FieldKind"):


class Schema:
def __init__(self, type_name: str, fields_list: typing.List["FieldDescriptor"]):
def __init__(self, type_name: str, fields: typing.List["FieldDescriptor"]):
self.type_name = type_name
self.fields: typing.Dict[str, "FieldDescriptor"] = {f.name: f for f in fields_list}
self.fields_dict: typing.Dict[str, "FieldDescriptor"] = {f.name: f for f in fields}
# Sort the fields by the field name so that the field offsets/indexes
# can be set correctly.
fields_list.sort(key=lambda f: f.name)
self.fields_list = fields_list
fields.sort(key=lambda f: f.name)
self.fields = fields
self.schema_id: int = 0
self.var_sized_field_count: int = 0
self.fix_sized_fields_length: int = 0
@@ -1786,7 +1786,7 @@ def _init(self):
var_sized_fields = []
bool_fields = []

for field in self.fields_list:
for field in self.fields:
kind = field.kind
if FIELD_OPERATIONS[field.kind].is_var_sized():
var_sized_fields.append(field)
@@ -1837,17 +1837,15 @@ def __eq__(self, other: typing.Any) -> bool:
return (
isinstance(other, Schema)
and self.type_name == other.type_name
and self.fields_dict == other.fields_dict
and self.fields == other.fields
and self.fields_list == other.fields_list
and self.schema_id == other.schema_id
and self.var_sized_field_count == other.var_sized_field_count
and self.fix_sized_fields_length == other.fix_sized_fields_length
)

def __repr__(self) -> str:
return (
f"Schema(schema_id={self.schema_id}, type_name={self.type_name}, fields={self.fields})"
)
return f"Schema(schema_id={self.schema_id}, type_name={self.type_name}, fields={self.fields_dict})"


class FieldDescriptor:
@@ -1860,6 +1858,11 @@ def __init__(self, name: str, kind: "FieldKind"):
self.position = -1
self.bit_position = -1

@property
def field_name(self) -> str:
# Needed for protocol
return self.name

def __eq__(self, other: typing.Any) -> bool:
return (
isinstance(other, FieldDescriptor)
@@ -1893,8 +1896,8 @@ class RabinFingerprint:
@staticmethod
def of(schema: Schema) -> int:
fp = RabinFingerprint._of_str(RabinFingerprint._EMPTY, schema.type_name)
fp = RabinFingerprint._of_i32(fp, len(schema.fields))
for field in schema.fields_list:
fp = RabinFingerprint._of_i32(fp, len(schema.fields_dict))
for field in schema.fields:
fp = RabinFingerprint._of_str(fp, field.name)
fp = RabinFingerprint._of_i32(fp, field.kind)
return fp
