Skip to content

Commit

Permalink
Add put_all() to MultiMap [API-1235] (#600)
Browse files Browse the repository at this point in the history
Added multimap put-all
  • Loading branch information
nevzatseferoglu committed Dec 27, 2022
1 parent 5829049 commit ba61af0
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 1 deletion.
10 changes: 10 additions & 0 deletions hazelcast/protocol/builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,16 @@ def decode_nullable(msg, decoder):
return ListMultiFrameCodec.decode(msg, decoder)


class ListDataCodec:
@staticmethod
def encode(buf, arr):
ListMultiFrameCodec.encode(buf, arr, DataCodec.encode)

@staticmethod
def decode(msg):
return ListMultiFrameCodec.decode(msg, DataCodec.decode)


class ListUUIDCodec:
@staticmethod
def encode(buf, arr, is_final=False):
Expand Down
19 changes: 19 additions & 0 deletions hazelcast/protocol/codec/multi_map_put_all_codec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from hazelcast.protocol.client_message import OutboundMessage, REQUEST_HEADER_SIZE, create_initial_buffer
from hazelcast.protocol.builtin import StringCodec
from hazelcast.protocol.builtin import EntryListCodec
from hazelcast.protocol.builtin import DataCodec
from hazelcast.protocol.builtin import ListDataCodec

# hex: 0x021700
_REQUEST_MESSAGE_TYPE = 136960
# hex: 0x021701
_RESPONSE_MESSAGE_TYPE = 136961

_REQUEST_INITIAL_FRAME_SIZE = REQUEST_HEADER_SIZE


def encode_request(name, entries):
buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
StringCodec.encode(buf, name)
EntryListCodec.encode(buf, entries, DataCodec.encode, ListDataCodec.encode, True)
return OutboundMessage(buf, False)
55 changes: 54 additions & 1 deletion hazelcast/proxy/multi_map.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import typing

from hazelcast.future import Future
from collections import defaultdict

from hazelcast.future import combine_futures, Future, ImmediateFuture
from hazelcast.protocol.codec import (
multi_map_add_entry_listener_codec,
multi_map_add_entry_listener_to_key_codec,
Expand All @@ -15,6 +17,7 @@
multi_map_key_set_codec,
multi_map_lock_codec,
multi_map_put_codec,
multi_map_put_all_codec,
multi_map_remove_codec,
multi_map_remove_entry_codec,
multi_map_remove_entry_listener_codec,
Expand All @@ -26,6 +29,7 @@
)
from hazelcast.proxy.base import Proxy, EntryEvent, EntryEventType
from hazelcast.types import ValueType, KeyType
from hazelcast.serialization.data import Data
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.util import check_not_none, thread_id, to_millis, ImmutableLazyDataList

Expand Down Expand Up @@ -445,6 +449,50 @@ def put(self, key: KeyType, value: ValueType) -> Future[bool]:
request = multi_map_put_codec.encode_request(self.name, key_data, value_data, thread_id())
return self._invoke_on_key(request, key_data, multi_map_put_codec.decode_response)

def put_all(self, multimap: typing.Dict[KeyType, typing.Sequence[ValueType]]) -> Future[None]:
"""Stores the given Map in the MultiMap.
The results of concurrently mutating the given map are undefined.
No atomicity guarantees are given. It could be that in case of failure some of the key/value-pairs get written, while others are not.
Warning:
This method uses ``__hash__`` and ``__eq__`` methods of binary form
of the key, not the actual implementations of ``__hash__`` and
``__eq__`` defined in key's class.
Args:
multimap: the map corresponds to multimap entries.
"""
check_not_none(multimap, "multimap can't be None")
if not multimap:
return ImmediateFuture(None)

partition_service = self._context.partition_service
partition_map: typing.DefaultDict[
int, typing.List[typing.Tuple[Data, typing.List[Data]]]
] = defaultdict(list)

for key, values in multimap.items():
try:
check_not_none(key, "key can't be None")
check_not_none(values, "values can't be None")
serialized_key = self._to_data(key)
serialized_values = []
for value in values:
check_not_none(value, "value can't be None")
serialized_values.append(self._to_data(value))
partition_id = partition_service.get_partition_id(serialized_key)
partition_map[partition_id].append((serialized_key, serialized_values))
except SchemaNotReplicatedError as e:
return self._send_schema_and_retry(e, self.put_all, multimap)

futures = []
for partition_id, entry_list in partition_map.items():
request = multi_map_put_all_codec.encode_request(self.name, entry_list)
future = self._invoke_on_partition(request, partition_id)
futures.append(future)
return combine_futures(futures).continue_with(lambda _: None)

def remove_entry_listener(self, registration_id: str) -> Future[bool]:
"""Removes the specified entry listener.
Expand Down Expand Up @@ -677,6 +725,11 @@ def put( # type: ignore[override]
) -> bool:
return self._wrapped.put(key, value).result()

def put_all( # type: ignore[override]
self, multimap: typing.Dict[KeyType, typing.Sequence[ValueType]]
) -> None:
self._wrapped.put_all(multimap).result()

def remove_entry_listener( # type: ignore[override]
self,
registration_id: str,
Expand Down
10 changes: 10 additions & 0 deletions tests/integration/backward_compatible/proxy/multi_map_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from hazelcast.proxy.map import EntryEventType
from tests.base import SingleMemberTestCase
from tests.util import random_string, event_collector
from tests.util import skip_if_client_version_older_than, skip_if_server_version_older_than


class MultiMapTest(SingleMemberTestCase):
Expand Down Expand Up @@ -150,6 +151,15 @@ def test_put_get(self):

self.assertCountEqual(self.multi_map.get("key"), ["value1", "value2"])

def test_put_all(self):
skip_if_server_version_older_than(self, self.client, "4.1")
skip_if_client_version_older_than(self, "5.2")
self.multi_map.put_all(
{"key1": ["value1", "value2", "value3"], "key2": ["value4", "value5", "value6"]}
)
self.assertCountEqual(self.multi_map.get("key1"), ["value1", "value2", "value3"])
self.assertCountEqual(self.multi_map.get("key2"), ["value4", "value5", "value6"])

def test_remove(self):
self.multi_map.put("key", "value")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
compare_client_version,
compare_server_version_with_rc,
skip_if_client_version_older_than,
skip_if_server_version_older_than,
)

try:
Expand Down Expand Up @@ -996,6 +997,12 @@ def test_put(self):
self.assertTrue(self.multi_map.put(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE))
self.assertEqual([OUTER_COMPACT_INSTANCE], self.multi_map.get(INNER_COMPACT_INSTANCE))

def test_put_all(self):
skip_if_server_version_older_than(self, self.client, "4.1")
skip_if_client_version_older_than(self, "5.2")
self.multi_map.put_all({INNER_COMPACT_INSTANCE: [OUTER_COMPACT_INSTANCE]})
self.assertCountEqual(self.multi_map.get(INNER_COMPACT_INSTANCE), [OUTER_COMPACT_INSTANCE])

def test_value_count(self):
self.assertEqual(0, self.multi_map.value_count(OUTER_COMPACT_INSTANCE))
self.multi_map.put(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE)
Expand Down

0 comments on commit ba61af0

Please sign in to comment.