Skip to content

Commit

Permalink
[API-521] Add support for compact serialization to public APIs (#531)
Browse files Browse the repository at this point in the history
* Add support for compact serialization to public APIs

For compact serialization to work, we have to check the results
of the serialization. Also, for the responses we get from the server,
we have to eagerly deserialize them, and retry after receiving the
schema from the cluster.

With this PR, we make sure that on each API for which we support compact
serialization, we check the result of the `to_data` call to see whether it throws.

If it throws, we try to send the schema to the cluster and retry.

Also, this PR adds tests for all the APIs for which we support compact serialization.
For now, we haven't implemented eager deserialization for the APIs that
return `ImmutableLazyDataList`, so there are no tests for such APIs.

To be able to add those tests, we would need quite a few compact
serializable classes written in Java. This PR includes the aforementioned
Java code, but it might be moved to the remote controller package so
that it would be accessible to all clients.

* address review comments

* assign values to correct field names in the TopicMessage constructor

* improve sql compatibility test

* address review comments
  • Loading branch information
mdumandag committed Mar 31, 2022
1 parent d8d416a commit c844437
Show file tree
Hide file tree
Showing 29 changed files with 2,717 additions and 413 deletions.
9 changes: 7 additions & 2 deletions hazelcast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,9 @@ def __init__(self, **kwargs):
self._address_provider = self._create_address_provider()
self._internal_partition_service = _InternalPartitionService(self)
self._partition_service = PartitionService(
self._internal_partition_service, self._serialization_service
self._internal_partition_service,
self._serialization_service,
self._compact_schema_service.send_schema_and_retry,
)
self._internal_cluster_service = _InternalClusterService(self, config)
self._cluster_service = ClusterService(self._internal_cluster_service)
Expand Down Expand Up @@ -431,7 +433,10 @@ def __init__(self, **kwargs):
self._compact_schema_service,
)
self._internal_sql_service = _InternalSqlService(
self._connection_manager, self._serialization_service, self._invocation_service
self._connection_manager,
self._serialization_service,
self._invocation_service,
self._compact_schema_service.send_schema_and_retry,
)
self._sql_service = SqlService(self._internal_sql_service)
self._init_context()
Expand Down
23 changes: 18 additions & 5 deletions hazelcast/compact.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
client_send_schema_codec,
client_send_all_schemas_codec,
)
from hazelcast.serialization.compact import CompactStreamSerializer, Schema
from hazelcast.serialization.compact import (
CompactStreamSerializer,
Schema,
SchemaNotReplicatedError,
)

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -37,19 +41,28 @@ def fetch_schema(self, schema_id: int) -> Future:
self._invocation_service.invoke(fetch_schema_invocation)
return fetch_schema_invocation.future

def send_schema(self, schema: Schema, clazz: typing.Type) -> Future:
def send_schema_and_retry(
self,
error: SchemaNotReplicatedError,
func: typing.Callable[..., Future],
*args: typing.Any,
**kwargs: typing.Any,
) -> Future:
schema = error.schema
clazz = error.clazz
request = client_send_schema_codec.encode_request(schema)
invocation = Invocation(request)

def continuation(future):
future.result()
self._compact_serializer.register_sent_schema(schema, clazz)
self._compact_serializer.register_schema_to_type(schema, clazz)
return func(*args, **kwargs)

self._invocation_service.invoke(invocation)
return invocation.future.continue_with(continuation)

def send_all_schemas(self) -> Future:
schemas = self._compact_serializer.get_sent_schemas()
schemas = self._compact_serializer.get_schemas()
if not schemas:
_logger.debug("There is no schema to send to the cluster.")
return ImmediateFuture(None)
Expand All @@ -67,4 +80,4 @@ def register_fetched_schema(self, schema_id: int, schema: typing.Optional[Schema
f"The schema with the id {schema_id} can not be found in the cluster."
)

self._compact_serializer.register_fetched_schema(schema)
self._compact_serializer.register_schema_to_id(schema)
8 changes: 5 additions & 3 deletions hazelcast/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def handle_client_message(self, message: InboundMessage, correlation_id: int):
try:
handler(message)
except SchemaNotFoundError as e:
self._fetch_schema_and_handle_again(e, handler, message, correlation_id)
self._fetch_schema_and_handle_again(e, handler, message)
else:
_logger.debug("Got event message with unknown correlation id: %s", message)

Expand All @@ -156,7 +156,6 @@ def _fetch_schema_and_handle_again(
error: SchemaNotFoundError,
handler: typing.Callable[[InboundMessage], None],
message: InboundMessage,
correlation_id: int,
) -> None:
schema_id = error.schema_id

Expand All @@ -171,7 +170,10 @@ def callback(future):
return

message.reset_next_frame()
self.handle_client_message(message, correlation_id)
try:
handler(message)
except SchemaNotFoundError as e:
self._fetch_schema_and_handle_again(e, handler, message)

fetch_schema_future = self._compact_schema_service.fetch_schema(schema_id)
fetch_schema_future.add_done_callback(callback)
Expand Down
13 changes: 10 additions & 3 deletions hazelcast/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from hazelcast.errors import ClientOfflineError
from hazelcast.hash import hash_to_index
from hazelcast.serialization.compact import SchemaNotReplicatedError

_logger = logging.getLogger(__name__)

Expand All @@ -27,11 +28,12 @@ class PartitionService:
owner or the partition id of a key.
"""

__slots__ = ("_service", "_serialization_service")
__slots__ = ("_service", "_serialization_service", "_send_schema_and_retry_fn")

def __init__(self, internal_partition_service, serialization_service):
def __init__(self, internal_partition_service, serialization_service, send_schema_and_retry_fn):
self._service = internal_partition_service
self._serialization_service = serialization_service
self._send_schema_and_retry_fn = send_schema_and_retry_fn

def get_partition_owner(self, partition_id: int) -> typing.Optional[uuid.UUID]:
"""
Expand All @@ -55,7 +57,12 @@ def get_partition_id(self, key: typing.Any) -> int:
Returns:
The partition id.
"""
key_data = self._serialization_service.to_data(key)
try:
key_data = self._serialization_service.to_data(key)
except SchemaNotReplicatedError as e:
self._send_schema_and_retry_fn(e, lambda: None).result()
return self.get_partition_id(key)

return self._service.get_partition_id(key_data)

def get_partition_count(self) -> int:
Expand Down
128 changes: 36 additions & 92 deletions hazelcast/proxy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
import uuid

from hazelcast.core import MemberInfo
from hazelcast.future import Future
from hazelcast.types import KeyType, ValueType, ItemType, MessageType, BlockingProxyType
from hazelcast.invocation import Invocation
from hazelcast.partition import string_partition_strategy
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.types import KeyType, ValueType, ItemType, MessageType, BlockingProxyType
from hazelcast.util import get_attr_name

MAX_SIZE = float("inf")
Expand All @@ -33,20 +31,7 @@ def __init__(self, service_name: str, name: str, context):
self._register_listener = listener_service.register_listener
self._deregister_listener = listener_service.deregister_listener
self._is_smart = context.config.smart_routing
self._send_compact_schema = context.compact_schema_service.send_schema

def _send_schema_and_retry(
self,
error: SchemaNotReplicatedError,
func: typing.Callable[..., Future],
*args: typing.Any,
**kwargs: typing.Any
) -> Future:
def continuation(future):
future.result()
return func(*args, **kwargs)

return self._send_compact_schema(error.schema, error.clazz).continue_with(continuation)
self._send_schema_and_retry = context.compact_schema_service.send_schema_and_retry

def destroy(self) -> bool:
"""Destroys this proxy.
Expand Down Expand Up @@ -121,6 +106,10 @@ def __init__(self, name, transaction, context):
serialization_service = context.serialization_service
self._to_object = serialization_service.to_object
self._to_data = serialization_service.to_data
self._send_schema_and_retry = context.compact_schema_service.send_schema_and_retry

def _send_schema(self, error):
return self._send_schema_and_retry(error, lambda: None).result()

def _invoke(self, request, response_handler=_no_op_response_handler):
invocation = Invocation(
Expand Down Expand Up @@ -206,21 +195,16 @@ class ItemEvent(typing.Generic[ItemType]):
Attributes:
name: Name of the proxy that fired the event.
item: The item related to the event.
event_type: Type of the event.
member: Member that fired the event.
"""

def __init__(self, name: str, item_data, event_type: int, member: MemberInfo, to_object):
def __init__(self, name: str, item: ItemEventType, event_type: int, member: MemberInfo):
self.name = name
self._item_data = item_data
self.item = item
self.event_type = event_type
self.member = member
self._to_object = to_object

@property
def item(self) -> ItemType:
"""The item related to the event."""
return self._to_object(self._item_data)


class EntryEvent(typing.Generic[KeyType, ValueType]):
Expand All @@ -230,48 +214,30 @@ class EntryEvent(typing.Generic[KeyType, ValueType]):
event_type: Type of the event.
uuid: UUID of the member that fired the event.
number_of_affected_entries: Number of affected entries by this event.
key: The key of this entry event.
value: The value of the entry event.
old_value: The old value of the entry event.
merging_value: The incoming merging value of the entry event.
"""

def __init__(
self,
to_object,
key,
value,
old_value,
merging_value,
key: KeyType,
value: ValueType,
old_value: ValueType,
merging_value: ValueType,
event_type: int,
member_uuid: uuid.UUID,
number_of_affected_entries: int,
):
self._to_object = to_object
self._key_data = key
self._value_data = value
self._old_value_data = old_value
self._merging_value_data = merging_value
self.key = key
self.value = value
self.old_value = old_value
self.merging_value = merging_value
self.event_type = event_type
self.uuid = member_uuid
self.number_of_affected_entries = number_of_affected_entries

@property
def key(self) -> KeyType:
"""The key of this entry event."""
return self._to_object(self._key_data)

@property
def old_value(self) -> ValueType:
"""The old value of the entry event."""
return self._to_object(self._old_value_data)

@property
def value(self) -> ValueType:
"""The value of the entry event."""
return self._to_object(self._value_data)

@property
def merging_value(self) -> ValueType:
"""The incoming merging value of the entry event."""
return self._to_object(self._merging_value_data)

def __repr__(self):
return (
"EntryEvent(key=%s, value=%s, old_value=%s, merging_value=%s, event_type=%s, uuid=%s, "
Expand All @@ -288,45 +254,23 @@ def __repr__(self):
)


_SENTINEL = object()
class TopicMessage(typing.Generic[MessageType]):
"""Topic message.
Attributes:
name: Name of the proxy that fired the event.
message: The message sent to Topic.
publish_time: UNIX time that the event is published as seconds.
member: Member that fired the event.
"""

__slots__ = ("name", "message", "publish_time", "member")

class TopicMessage(typing.Generic[MessageType]):
"""Topic message."""

__slots__ = ("_name", "_message_data", "_message", "_publish_time", "_member", "_to_object")

def __init__(self, name, message_data, publish_time, member, to_object):
self._name = name
self._message_data = message_data
self._message = _SENTINEL
self._publish_time = publish_time
self._member = member
self._to_object = to_object

@property
def name(self) -> str:
"""Name of the proxy that fired the event."""
return self._name

@property
def publish_time(self) -> int:
"""UNIX time that the event is published as seconds."""
return self._publish_time

@property
def member(self) -> MemberInfo:
"""Member that fired the event."""
return self._member

@property
def message(self) -> MessageType:
"""The message sent to Topic."""
if self._message is not _SENTINEL:
return self._message

self._message = self._to_object(self._message_data)
return self._message
def __init__(self, name: str, message: MessageType, publish_time: int, member: MemberInfo):
self.name = name
self.message = message
self.publish_time = publish_time
self.member = member

def __repr__(self):
return "TopicMessage(message=%s, publish_time=%s, topic_name=%s, publishing_member=%s)" % (
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/proxy/cp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ def _no_op_response_handler(_):

class BaseCPProxy(typing.Generic[BlockingProxyType], abc.ABC):
def __init__(self, context, group_id, service_name, proxy_name, object_name):
self._context = context
self._group_id = group_id
self._service_name = service_name
self._proxy_name = proxy_name
Expand All @@ -23,6 +22,7 @@ def __init__(self, context, group_id, service_name, proxy_name, object_name):
serialization_service = context.serialization_service
self._to_data = serialization_service.to_data
self._to_object = serialization_service.to_object
self._send_schema_and_retry = context.compact_schema_service.send_schema_and_retry

def destroy(self) -> Future[None]:
"""Destroys this proxy."""
Expand Down

0 comments on commit c844437

Please sign in to comment.