Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 41 additions & 34 deletions hazelcast/serialization/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def skip_bytes(self, count):

def read_boolean(self):
"""Reads 1 byte from input stream and convert it to a bool value.

Returns:
bool: The bool value read.
"""
Expand All @@ -252,23 +252,23 @@ def read_byte(self):

def read_unsigned_byte(self):
"""Reads 1 byte from input stream, zero-extends it and returns.

Returns:
int: The unsigned byte value read.
"""
raise NotImplementedError()

def read_short(self):
"""Reads 2 bytes from input stream and returns a short value.

Returns:
int: The short value read.
"""
raise NotImplementedError()

def read_unsigned_short(self):
"""Reads 2 bytes from input stream and returns an int value.

Returns:
int: The unsigned short value read.
"""
Expand All @@ -284,119 +284,119 @@ def read_char(self):

def read_int(self):
"""Reads 4 bytes from input stream and returns an int value.

Returns:
int: The int value read.
"""
raise NotImplementedError()

def read_long(self):
"""Reads 8 bytes from input stream and returns a long value.

Returns:
int: The int value read.
"""
raise NotImplementedError()

def read_float(self):
"""Reads 4 bytes from input stream and returns a float value.

Returns:
float: The float value read.
"""
raise NotImplementedError()

def read_double(self):
"""Reads 8 bytes from input stream and returns a double value.

Returns:
float: The double value read.
"""
raise NotImplementedError()

def read_utf(self):
"""Reads a UTF-8 string from input stream and returns it.

Returns:
str: The UTF-8 string read.
"""
raise NotImplementedError()

def read_byte_array(self):
"""Reads a byte array from input stream and returns it.

Returns:
bytearray: The byte array read.
"""
raise NotImplementedError()

def read_boolean_array(self):
"""Reads a bool array from input stream and returns it.

Returns:
list[bool]: The bool array read.
"""
raise NotImplementedError()

def read_char_array(self):
"""Reads a char array from input stream and returns it.

Returns:
list[str]: The char array read.
"""
raise NotImplementedError()

def read_int_array(self):
"""Reads a int array from input stream and returns it.

Returns:
list[int]: The int array read.
"""
raise NotImplementedError()

def read_long_array(self):
"""Reads a long array from input stream and returns it.

Returns:
list[int]: The long array read.
"""
raise NotImplementedError()

def read_double_array(self):
"""Reads a double array from input stream and returns it.

Returns:
list[float]: The double array read.
"""
raise NotImplementedError()

def read_float_array(self):
"""Reads a float array from input stream and returns it.

Returns:
list[float]: The float array read.
"""
raise NotImplementedError()

def read_short_array(self):
"""Reads a short array from input stream and returns it.

Returns:
list[int]: The short array read.
"""
raise NotImplementedError()

def read_utf_array(self):
"""Reads a UTF-8 string array from input stream and returns it.

Returns:
list[str]: The UTF-8 string array read.
"""
raise NotImplementedError()

def read_object(self):
"""Reads a object from input stream and returns it.

Returns:
The object read.
"""
Expand All @@ -412,15 +412,15 @@ def get_byte_order(self):

def position(self):
"""Returns current position in buffer.

Returns:
int: Current position in buffer.
"""
raise NotImplementedError()

def size(self):
"""Returns size of buffer.

Returns:
int: size of buffer.
"""
Expand All @@ -440,31 +440,39 @@ def write_data(self, object_data_output):
Args:
object_data_output (hazelcast.serialization.api.ObjectDataOutput): The output.
"""
raise NotImplementedError("read_data must be implemented to serialize this IdentifiedDataSerializable")
raise NotImplementedError(
"read_data must be implemented to serialize this IdentifiedDataSerializable"
)

def read_data(self, object_data_input):
"""Reads fields from the input stream.

Args:
object_data_input (hazelcast.serialization.api.ObjectDataInput): The input.
"""
raise NotImplementedError("read_data must be implemented to deserialize this IdentifiedDataSerializable")
raise NotImplementedError(
"read_data must be implemented to deserialize this IdentifiedDataSerializable"
)

def get_factory_id(self):
"""Returns DataSerializableFactory factory id for this class.

Returns:
int: The factory id.
"""
raise NotImplementedError("This method must return the factory ID for this IdentifiedDataSerializable")
raise NotImplementedError(
"This method must return the factory ID for this IdentifiedDataSerializable"
)

def get_class_id(self):
"""Returns type identifier for this class. Id should be unique per DataSerializableFactory.

Returns:
int: The type id.
"""
raise NotImplementedError("This method must return the class ID for this IdentifiedDataSerializable")
raise NotImplementedError(
"This method must return the class ID for this IdentifiedDataSerializable"
)


class Portable(object):
Expand Down Expand Up @@ -496,15 +504,15 @@ def read_portable(self, reader):

def get_factory_id(self):
"""Returns PortableFactory id for this portable class

Returns:
int: The factory id.
"""
raise NotImplementedError()

def get_class_id(self):
"""Returns class identifier for this portable class. Class id should be unique per PortableFactory.

Returns:
int: The class id.
"""
Expand Down Expand Up @@ -557,7 +565,7 @@ class PortableReader(object):

def get_version(self):
"""Returns the global version of portable classes.

Returns:
int: Global version of portable classes.
"""
Expand All @@ -576,7 +584,7 @@ def has_field(self, field_name):

def get_field_names(self):
"""Returns the set of field names on this portable class.

Returns:
set: Set of field names on this portable class.
"""
Expand Down Expand Up @@ -827,7 +835,7 @@ def read_portable_array(self, field_name):
def get_raw_data_input(self):
"""After reading portable fields, one can read remaining fields in old fashioned way
consecutively from the end of stream. After get_raw_data_input() called, no data can be read.

Returns:
hazelcast.serialization.api.ObjectDataInput: The input.
"""
Expand Down Expand Up @@ -1032,9 +1040,8 @@ def write_portable_array(self, field_name, values):
def get_raw_data_output(self):
"""After writing portable fields, one can write remaining fields in old fashioned way
consecutively at the end of stream. After get_raw_data_output() called, no data can be written.

Returns:
hazelcast.serialization.api.ObjectDataOutput: The output.
"""
raise NotImplementedError()

42 changes: 31 additions & 11 deletions hazelcast/serialization/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ def index_for_default_type(type_id):


class BaseSerializationService(object):
def __init__(self, version, global_partition_strategy, output_buffer_size, is_big_endian, int_type):
def __init__(
self, version, global_partition_strategy, output_buffer_size, is_big_endian, int_type
):
self._registry = SerializerRegistry(int_type)
self._version = version
self._global_partition_strategy = global_partition_strategy
Expand Down Expand Up @@ -116,7 +118,9 @@ def to_object(self, data):

def write_object(self, out, obj):
if isinstance(obj, Data):
raise HazelcastSerializationError("Cannot write a Data instance! Use write_data(out, data) instead.")
raise HazelcastSerializationError(
"Cannot write a Data instance! Use write_data(out, data) instead."
)
try:
serializer = self._registry.serializer_for(obj)
out.write_int(serializer.get_type_id())
Expand All @@ -130,7 +134,9 @@ def read_object(self, inp):
serializer = self._registry.serializer_by_type_id(type_id)
if serializer is None:
if self._active:
raise HazelcastSerializationError("Missing Serializer for type-id: %s" % type_id)
raise HazelcastSerializationError(
"Missing Serializer for type-id: %s" % type_id
)
else:
raise HazelcastInstanceNotActiveError()
return serializer.read(inp)
Expand All @@ -139,11 +145,17 @@ def read_object(self, inp):

def _calculate_partitioning_hash(self, obj, partitioning_strategy):
partitioning_hash = 0
_ps = partitioning_strategy if partitioning_strategy is not None else self._global_partition_strategy
_ps = (
partitioning_strategy
if partitioning_strategy is not None
else self._global_partition_strategy
)
pk = _ps(obj)
if pk is not None and pk is not obj:
partitioning_key = self.to_data(pk, empty_partitioning_strategy)
partitioning_hash = 0 if partitioning_key is None else partitioning_key.get_partition_hash()
partitioning_hash = (
0 if partitioning_key is None else partitioning_key.get_partition_hash()
)
return partitioning_hash

def _create_data_output(self):
Expand Down Expand Up @@ -231,7 +243,9 @@ def serializer_for(self, obj):
serializer = self.lookup_python_serializer(obj_type)

if serializer is None:
raise HazelcastSerializationError("There is no suitable serializer for:" + str(obj_type))
raise HazelcastSerializationError(
"There is no suitable serializer for:" + str(obj_type)
)
return serializer

def lookup_default_serializer(self, obj_type, obj):
Expand Down Expand Up @@ -286,7 +300,9 @@ def lookup_global_serializer(self, obj_type):

def register_constant_serializer(self, serializer, object_type=None):
stream_serializer = serializer
self._constant_type_ids[index_for_default_type(stream_serializer.get_type_id())] = stream_serializer
self._constant_type_ids[
index_for_default_type(stream_serializer.get_type_id())
] = stream_serializer
if object_type is not None:
self._constant_type_dict[object_type] = stream_serializer

Expand All @@ -297,15 +313,19 @@ def safe_register_serializer(self, stream_serializer, obj_type=None):
raise ValueError("[%s] serializer cannot be overridden!" % obj_type)
current = self._type_dict.get(obj_type, None)
if current is not None and current.__class__ != stream_serializer.__class__:
raise ValueError("Serializer[%s] has been already registered for type: %s"
% (current.__class__, obj_type))
raise ValueError(
"Serializer[%s] has been already registered for type: %s"
% (current.__class__, obj_type)
)
else:
self._type_dict[obj_type] = stream_serializer
serializer_type_id = stream_serializer.get_type_id()
current = self._id_dic.get(serializer_type_id, None)
if current is not None and current.__class__ != stream_serializer.__class__:
raise ValueError("Serializer[%s] has been already registered for type-id: %s"
% (current.__class__, serializer_type_id))
raise ValueError(
"Serializer[%s] has been already registered for type-id: %s"
% (current.__class__, serializer_type_id)
)
else:
self._id_dic[serializer_type_id] = stream_serializer
return current is None
Expand Down
Loading