Skip to content

Commit

Permalink
Eagerly deserialize responses consisting of list of data [API-1964] (#…
Browse files Browse the repository at this point in the history
…618)

* Eagerly deserialize responses consisting of list of data

We were performing lazy deserialization on APIs that return
a list of data, such  as map#values(). This was not going to work
with Compact, as after returning the list to the user, there might
be Compact serialized data on the list which we don't have the
schema on the client yet. If it was a normal response, the client
would have fetched the schema, but it is not possible with
these lazy APIs. We perform the deserialization while getting
the list items and this is a sync API. We can't perform a blocking
call there, because there is a chance that this will be executed
in the reactor thread, which would result in a deadlock.

So, the only possible way of making these APIs work with Compact
is to convert them to eager deserialization.

This PR removes lazy deserialization in everything apart from SQL,
which will be handled in another PR.

* fix mypy error

* address review comments
  • Loading branch information
mdumandag committed Mar 23, 2023
1 parent 8125a2d commit cd9a317
Show file tree
Hide file tree
Showing 15 changed files with 312 additions and 260 deletions.
27 changes: 0 additions & 27 deletions docs/serialization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -278,33 +278,6 @@ All of these APIs will work with the Compact serialization format, once it is
promoted to the stable status.
- Reading OBJECT columns of the SQL results
- Listening for :class:`hazelcast.proxy.reliable_topic.ReliableTopic` messages
- :func:`hazelcast.proxy.list.List.iterator`
- :func:`hazelcast.proxy.list.List.list_iterator`
- :func:`hazelcast.proxy.list.List.get_all`
- :func:`hazelcast.proxy.list.List.sub_list`
- :func:`hazelcast.proxy.map.Map.values`
- :func:`hazelcast.proxy.map.Map.entry_set`
- :func:`hazelcast.proxy.map.Map.execute_on_keys`
- :func:`hazelcast.proxy.map.Map.key_set`
- :func:`hazelcast.proxy.map.Map.project`
- :func:`hazelcast.proxy.map.Map.execute_on_entries`
- :func:`hazelcast.proxy.map.Map.get_all`
- :func:`hazelcast.proxy.multi_map.MultiMap.remove_all`
- :func:`hazelcast.proxy.multi_map.MultiMap.key_set`
- :func:`hazelcast.proxy.multi_map.MultiMap.values`
- :func:`hazelcast.proxy.multi_map.MultiMap.entry_set`
- :func:`hazelcast.proxy.multi_map.MultiMap.get`
- :func:`hazelcast.proxy.queue.Queue.iterator`
- :func:`hazelcast.proxy.replicated_map.ReplicatedMap.values`
- :func:`hazelcast.proxy.replicated_map.ReplicatedMap.entry_set`
- :func:`hazelcast.proxy.replicated_map.ReplicatedMap.key_set`
- :func:`hazelcast.proxy.set.Set.get_all`
- :func:`hazelcast.proxy.ringbuffer.Ringbuffer.read_many`
- :func:`hazelcast.proxy.transactional_map.TransactionalMap.values`
- :func:`hazelcast.proxy.transactional_map.TransactionalMap.key_set`
- :func:`hazelcast.proxy.transactional_multi_map.TransactionalMultiMap.get`
- :func:`hazelcast.proxy.transactional_multi_map.TransactionalMultiMap.remove_all`
IdentifiedDataSerializable Serialization
----------------------------------------
Expand Down
20 changes: 9 additions & 11 deletions hazelcast/proxy/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from hazelcast.proxy.base import PartitionSpecificProxy, ItemEvent, ItemEventType
from hazelcast.types import ItemType
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.util import check_not_none, ImmutableLazyDataList
from hazelcast.util import check_not_none, deserialize_list_in_place


class List(PartitionSpecificProxy["BlockingList"], typing.Generic[ItemType]):
Expand Down Expand Up @@ -247,9 +247,8 @@ def get_all(self) -> Future[typing.List[ItemType]]:
"""

def handler(message):
return ImmutableLazyDataList(
list_get_all_codec.decode_response(message), self._to_object
)
data_list = list_get_all_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = list_get_all_codec.encode_request(self.name)
return self._invoke(request, handler)
Expand All @@ -263,9 +262,8 @@ def iterator(self) -> Future[typing.List[ItemType]]:
"""

def handler(message):
return ImmutableLazyDataList(
list_iterator_codec.decode_response(message), self._to_object
)
data_list = list_iterator_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = list_iterator_codec.encode_request(self.name)
return self._invoke(request, handler)
Expand Down Expand Up @@ -337,9 +335,8 @@ def list_iterator(self, index: int = 0) -> Future[typing.List[ItemType]]:
"""

def handler(message):
return ImmutableLazyDataList(
list_list_iterator_codec.decode_response(message), self._to_object
)
data_list = list_list_iterator_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = list_list_iterator_codec.encode_request(self.name, index)
return self._invoke(request, handler)
Expand Down Expand Up @@ -494,7 +491,8 @@ def sub_list(self, from_index: int, to_index: int) -> Future[typing.List[ItemTyp
"""

def handler(message):
return ImmutableLazyDataList(list_sub_codec.decode_response(message), self._to_object)
data_list = list_sub_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = list_sub_codec.encode_request(self.name, from_index, to_index)
return self._invoke(request, handler)
Expand Down
72 changes: 32 additions & 40 deletions hazelcast/proxy/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@
check_not_none,
thread_id,
to_millis,
ImmutableLazyDataList,
IterationType,
deserialize_entry_list_in_place,
deserialize_list_in_place,
)


Expand Down Expand Up @@ -563,7 +564,8 @@ def handler(message):
predicate.anchor_list = response["anchor_data_list"].as_anchor_list(
self._to_object
)
return ImmutableLazyDataList(response["response"], self._to_object)
entry_data_list = response["response"]
return deserialize_entry_list_in_place(entry_data_list, self._to_object)

request = map_entries_with_paging_predicate_codec.encode_request(self.name, holder)
else:
Expand All @@ -573,17 +575,15 @@ def handler(message):
return self._send_schema_and_retry(e, self.entry_set, predicate)

def handler(message):
return ImmutableLazyDataList(
map_entries_with_predicate_codec.decode_response(message), self._to_object
)
entry_data_list = map_entries_with_predicate_codec.decode_response(message)
return deserialize_entry_list_in_place(entry_data_list, self._to_object)

request = map_entries_with_predicate_codec.encode_request(self.name, predicate_data)
else:

def handler(message):
return ImmutableLazyDataList(
map_entry_set_codec.decode_response(message), self._to_object
)
entry_data_list = map_entry_set_codec.decode_response(message)
return deserialize_entry_list_in_place(entry_data_list, self._to_object)

request = map_entry_set_codec.encode_request(self.name)

Expand Down Expand Up @@ -648,9 +648,8 @@ def execute_on_entries(
)

def handler(message):
return ImmutableLazyDataList(
map_execute_with_predicate_codec.decode_response(message), self._to_object
)
entry_data_list = map_execute_with_predicate_codec.decode_response(message)
return deserialize_entry_list_in_place(entry_data_list, self._to_object)

request = map_execute_with_predicate_codec.encode_request(
self.name, entry_processor_data, predicate_data
Expand All @@ -664,9 +663,8 @@ def handler(message):
)

def handler(message):
return ImmutableLazyDataList(
map_execute_on_all_keys_codec.decode_response(message), self._to_object
)
entry_data_list = map_execute_on_all_keys_codec.decode_response(message)
return deserialize_entry_list_in_place(entry_data_list, self._to_object)

request = map_execute_on_all_keys_codec.encode_request(self.name, entry_processor_data)

Expand Down Expand Up @@ -730,9 +728,8 @@ def execute_on_keys(
return self._send_schema_and_retry(e, self.execute_on_keys, keys, entry_processor)

def handler(message):
return ImmutableLazyDataList(
map_execute_on_keys_codec.decode_response(message), self._to_object
)
entry_data_list = map_execute_on_keys_codec.decode_response(message)
return deserialize_entry_list_in_place(entry_data_list, self._to_object)

request = map_execute_on_keys_codec.encode_request(
self.name, entry_processor_data, key_list
Expand Down Expand Up @@ -943,7 +940,8 @@ def handler(message):
predicate.anchor_list = response["anchor_data_list"].as_anchor_list(
self._to_object
)
return ImmutableLazyDataList(response["response"], self._to_object)
data_list = response["response"]
return deserialize_list_in_place(data_list, self._to_object)

request = map_key_set_with_paging_predicate_codec.encode_request(self.name, holder)
else:
Expand All @@ -953,17 +951,15 @@ def handler(message):
return self._send_schema_and_retry(e, self.key_set, predicate)

def handler(message):
return ImmutableLazyDataList(
map_key_set_with_predicate_codec.decode_response(message), self._to_object
)
data_list = map_key_set_with_predicate_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = map_key_set_with_predicate_codec.encode_request(self.name, predicate_data)
else:

def handler(message):
return ImmutableLazyDataList(
map_key_set_codec.decode_response(message), self._to_object
)
data_list = map_key_set_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = map_key_set_codec.encode_request(self.name)

Expand Down Expand Up @@ -1067,9 +1063,8 @@ def project(
return self._send_schema_and_retry(e, self.project, projection, predicate)

def handler(message):
return ImmutableLazyDataList(
map_project_with_predicate_codec.decode_response(message), self._to_object
)
data_list = map_project_with_predicate_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = map_project_with_predicate_codec.encode_request(
self.name, projection_data, predicate_data
Expand All @@ -1081,9 +1076,8 @@ def handler(message):
return self._send_schema_and_retry(e, self.project, projection, predicate)

def handler(message):
return ImmutableLazyDataList(
map_project_codec.decode_response(message), self._to_object
)
data_list = map_project_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = map_project_codec.encode_request(self.name, projection_data)

Expand Down Expand Up @@ -1652,7 +1646,8 @@ def handler(message):
predicate.anchor_list = response["anchor_data_list"].as_anchor_list(
self._to_object
)
return ImmutableLazyDataList(response["response"], self._to_object)
data_list = response["response"]
return deserialize_list_in_place(data_list, self._to_object)

request = map_values_with_paging_predicate_codec.encode_request(self.name, holder)
else:
Expand All @@ -1662,17 +1657,15 @@ def handler(message):
return self._send_schema_and_retry(e, self.values, predicate)

def handler(message):
return ImmutableLazyDataList(
map_values_with_predicate_codec.decode_response(message), self._to_object
)
data_list = map_values_with_predicate_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = map_values_with_predicate_codec.encode_request(self.name, predicate_data)
else:

def handler(message):
return ImmutableLazyDataList(
map_values_codec.decode_response(message), self._to_object
)
data_list = map_values_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = map_values_codec.encode_request(self.name)

Expand All @@ -1698,9 +1691,8 @@ def _get_all_internal(self, partition_to_keys, futures=None):
futures = []

def handler(message):
return ImmutableLazyDataList(
map_get_all_codec.decode_response(message), self._to_object
)
entry_data_list = map_get_all_codec.decode_response(message)
return deserialize_entry_list_in_place(entry_data_list, self._to_object)

for partition_id, key_dict in partition_to_keys.items():
request = map_get_all_codec.encode_request(self.name, key_dict.values())
Expand Down
34 changes: 17 additions & 17 deletions hazelcast/proxy/multi_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,13 @@
from hazelcast.types import ValueType, KeyType
from hazelcast.serialization.data import Data
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.util import check_not_none, thread_id, to_millis, ImmutableLazyDataList

from hazelcast.util import (
check_not_none,
thread_id,
to_millis,
deserialize_list_in_place,
deserialize_entry_list_in_place,
)

EntryEventCallable = typing.Callable[[EntryEvent[KeyType, ValueType]], None]

Expand Down Expand Up @@ -213,9 +218,8 @@ def entry_set(self) -> Future[typing.List[typing.Tuple[KeyType, ValueType]]]:
"""

def handler(message):
return ImmutableLazyDataList(
multi_map_entry_set_codec.decode_response(message), self._to_object
)
entry_data_list = multi_map_entry_set_codec.decode_response(message)
return deserialize_entry_list_in_place(entry_data_list, self._to_object)

request = multi_map_entry_set_codec.encode_request(self.name)
return self._invoke(request, handler)
Expand Down Expand Up @@ -246,9 +250,8 @@ def get(self, key: KeyType) -> Future[typing.Optional[typing.List[ValueType]]]:
return self._send_schema_and_retry(e, self.get, key)

def handler(message):
return ImmutableLazyDataList(
multi_map_get_codec.decode_response(message), self._to_object
)
data_list = multi_map_get_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = multi_map_get_codec.encode_request(self.name, key_data, thread_id())
return self._invoke_on_key(request, key_data, handler)
Expand Down Expand Up @@ -314,9 +317,8 @@ def key_set(self) -> Future[typing.List[KeyType]]:
"""

def handler(message):
return ImmutableLazyDataList(
multi_map_key_set_codec.decode_response(message), self._to_object
)
data_list = multi_map_key_set_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = multi_map_key_set_codec.encode_request(self.name)
return self._invoke(request, handler)
Expand Down Expand Up @@ -410,9 +412,8 @@ def remove_all(self, key: KeyType) -> Future[typing.List[ValueType]]:
check_not_none(key, "key can't be None")

def handler(message):
return ImmutableLazyDataList(
multi_map_remove_codec.decode_response(message), self._to_object
)
data_list = multi_map_remove_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

try:
key_data = self._to_data(key)
Expand Down Expand Up @@ -551,9 +552,8 @@ def values(self) -> Future[typing.List[ValueType]]:
"""

def handler(message):
return ImmutableLazyDataList(
multi_map_values_codec.decode_response(message), self._to_object
)
data_list = multi_map_values_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = multi_map_values_codec.encode_request(self.name)
return self._invoke(request, handler)
Expand Down
7 changes: 3 additions & 4 deletions hazelcast/proxy/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from hazelcast.proxy.base import PartitionSpecificProxy, ItemEvent, ItemEventType
from hazelcast.types import ItemType
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.util import check_not_none, to_millis, ImmutableLazyDataList
from hazelcast.util import check_not_none, to_millis, deserialize_list_in_place


class Queue(PartitionSpecificProxy["BlockingQueue"], typing.Generic[ItemType]):
Expand Down Expand Up @@ -202,9 +202,8 @@ def iterator(self) -> Future[typing.List[ItemType]]:
"""

def handler(message):
return ImmutableLazyDataList(
queue_iterator_codec.decode_response(message), self._to_object
)
data_list = queue_iterator_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = queue_iterator_codec.encode_request(self.name)
return self._invoke(request, handler)
Expand Down
4 changes: 2 additions & 2 deletions hazelcast/proxy/reliable_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ def _handle_next_batch(self, future):
result = future.result()

# Check if there are any messages lost since the last read
# and whether or not the listener can tolerate that.
# and whether the listener can tolerate that.
lost_count = (result.next_sequence_to_read_from - result.read_count) - self._sequence
if lost_count != 0 and not self._is_loss_tolerable(lost_count):
self.cancel()
Expand Down Expand Up @@ -254,7 +254,7 @@ def _handle_next_batch(self, future):

topic_message = TopicMessage(
self._topic_name,
self._to_object(message.payload),
message.payload,
message.publish_time,
member,
)
Expand Down

0 comments on commit cd9a317

Please sign in to comment.