diff --git a/hazelcast/client.py b/hazelcast/client.py index 5cdd8cd172..e9a68e5dd0 100644 --- a/hazelcast/client.py +++ b/hazelcast/client.py @@ -51,7 +51,7 @@ def __init__(self, config=None): self.partition_service = PartitionService(self) self.proxy = ProxyManager(self) self.load_balancer = RandomLoadBalancer(self.cluster) - self.serialization_service = SerializationServiceV1(serialization_config=self.config.serialization_config) + self.serialization_service = SerializationServiceV1(serialization_config=self.config.serialization_config, properties=self.properties) self.transaction_manager = TransactionManager(self) self.lock_reference_id_generator = AtomicInteger(1) self.near_cache_manager = NearCacheManager(self) diff --git a/hazelcast/config.py b/hazelcast/config.py index 7069504205..96d76877c7 100644 --- a/hazelcast/config.py +++ b/hazelcast/config.py @@ -706,6 +706,12 @@ class ClientProperties(object): Period in seconds to collect statistics. """ + SERIALIZATION_INPUT_RETURNS_BYTEARRAY = ClientProperty("hazelcast.serialization.input.returns.bytearray", False) + """ + Input#read_byte_array returns a List if property is False, otherwise it will return a byte-array. + Changing this to True, gives a considerable performance benefit. + """ + def __init__(self, properties): self._properties = properties diff --git a/hazelcast/reactor.py b/hazelcast/reactor.py index 7047374303..a3ca825835 100644 --- a/hazelcast/reactor.py +++ b/hazelcast/reactor.py @@ -42,7 +42,7 @@ def _loop(self): Future._threading_locals.is_reactor_thread = True while self._is_live: try: - asyncore.loop(count=1, timeout=0.1, map=self._map) + asyncore.loop(count=1, timeout=0.01, map=self._map) self._check_timers() except select.error as err: # TODO: parse error type to catch only error "9" @@ -117,6 +117,7 @@ def _cleanup_all_timers(self): class AsyncoreConnection(Connection, asyncore.dispatcher): sent_protocol_bytes = False + read_buffer_size = BUFFER_SIZE def __init__(self, map, address, connect_timeout, socket_options, connection_closed_callback, message_callback, network_config, logger_extras=None): @@ -135,6 +136,9 @@ def __init__(self, map, address, connect_timeout, socket_options, connection_clo self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, BUFFER_SIZE) for socket_option in socket_options: + if socket_option.option is socket.SO_RCVBUF: + self.read_buffer_size = socket_option.value + self.socket.setsockopt(socket_option.level, socket_option.option, socket_option.value) self.connect(self._address) @@ -187,7 +191,7 @@ def handle_connect(self): self.logger.debug("Connected to %s", self._address, extra=self._logger_extras) def handle_read(self): - self._read_buffer.extend(self.recv(BUFFER_SIZE)) + self._read_buffer.extend(self.recv(self.read_buffer_size)) self.last_read_in_seconds = time.time() self.receive_message() diff --git a/hazelcast/serialization/input.py b/hazelcast/serialization/input.py index 4dec8bc322..7be39c1661 100644 --- a/hazelcast/serialization/input.py +++ b/hazelcast/serialization/input.py @@ -5,7 +5,7 @@ from hazelcast.serialization.data import Data from hazelcast import six from hazelcast.six.moves import range - +from hazelcast.config import ClientProperties class _ObjectDataInput(ObjectDataInput): @@ -15,6 +15,7 @@ def __init__(self, buff, offset=0, serialization_service=None, is_big_endian=Tru self._is_big_endian = is_big_endian self._pos = offset self._size = len(buff) + self._respect_bytearrays = False if serialization_service is None else serialization_service.properties.get_bool(ClientProperties.SERIALIZATION_INPUT_RETURNS_BYTEARRAY) # Local cache struct formats according to endianness self._FMT_INT8 = FMT_BE_INT8 if self._is_big_endian else FMT_LE_INT8 self._FMT_UINT8 = FMT_BE_UINT8 if self._is_big_endian else FMT_LE_UINT8 @@ -111,7 +112,11 @@ def read_byte_array(self): result = bytearray(length) if length > 0: self.read_into(result, 0, length) - return [x for x in result] + + if self._respect_bytearrays: + return result + + return list(result) def read_boolean_array(self): return self._read_array_fnc(self.read_boolean) diff --git a/hazelcast/serialization/service.py b/hazelcast/serialization/service.py index ee14f83c11..1ceda5c251 100644 --- a/hazelcast/serialization/service.py +++ b/hazelcast/serialization/service.py @@ -5,6 +5,7 @@ from hazelcast.serialization.portable.serializer import PortableSerializer from hazelcast.serialization.serializer import * from hazelcast import six +from hazelcast.config import ClientProperties DEFAULT_OUT_BUFFER_SIZE = 4 * 1024 @@ -16,7 +17,8 @@ def default_partition_strategy(key): class SerializationServiceV1(BaseSerializationService): - def __init__(self, serialization_config, version=1, global_partition_strategy=default_partition_strategy, + + def __init__(self, serialization_config, properties=ClientProperties({}), version=1, global_partition_strategy=default_partition_strategy, output_buffer_size=DEFAULT_OUT_BUFFER_SIZE): super(SerializationServiceV1, self).__init__(version, global_partition_strategy, output_buffer_size, serialization_config.is_big_endian, @@ -40,6 +42,8 @@ def __init__(self, serialization_config, version=1, global_partition_strategy=de if global_serializer: self._registry._global_serializer = global_serializer() + self.properties = properties + def _register_constant_serializers(self): self._registry.register_constant_serializer(self._registry._null_serializer, type(None)) self._registry.register_constant_serializer(self._registry._data_serializer) diff --git a/tests/serialization/identified_test.py b/tests/serialization/identified_test.py index 172ea90ed6..eaa13967aa 100644 --- a/tests/serialization/identified_test.py +++ b/tests/serialization/identified_test.py @@ -3,6 +3,7 @@ import hazelcast from hazelcast.serialization import SerializationServiceV1 from hazelcast.serialization.api import IdentifiedDataSerializable +from hazelcast.config import ClientProperties FACTORY_ID = 1 @@ -108,6 +109,11 @@ def create_identified(): ['a', 'b', 'c'], [1, 2, 3], [4, 2, 3], [11, 2, 3], [1.0, 2.0, 3.0], [11.0, 22.0, 33.0], "the string text", ["item1", "item2", "item3"]) +def create_identified_with_bytearray(): + return SerializationV1Identified(99, True, 'c', 11, 1234134, 1341431221, 1.0, 2.0, bytes([1, 2, 3]), [True, False, True], + ['a', 'b', 'c'], [1, 2, 3], [4, 2, 3], [11, 2, 3], [1.0, 2.0, 3.0], [11.0, 22.0, 33.0], + "the string text", ["item1", "item2", "item3"]) + the_factory = {SerializationV1Identified.CLASS_ID: SerializationV1Identified} @@ -123,3 +129,19 @@ def test_encode_decode(self): obj2 = service.to_object(data) self.assertTrue(obj == obj2) + + def test_encode_decode_respect_bytearray_fields(self): + config = hazelcast.ClientConfig() + config.set_property("hazelcast.serialization.input.returns.bytearray", True) + config.serialization_config.data_serializable_factories[FACTORY_ID] = the_factory + service = SerializationServiceV1(config.serialization_config, properties=ClientProperties(config.get_properties())) + obj = create_identified_with_bytearray() + data = service.to_data(obj) + + obj2 = service.to_object(data) + self.assertTrue(obj == obj2) + + service = SerializationServiceV1(config.serialization_config) + + obj2 = service.to_object(data) + self.assertFalse(obj == obj2) diff --git a/tests/serialization/portable_test.py b/tests/serialization/portable_test.py index b47310755a..550579bb5b 100644 --- a/tests/serialization/portable_test.py +++ b/tests/serialization/portable_test.py @@ -7,6 +7,7 @@ from hazelcast.serialization.portable.classdef import ClassDefinitionBuilder from tests.serialization.identified_test import create_identified, SerializationV1Identified from hazelcast import six +from hazelcast.config import ClientProperties if not six.PY2: long = int