Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hazelcast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __init__(self, config=None):
self.partition_service = PartitionService(self)
self.proxy = ProxyManager(self)
self.load_balancer = RandomLoadBalancer(self.cluster)
self.serialization_service = SerializationServiceV1(serialization_config=self.config.serialization_config)
self.serialization_service = SerializationServiceV1(serialization_config=self.config.serialization_config, properties=self.properties)
self.transaction_manager = TransactionManager(self)
self.lock_reference_id_generator = AtomicInteger(1)
self.near_cache_manager = NearCacheManager(self)
Expand Down
6 changes: 6 additions & 0 deletions hazelcast/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,12 @@ class ClientProperties(object):
Period in seconds to collect statistics.
"""

SERIALIZATION_INPUT_RETURNS_BYTEARRAY = ClientProperty("hazelcast.serialization.input.returns.bytearray", False)
"""
Input#read_byte_array returns a List if property is False, otherwise it will return a byte-array.
Changing this to True, gives a considerable performance benefit.
"""

def __init__(self, properties):
self._properties = properties

Expand Down
8 changes: 6 additions & 2 deletions hazelcast/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def _loop(self):
Future._threading_locals.is_reactor_thread = True
while self._is_live:
try:
asyncore.loop(count=1, timeout=0.1, map=self._map)
asyncore.loop(count=1, timeout=0.01, map=self._map)
self._check_timers()
except select.error as err:
# TODO: parse error type to catch only error "9"
Expand Down Expand Up @@ -117,6 +117,7 @@ def _cleanup_all_timers(self):

class AsyncoreConnection(Connection, asyncore.dispatcher):
sent_protocol_bytes = False
read_buffer_size = BUFFER_SIZE

def __init__(self, map, address, connect_timeout, socket_options, connection_closed_callback,
message_callback, network_config, logger_extras=None):
Expand All @@ -135,6 +136,9 @@ def __init__(self, map, address, connect_timeout, socket_options, connection_clo
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, BUFFER_SIZE)

for socket_option in socket_options:
if socket_option.option is socket.SO_RCVBUF:
self.read_buffer_size = socket_option.value

self.socket.setsockopt(socket_option.level, socket_option.option, socket_option.value)

self.connect(self._address)
Expand Down Expand Up @@ -187,7 +191,7 @@ def handle_connect(self):
self.logger.debug("Connected to %s", self._address, extra=self._logger_extras)

def handle_read(self):
self._read_buffer.extend(self.recv(BUFFER_SIZE))
self._read_buffer.extend(self.recv(self.read_buffer_size))
self.last_read_in_seconds = time.time()
self.receive_message()

Expand Down
9 changes: 7 additions & 2 deletions hazelcast/serialization/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from hazelcast.serialization.data import Data
from hazelcast import six
from hazelcast.six.moves import range

from hazelcast.config import ClientProperties


class _ObjectDataInput(ObjectDataInput):
Expand All @@ -15,6 +15,7 @@ def __init__(self, buff, offset=0, serialization_service=None, is_big_endian=Tru
self._is_big_endian = is_big_endian
self._pos = offset
self._size = len(buff)
self._respect_bytearrays = False if serialization_service is None else serialization_service.properties.get_bool(ClientProperties.SERIALIZATION_INPUT_RETURNS_BYTEARRAY)
# Local cache struct formats according to endianness
self._FMT_INT8 = FMT_BE_INT8 if self._is_big_endian else FMT_LE_INT8
self._FMT_UINT8 = FMT_BE_UINT8 if self._is_big_endian else FMT_LE_UINT8
Expand Down Expand Up @@ -111,7 +112,11 @@ def read_byte_array(self):
result = bytearray(length)
if length > 0:
self.read_into(result, 0, length)
return [x for x in result]

if self._respect_bytearrays:
return result

return list(result)

def read_boolean_array(self):
return self._read_array_fnc(self.read_boolean)
Expand Down
6 changes: 5 additions & 1 deletion hazelcast/serialization/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from hazelcast.serialization.portable.serializer import PortableSerializer
from hazelcast.serialization.serializer import *
from hazelcast import six
from hazelcast.config import ClientProperties

DEFAULT_OUT_BUFFER_SIZE = 4 * 1024

Expand All @@ -16,7 +17,8 @@ def default_partition_strategy(key):


class SerializationServiceV1(BaseSerializationService):
def __init__(self, serialization_config, version=1, global_partition_strategy=default_partition_strategy,

def __init__(self, serialization_config, properties=ClientProperties({}), version=1, global_partition_strategy=default_partition_strategy,
output_buffer_size=DEFAULT_OUT_BUFFER_SIZE):
super(SerializationServiceV1, self).__init__(version, global_partition_strategy, output_buffer_size,
serialization_config.is_big_endian,
Expand All @@ -40,6 +42,8 @@ def __init__(self, serialization_config, version=1, global_partition_strategy=de
if global_serializer:
self._registry._global_serializer = global_serializer()

self.properties = properties

def _register_constant_serializers(self):
self._registry.register_constant_serializer(self._registry._null_serializer, type(None))
self._registry.register_constant_serializer(self._registry._data_serializer)
Expand Down
22 changes: 22 additions & 0 deletions tests/serialization/identified_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import hazelcast
from hazelcast.serialization import SerializationServiceV1
from hazelcast.serialization.api import IdentifiedDataSerializable
from hazelcast.config import ClientProperties

FACTORY_ID = 1

Expand Down Expand Up @@ -108,6 +109,11 @@ def create_identified():
['a', 'b', 'c'], [1, 2, 3], [4, 2, 3], [11, 2, 3], [1.0, 2.0, 3.0], [11.0, 22.0, 33.0],
"the string text", ["item1", "item2", "item3"])

def create_identified_with_bytearray():
return SerializationV1Identified(99, True, 'c', 11, 1234134, 1341431221, 1.0, 2.0, bytes([1, 2, 3]), [True, False, True],
['a', 'b', 'c'], [1, 2, 3], [4, 2, 3], [11, 2, 3], [1.0, 2.0, 3.0], [11.0, 22.0, 33.0],
"the string text", ["item1", "item2", "item3"])


the_factory = {SerializationV1Identified.CLASS_ID: SerializationV1Identified}

Expand All @@ -123,3 +129,19 @@ def test_encode_decode(self):

obj2 = service.to_object(data)
self.assertTrue(obj == obj2)

def test_encode_decode_respect_bytearray_fields(self):
config = hazelcast.ClientConfig()
config.set_property("hazelcast.serialization.input.returns.bytearray", True)
config.serialization_config.data_serializable_factories[FACTORY_ID] = the_factory
service = SerializationServiceV1(config.serialization_config, properties=ClientProperties(config.get_properties()))
obj = create_identified_with_bytearray()
data = service.to_data(obj)

obj2 = service.to_object(data)
self.assertTrue(obj == obj2)

service = SerializationServiceV1(config.serialization_config)

obj2 = service.to_object(data)
self.assertFalse(obj == obj2)
1 change: 1 addition & 0 deletions tests/serialization/portable_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from hazelcast.serialization.portable.classdef import ClassDefinitionBuilder
from tests.serialization.identified_test import create_identified, SerializationV1Identified
from hazelcast import six
from hazelcast.config import ClientProperties

if not six.PY2:
long = int
Expand Down