Skip to content

Commit

Permalink
Introduce various updates for the Compact serialization [API-1564] (#610
Browse files Browse the repository at this point in the history
)

* Introduce various updates for the Compact serialization

This PR introduces couple of updates to the implementation

- Schema now holds a regular dict, instead of OrderedDict for
faster lookups and simpler implementation
- Prevent duplicate field names by adding a check to the
schema writer
- Disallowing compact serializers to override builtin
serializers

* address review comments
  • Loading branch information
mdumandag committed Mar 9, 2023
1 parent 244084b commit 622990e
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 22 deletions.
35 changes: 18 additions & 17 deletions hazelcast/serialization/compact.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import abc
import collections
import datetime
import decimal
import typing
Expand Down Expand Up @@ -1560,10 +1559,10 @@ def _raise_on_none_value_in_fix_sized_item_array(field: "FieldDescriptor") -> ty
class SchemaWriter(CompactWriter):
def __init__(self, type_name: str):
self._type_name = type_name
self._fields: typing.List[FieldDescriptor] = []
self._fields: typing.Dict[str, FieldDescriptor] = {}

def build(self) -> "Schema":
return Schema(self._type_name, self._fields)
return Schema(self._type_name, list(self._fields.values()))

def write_boolean(self, field_name: str, value: bool) -> None:
self._add_field(field_name, FieldKind.BOOLEAN)
Expand Down Expand Up @@ -1740,16 +1739,20 @@ def write_array_of_compact(
self._add_field(field_name, FieldKind.ARRAY_OF_COMPACT)

def _add_field(self, name: str, kind: "FieldKind"):
self._fields.append(FieldDescriptor(name, kind))
if name in self._fields:
raise HazelcastSerializationError(f"Field with the name '{name}' already exists")

self._fields[name] = FieldDescriptor(name, kind)


class Schema:
def __init__(self, type_name: str, fields_list: typing.List["FieldDescriptor"]):
self.type_name = type_name
self.fields: typing.Dict[str, "FieldDescriptor"] = Schema._dict_to_key_ordered_dict(
{f.name: f for f in fields_list}
)
self.fields_list = list(self.fields.values())
self.fields: typing.Dict[str, "FieldDescriptor"] = {f.name: f for f in fields_list}
# Sort the fields by the field name so that the field offsets/indexes
# can be set correctly.
fields_list.sort(key=lambda f: f.name)
self.fields_list = fields_list
self.schema_id: int = 0
self.var_sized_field_count: int = 0
self.fix_sized_fields_length: int = 0
Expand All @@ -1769,6 +1772,11 @@ def _init(self):
else:
fix_sized_fields.append(field)

# Fix sized fields should be in descending order of size in bytes.
# For ties, the alphabetical order(ascending) of the field name will
# be used. Since, `fields_list` is sorted at this point, and the `sort`
# method is stable, only sorting by the size in bytes is enough for
# this invariant to hold.
fix_sized_fields.sort(
key=lambda f: FIELD_OPERATIONS[f.kind].size_in_bytes(),
reverse=True,
Expand All @@ -1792,6 +1800,8 @@ def _init(self):

self.fix_sized_fields_length = position

# Variable sized fields should be in ascending alphabetical ordering
# of the field names
index = 0
for field in var_sized_fields:
field.index = index
Expand All @@ -1800,15 +1810,6 @@ def _init(self):
self.var_sized_field_count = index
self.schema_id = RabinFingerprint.of(self)

@staticmethod
def _dict_to_key_ordered_dict(
d: typing.Dict[str, "FieldDescriptor"]
) -> typing.Dict[str, "FieldDescriptor"]:
od = collections.OrderedDict()
for key in sorted(d):
od[key] = d[key]
return od

def __eq__(self, other: typing.Any) -> bool:
return (
isinstance(other, Schema)
Expand Down
27 changes: 24 additions & 3 deletions hazelcast/serialization/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import typing

from hazelcast.config import IntType, Config
from hazelcast.errors import HazelcastInstanceNotActiveError
from hazelcast.errors import HazelcastInstanceNotActiveError, IllegalArgumentError
from hazelcast.serialization.api import IdentifiedDataSerializable, Portable
from hazelcast.serialization.compact import (
SchemaNotFoundError,
Expand Down Expand Up @@ -111,6 +111,11 @@ def __init__(
for _type, custom_serializer in config.custom_serializers.items():
self._registry.safe_register_serializer(custom_serializer(), _type)

# Called here so that we can make sure that we are not overriding
# any of the default serializers registered above with the Compact
# serialization.
self._registry.validate()

def to_data(self, obj, partitioning_strategy=None):
"""Serialize the input object into byte array representation
Expand Down Expand Up @@ -241,12 +246,12 @@ def _register_constant_serializers(self):
self._registry.register_constant_serializer(BooleanSerializer(), bool)
self._registry.register_constant_serializer(CharSerializer())
self._registry.register_constant_serializer(ShortSerializer())
self._registry.register_constant_serializer(IntegerSerializer())
self._registry.register_constant_serializer(IntegerSerializer(), int)
self._registry.register_constant_serializer(LongSerializer())
self._registry.register_constant_serializer(FloatSerializer())
self._registry.register_constant_serializer(DoubleSerializer(), float)
self._registry.register_constant_serializer(UuidSerializer(), uuid.UUID)
self._registry.register_constant_serializer(StringSerializer())
self._registry.register_constant_serializer(StringSerializer(), str)
# Arrays of primitives and String
self._registry.register_constant_serializer(ByteArraySerializer(), bytearray)
self._registry.register_constant_serializer(BooleanArraySerializer())
Expand Down Expand Up @@ -496,6 +501,22 @@ def register_from_super_type(self, obj_type, super_type) -> typing.Optional[Stre
self.safe_register_serializer(serializer, obj_type)
return serializer

def validate(self):
"""
Makes sure that the classes registered as Compact serializable are not
overriding the default serializers.
Must be called in the constructor of the serialization service after it
completes registering default serializers.
"""
for compact_type in self._compact_types:
if compact_type in self._constant_type_dict:
raise IllegalArgumentError(
f"Compact serializer for the class {compact_type}' can not be "
f"registered as it overrides the default serializer for that "
f"class provided by Hazelcast."
)

def destroy(self):
for serializer in list(self._type_dict.values()):
serializer.destroy()
Expand Down
56 changes: 54 additions & 2 deletions tests/unit/serialization/compact_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from parameterized import parameterized

from hazelcast.config import Config
from hazelcast.errors import HazelcastSerializationError
from hazelcast.errors import HazelcastSerializationError, IllegalArgumentError
from hazelcast.serialization import SerializationServiceV1
from hazelcast.serialization.api import CompactSerializer, CompactReader, CompactWriter, FieldKind
from hazelcast.serialization.compact import (
Expand Down Expand Up @@ -198,6 +198,12 @@ def test_schema_writer(self):
for name, kind in fields:
self.assertEqual(kind, schema.fields.get(name).kind)

def test_schema_writer_with_duplicate_field_names(self):
writer = SchemaWriter("foo")
writer.write_int32("bar", 42)
with self.assertRaisesRegex(HazelcastSerializationError, "already exists"):
writer.write_string("bar", "42")


class Child:
def __init__(self, name: str):
Expand All @@ -214,7 +220,7 @@ def read(self, reader: CompactReader):
name = reader.read_string("name")
return Child(name)

def write(self, writer: CompactWriter, obj: Parent):
def write(self, writer: CompactWriter, obj: Child):
writer.write_string("name", obj.name)

def get_type_name(self):
Expand Down Expand Up @@ -259,3 +265,49 @@ def test_missing_serializer(self):
):
obj = Parent(Child("test"))
self._serialize(service, obj)


class CompactSerializationTest(unittest.TestCase):
def test_overriding_default_serializers(self):
config = Config()
config.compact_serializers = [StringCompactSerializer()]

with self.assertRaisesRegex(IllegalArgumentError, "can not be registered as it overrides"):
SerializationServiceV1(config)

def test_serializer_with_duplicate_field_names(self):
config = Config()
config.compact_serializers = [SerializerWithDuplicateFieldsNames()]

service = SerializationServiceV1(config)
with self.assertRaisesRegex(HazelcastSerializationError, "already exists"):
service.to_data(Child("foo"))


class StringCompactSerializer(CompactSerializer[str]):
def read(self, reader: CompactReader) -> str:
pass

def write(self, writer: CompactWriter, obj: str) -> None:
pass

def get_class(self) -> typing.Type[str]:
return str

def get_type_name(self) -> str:
return "str"


class SerializerWithDuplicateFieldsNames(CompactSerializer[Child]):
def read(self, reader: CompactReader) -> Child:
pass

def write(self, writer: CompactWriter, obj: Child) -> None:
writer.write_string("name", obj.name)
writer.write_string("name", obj.name)

def get_class(self) -> typing.Type[Child]:
return Child

def get_type_name(self) -> str:
return "child"

0 comments on commit 622990e

Please sign in to comment.