Skip to content

Commit

Permalink
[API-510] [API-519] Implement compact serialization (#526)
Browse files Browse the repository at this point in the history
* Implement compact serialization

This PR introduces new compact serialization to the Python client.

Users interact with this API through the `compact_serializers` configuration
element, which will be documented along with other aspects of the
implementation in future PRs.

`compact_serializers` is a dictionary from the type of user class to
`CompactSerializer` associated with it.

Schema registration and fetching mechanisms are implemented with
minimal overhead on the other serialization options.

They follow the structures described as `Controlled Serialization`
and `Eager Deserialization` in the metadata distribution design.

In this PR, only the `map#put` and `map#get` APIs support compact
serialization. Other APIs will be added in follow-up PRs so as not to
complicate this already huge PR.

* address review comments

* address review comments

* pre-allocate lists

* rename read_xxx_or methods to read_xxx_or_default

* address review comments

* remove unnecessary quotes and unused import

* add a note about which thread executes the local schema registrations

* simplify _initialize_on_cluster

* address review comments

* update black version to latest to deal with breaking changes in one of its dependencies
  • Loading branch information
mdumandag committed Mar 29, 2022
1 parent f8f6162 commit d8d416a
Show file tree
Hide file tree
Showing 46 changed files with 7,159 additions and 712 deletions.
12 changes: 6 additions & 6 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@
master_doc = "index"

# General information about the project.
project = u"Hazelcast Python Client"
copyright = u"2022, Hazelcast Inc."
author = u"Hazelcast Inc. Developers"
project = "Hazelcast Python Client"
copyright = "2022, Hazelcast Inc."
author = "Hazelcast Inc. Developers"

# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
Expand Down Expand Up @@ -244,7 +244,7 @@
(
master_doc,
"hazelcast-python-client.tex",
u"Hazelcast Python Client Documentation",
"Hazelcast Python Client Documentation",
author,
"manual",
),
Expand Down Expand Up @@ -276,7 +276,7 @@
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
(master_doc, "hazelcast-python-client", u"Hazelcast Python Client Documentation", [author], 1)
(master_doc, "hazelcast-python-client", "Hazelcast Python Client Documentation", [author], 1)
]

# If true, show URL addresses after external links.
Expand All @@ -292,7 +292,7 @@
(
master_doc,
"hazelcast-python-client",
u"Hazelcast Python Client Documentation",
"Hazelcast Python Client Documentation",
author,
"hazelcast-python-client",
"One line description of project.",
Expand Down
26 changes: 23 additions & 3 deletions hazelcast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
import typing

from hazelcast.cluster import ClusterService, _InternalClusterService
from hazelcast.compact import CompactSchemaService
from hazelcast.config import _Config
from hazelcast.connection import ConnectionManager, DefaultAddressProvider
from hazelcast.core import DistributedObjectEvent, DistributedObjectInfo
from hazelcast.cp import CPSubsystem, ProxySessionManager
from hazelcast.discovery import HazelcastCloudAddressProvider
from hazelcast.errors import IllegalStateError
from hazelcast.future import Future
from hazelcast.invocation import Invocation, InvocationService
from hazelcast.invocation import InvocationService, Invocation
from hazelcast.lifecycle import LifecycleService, LifecycleState, _InternalLifecycleService
from hazelcast.listener import ClusterViewListenerService, ListenerService
from hazelcast.near_cache import NearCacheManager
Expand Down Expand Up @@ -371,6 +372,10 @@ def __init__(self, **kwargs):
self._internal_lifecycle_service = _InternalLifecycleService(config)
self._lifecycle_service = LifecycleService(self._internal_lifecycle_service)
self._invocation_service = InvocationService(self, config, self._reactor)
self._compact_schema_service = CompactSchemaService(
self._serialization_service.compact_stream_serializer,
self._invocation_service,
)
self._address_provider = self._create_address_provider()
self._internal_partition_service = _InternalPartitionService(self)
self._partition_service = PartitionService(
Expand All @@ -388,10 +393,15 @@ def __init__(self, **kwargs):
self._internal_cluster_service,
self._invocation_service,
self._near_cache_manager,
self._send_state_to_cluster,
)
self._load_balancer = self._init_load_balancer(config)
self._listener_service = ListenerService(
self, config, self._connection_manager, self._invocation_service
self,
config,
self._connection_manager,
self._invocation_service,
self._compact_schema_service,
)
self._proxy_manager = ProxyManager(self._context)
self._cp_subsystem = CPSubsystem(self._context)
Expand All @@ -415,7 +425,10 @@ def __init__(self, **kwargs):
)
self._shutdown_lock = threading.RLock()
self._invocation_service.init(
self._internal_partition_service, self._connection_manager, self._listener_service
self._internal_partition_service,
self._connection_manager,
self._listener_service,
self._compact_schema_service,
)
self._internal_sql_service = _InternalSqlService(
self._connection_manager, self._serialization_service, self._invocation_service
Expand All @@ -440,6 +453,7 @@ def _init_context(self):
self._name,
self._proxy_session_manager,
self._reactor,
self._compact_schema_service,
)

def _start(self):
Expand Down Expand Up @@ -772,6 +786,9 @@ def _create_client_name(self, client_id):
return client_name
return "hz.client_%s" % client_id

def _send_state_to_cluster(self) -> Future:
    """Send the client-side state to the cluster.

    Delegates to the compact schema service's ``send_all_schemas`` and
    returns the future it produces.
    """
    schema_service = self._compact_schema_service
    return schema_service.send_all_schemas()

@staticmethod
def _get_connection_timeout(config):
timeout = config.connection_timeout
Expand Down Expand Up @@ -806,6 +823,7 @@ def __init__(self):
self.name = None
self.proxy_session_manager = None
self.reactor = None
self.compact_schema_service = None

def init_context(
self,
Expand All @@ -823,6 +841,7 @@ def init_context(
name,
proxy_session_manager,
reactor,
compact_schema_service,
):
self.client = client
self.config = config
Expand All @@ -838,3 +857,4 @@ def init_context(
self.name = name
self.proxy_session_manager = proxy_session_manager
self.reactor = reactor
self.compact_schema_service = compact_schema_service
70 changes: 70 additions & 0 deletions hazelcast/compact.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import logging
import typing

from hazelcast.errors import HazelcastSerializationError
from hazelcast.future import Future, ImmediateFuture
from hazelcast.invocation import InvocationService, Invocation
from hazelcast.protocol.codec import (
client_fetch_schema_codec,
client_send_schema_codec,
client_send_all_schemas_codec,
)
from hazelcast.serialization.compact import CompactStreamSerializer, Schema

_logger = logging.getLogger(__name__)


class CompactSchemaService:
    """Moves compact schemas between the local serializer and the cluster.

    Requests are built with the ``client_*_schema`` codecs and dispatched
    through the given invocation service; the given compact stream
    serializer is told about schemas once they are sent or fetched.
    """

    def __init__(
        self,
        compact_serializer: CompactStreamSerializer,
        invocation_service: InvocationService,
    ):
        self._invocation_service = invocation_service
        self._compact_serializer = compact_serializer

    def fetch_schema(self, schema_id: int) -> Future:
        """Request the schema with the given id from the cluster.

        Args:
            schema_id: Id of the schema that is missing locally.

        Returns:
            The future of the fetch invocation; its response is decoded
            with ``client_fetch_schema_codec.decode_response``.
        """
        _logger.debug(
            "Could not find schema with the id %s locally. It will be fetched from the cluster.",
            schema_id,
        )

        invocation = Invocation(
            client_fetch_schema_codec.encode_request(schema_id),
            response_handler=client_fetch_schema_codec.decode_response,
        )
        self._invocation_service.invoke(invocation)
        return invocation.future

    def send_schema(self, schema: Schema, clazz: typing.Type) -> Future:
        """Send a single schema to the cluster.

        Args:
            schema: Schema to send.
            clazz: Class passed to the serializer together with the schema
                once the invocation has completed.

        Returns:
            A future chained on the invocation's future. The schema is
            registered as sent only if the invocation did not fail.
        """

        def register_when_sent(completed):
            # result() re-raises a failure of the invocation, so the
            # registration below is skipped when sending did not succeed.
            completed.result()
            self._compact_serializer.register_sent_schema(schema, clazz)

        send_invocation = Invocation(client_send_schema_codec.encode_request(schema))
        self._invocation_service.invoke(send_invocation)
        return send_invocation.future.continue_with(register_when_sent)

    def send_all_schemas(self) -> Future:
        """Send every schema the serializer reports as sent, in one request.

        Returns:
            ``ImmediateFuture(None)`` when there is nothing to send,
            otherwise the future of the urgent invocation that was issued.
        """
        sent_schemas = self._compact_serializer.get_sent_schemas()
        if not sent_schemas:
            _logger.debug("There is no schema to send to the cluster.")
            return ImmediateFuture(None)

        _logger.debug("Sending the following schemas to the cluster: %s", sent_schemas)

        send_all_invocation = Invocation(
            client_send_all_schemas_codec.encode_request(sent_schemas),
            urgent=True,
        )
        self._invocation_service.invoke(send_all_invocation)
        return send_all_invocation.future

    def register_fetched_schema(self, schema_id: int, schema: typing.Optional[Schema]) -> None:
        """Hand a schema fetched from the cluster over to the serializer.

        Args:
            schema_id: Id that was requested; used only in the error message.
            schema: The fetched schema, or a falsy value when none was found.

        Raises:
            HazelcastSerializationError: If ``schema`` is falsy.
        """
        if schema:
            self._compact_serializer.register_fetched_schema(schema)
            return

        raise HazelcastSerializationError(
            f"The schema with the id {schema_id} can not be found in the cluster."
        )
28 changes: 27 additions & 1 deletion hazelcast/config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import re
import types
import typing

from hazelcast.errors import InvalidConfigurationError
from hazelcast.serialization.api import StreamSerializer, IdentifiedDataSerializable, Portable
from hazelcast.serialization.api import (
StreamSerializer,
IdentifiedDataSerializable,
Portable,
CompactSerializer,
)
from hazelcast.serialization.portable.classdef import ClassDefinition
from hazelcast.security import TokenProvider
from hazelcast.util import (
Expand Down Expand Up @@ -576,6 +582,7 @@ class _Config:
"_creds_password",
"_token_provider",
"_use_public_ip",
"_compact_serializers",
)

def __init__(self):
Expand Down Expand Up @@ -632,6 +639,7 @@ def __init__(self):
self._creds_password = None
self._token_provider = None
self._use_public_ip = False
self._compact_serializers: typing.Dict[typing.Type, CompactSerializer] = {}

@property
def cluster_members(self):
Expand Down Expand Up @@ -1361,6 +1369,24 @@ def use_public_ip(self, value):
else:
raise TypeError("use_public_ip must be a boolean")

@property
def compact_serializers(self) -> typing.Dict[typing.Type, CompactSerializer]:
    """Dictionary from user classes to their ``CompactSerializer``."""
    return self._compact_serializers

@compact_serializers.setter
def compact_serializers(self, value: typing.Dict[typing.Type, CompactSerializer]) -> None:
    # Anything other than a dict is rejected before its contents are inspected.
    if not isinstance(value, dict):
        raise TypeError("compact_serializers must be a dict")

    # Check every entry first, so the stored mapping is replaced only
    # when the whole dict is valid.
    for key, compact_serializer in value.items():
        if not isinstance(key, type):
            raise TypeError("Keys of compact_serializers must be classes")

        if not isinstance(compact_serializer, CompactSerializer):
            raise TypeError("Values of compact_serializers must be CompactSerializer")

    self._compact_serializers = value

@classmethod
def from_dict(cls, d):
config = cls()
Expand Down

0 comments on commit d8d416a

Please sign in to comment.