Skip to content

Commit

Permalink
Implement putAllWithMetadata [HZ-2583] [5.3.z] (#25144)
Browse files Browse the repository at this point in the history
The PR adds a new codec/message task and internal API in ClientMapProxy
and reuses existing MergeOperation.
Using the same operation as during split-brain/WAN replication makes it
easy to explain the behavior.

Client protocol PR:
hazelcast/hazelcast-client-protocol#468

We exceptionally add a new API in patch to support the data migration
tool on 5.3.z as the target cluster.

Breaking changes (list specific methods/types/messages):
- New API addition in a patch version . The new API is internal though

Co-authored-by: Serkan Özel <serkan.ozel@hazelcast.com>
  • Loading branch information
frant-hartm and srknzl committed Aug 9, 2023
1 parent fc868ee commit 6dfbead
Show file tree
Hide file tree
Showing 14 changed files with 31,382 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@
import com.hazelcast.client.impl.protocol.codec.MapProjectCodec;
import com.hazelcast.client.impl.protocol.codec.MapProjectWithPredicateCodec;
import com.hazelcast.client.impl.protocol.codec.MapPutAllCodec;
import com.hazelcast.client.impl.protocol.codec.MapPutAllWithMetadataCodec;
import com.hazelcast.client.impl.protocol.codec.MapPutCodec;
import com.hazelcast.client.impl.protocol.codec.MapPutIfAbsentCodec;
import com.hazelcast.client.impl.protocol.codec.MapPutIfAbsentWithMaxIdleCodec;
Expand Down Expand Up @@ -624,6 +625,7 @@
import com.hazelcast.client.impl.protocol.task.map.MapProjectionWithPredicateMessageTask;
import com.hazelcast.client.impl.protocol.task.map.MapPublisherCreateMessageTask;
import com.hazelcast.client.impl.protocol.task.map.MapPublisherCreateWithValueMessageTask;
import com.hazelcast.client.impl.protocol.task.map.MapPutAllWithMetadataMessageTask;
import com.hazelcast.client.impl.protocol.task.map.MapPutAllMessageTask;
import com.hazelcast.client.impl.protocol.task.map.MapPutIfAbsentMessageTask;
import com.hazelcast.client.impl.protocol.task.map.MapPutIfAbsentWithMaxIdleMessageTask;
Expand Down Expand Up @@ -1488,6 +1490,8 @@ private void initializeMapTaskFactories() {
(cm, con) -> new MapPutIfAbsentWithMaxIdleMessageTask(cm, node, con));
factories.put(MapPutTransientWithMaxIdleCodec.REQUEST_MESSAGE_TYPE,
(cm, con) -> new MapPutTransientWithMaxIdleMessageTask(cm, node, con));
factories.put(MapPutAllWithMetadataCodec.REQUEST_MESSAGE_TYPE,
(cm, con) -> new MapPutAllWithMetadataMessageTask(cm, node, con));
}

private void initializeGeneralTaskFactories() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.client.impl.protocol.codec;

import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.Generated;
import com.hazelcast.client.impl.protocol.codec.builtin.*;
import com.hazelcast.client.impl.protocol.codec.custom.*;

import javax.annotation.Nullable;

import static com.hazelcast.client.impl.protocol.ClientMessage.*;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.*;

/*
* This file is auto-generated by the Hazelcast Client Protocol Code Generator.
* To change this file, edit the templates or the protocol
* definitions on the https://github.com/hazelcast/hazelcast-client-protocol
* and regenerate it.
*/

/**
* Copies all of the mappings from the specified entry list to this map, including metadata.
* The implementation uses MergeOperation with PassThroughMergePolicy, so the effect of
* this call is equivalent to synchronizing given entries using WAN replication.
* Please note that all the keys in the request should belong to the partition id to which this request is being sent, all keys
* matching to a different partition id shall be ignored. The API implementation using this request may need to send multiple
* of these request messages for filling a request for a key set if the keys belong to different partitions.
*/
@Generated("a8d1d03ec3de96de55d76858ff81db9a")
public final class MapPutAllWithMetadataCodec {
//hex: 0x014900
public static final int REQUEST_MESSAGE_TYPE = 84224;
//hex: 0x014901
public static final int RESPONSE_MESSAGE_TYPE = 84225;
private static final int REQUEST_INITIAL_FRAME_SIZE = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;

private MapPutAllWithMetadataCodec() {
}

@edu.umd.cs.findbugs.annotations.SuppressFBWarnings({"URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"})
public static class RequestParameters {

/**
* name of map
*/
public java.lang.String name;

/**
* entries with metadata
*/
public java.util.List<com.hazelcast.map.impl.SimpleEntryView<com.hazelcast.internal.serialization.Data, com.hazelcast.internal.serialization.Data>> entries;
}

public static ClientMessage encodeRequest(java.lang.String name, java.util.Collection<com.hazelcast.map.impl.SimpleEntryView<com.hazelcast.internal.serialization.Data, com.hazelcast.internal.serialization.Data>> entries) {
ClientMessage clientMessage = ClientMessage.createForEncode();
clientMessage.setContainsSerializedDataInRequest(true);
clientMessage.setRetryable(false);
clientMessage.setOperationName("Map.PutAllWithMetadata");
ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
clientMessage.add(initialFrame);
StringCodec.encode(clientMessage, name);
ListMultiFrameCodec.encode(clientMessage, entries, SimpleEntryViewCodec::encode);
return clientMessage;
}

public static MapPutAllWithMetadataCodec.RequestParameters decodeRequest(ClientMessage clientMessage) {
ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
RequestParameters request = new RequestParameters();
//empty initial frame
iterator.next();
request.name = StringCodec.decode(iterator);
request.entries = ListMultiFrameCodec.decode(iterator, SimpleEntryViewCodec::decode);
return request;
}

public static ClientMessage encodeResponse() {
ClientMessage clientMessage = ClientMessage.createForEncode();
ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
clientMessage.add(initialFrame);

return clientMessage;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.client.impl.protocol.task.map;

import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.MapPutAllWithMetadataCodec;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.nio.Connection;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.internal.util.Timer;
import com.hazelcast.map.impl.MapContainer;
import com.hazelcast.map.impl.MapService;
import com.hazelcast.map.impl.SimpleEntryView;
import com.hazelcast.map.impl.operation.MergeOperation;
import com.hazelcast.security.permission.ActionConstants;
import com.hazelcast.security.permission.MapPermission;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.merge.PassThroughMergePolicy;
import com.hazelcast.spi.merge.SplitBrainMergePolicy;
import com.hazelcast.spi.merge.SplitBrainMergeTypes.MapMergeTypes;

import java.security.Permission;
import java.util.ArrayList;
import java.util.List;

import static com.hazelcast.spi.impl.merge.MergingValueFactory.createMergingEntry;

public class MapPutAllWithMetadataMessageTask
extends AbstractMapPartitionMessageTask<MapPutAllWithMetadataCodec.RequestParameters> {

private volatile long startTimeNanos;

public MapPutAllWithMetadataMessageTask(ClientMessage clientMessage, Node node, Connection connection) {
super(clientMessage, node, connection);
}

@Override
protected Operation prepareOperation() {
SerializationService ss = nodeEngine.getSerializationService();

List<MapMergeTypes<Object, Object>> mergingEntries = new ArrayList<>();
for (SimpleEntryView<Data, Data> entry : parameters.entries) {
mergingEntries.add(createMergingEntry(ss, entry));
}

SplitBrainMergePolicy mergePolicy = nodeEngine.getSplitBrainMergePolicyProvider()
.getMergePolicy(PassThroughMergePolicy.class.getName());

return new MergeOperation(parameters.name, mergingEntries, mergePolicy, true);
}

@Override
protected MapPutAllWithMetadataCodec.RequestParameters decodeClientMessage(ClientMessage clientMessage) {
return MapPutAllWithMetadataCodec.decodeRequest(clientMessage);
}

@Override
protected ClientMessage encodeResponse(Object response) {
return MapPutAllWithMetadataCodec.encodeResponse();
}

@Override
public String getServiceName() {
return MapService.SERVICE_NAME;
}

@Override
protected void beforeProcess() {
startTimeNanos = Timer.nanos();
}

@Override
protected Object processResponseBeforeSending(Object response) {
MapService mapService = getService(MapService.SERVICE_NAME);
MapContainer mapContainer = mapService.getMapServiceContext().getMapContainer(parameters.name);
if (mapContainer.getMapConfig().isStatisticsEnabled()) {
mapService.getMapServiceContext().getLocalMapStatsProvider().getLocalMapStatsImpl(parameters.name)
.incrementPutLatencyNanos(parameters.entries.size(),
Timer.nanosElapsed(startTimeNanos));
}
return response;
}

@Override
public Permission getRequiredPermission() {
return new MapPermission(parameters.name, ActionConstants.ACTION_PUT);
}

@Override
public String getDistributedObjectName() {
return parameters.name;
}

@Override
public String getMethodName() {
return "putAllWithMetadata";
}

@Override
public Object[] getParameters() {
List<SimpleEntryView<Data, Data>> entries = new ArrayList<>(parameters.entries);
return new Object[]{entries};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import com.hazelcast.client.impl.protocol.codec.MapProjectCodec;
import com.hazelcast.client.impl.protocol.codec.MapProjectWithPredicateCodec;
import com.hazelcast.client.impl.protocol.codec.MapPutAllCodec;
import com.hazelcast.client.impl.protocol.codec.MapPutAllWithMetadataCodec;
import com.hazelcast.client.impl.protocol.codec.MapPutCodec;
import com.hazelcast.client.impl.protocol.codec.MapPutIfAbsentCodec;
import com.hazelcast.client.impl.protocol.codec.MapPutIfAbsentWithMaxIdleCodec;
Expand Down Expand Up @@ -160,6 +161,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -185,6 +187,7 @@
import static java.lang.Thread.currentThread;
import static java.util.Collections.emptyMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.groupingBy;

/**
* Proxy implementation of {@link IMap}.
Expand Down Expand Up @@ -1670,6 +1673,76 @@ private void putAllInternal(@Nonnull Map<? extends K, ? extends V> map,
protected void finalizePutAll(Map<? extends K, ? extends V> map, Map<Integer, List<Entry<Data, Data>>> entryMap) {
}

public CompletableFuture<Void> putAllWithMetadataAsync(@Nonnull Collection<? extends EntryView<K, V>> entries) {
checkNotNull(entries, "Null argument entries is not allowed");
ClientPartitionService partitionService = getContext().getPartitionService();

Map<Integer, List<SimpleEntryView<Data, Data>>> entriesByPartition =
entries.stream()
.map(e -> {
checkNotNull(e.getKey(), NULL_KEY_IS_NOT_ALLOWED);
checkNotNull(e.getValue(), NULL_VALUE_IS_NOT_ALLOWED);

Data keyData = toData(e.getKey());
if (e instanceof SimpleEntryView
&& e.getKey() instanceof Data
&& e.getValue() instanceof Data
) {
return (SimpleEntryView<Data, Data>) e;
} else {
return new SimpleEntryView<>(keyData, toData(e.getValue()))
.withCost(e.getCost())
.withCreationTime(e.getCreationTime())
.withExpirationTime(e.getExpirationTime())
.withHits(e.getHits())
.withLastAccessTime(e.getLastAccessTime())
.withLastStoredTime(e.getLastStoredTime())
.withLastUpdateTime(e.getLastUpdateTime())
.withVersion(e.getVersion())
.withTtl(e.getTtl())
.withMaxIdle(e.getMaxIdle());
}
})
.collect(groupingBy(
(SimpleEntryView<Data, Data> e) -> partitionService.getPartitionId(e.getKey())
));

AtomicInteger counter = new AtomicInteger(entriesByPartition.size());
InternalCompletableFuture<Void> resultFuture = new InternalCompletableFuture<>();
if (counter.get() == 0) {
resultFuture.complete(null);
}
for (Entry<Integer, ? extends List<SimpleEntryView<Data, Data>>> entry : entriesByPartition.entrySet()) {
Integer partitionId = entry.getKey();
ClientMessage request = MapPutAllWithMetadataCodec.encodeRequest(name, entry.getValue());
ClientInvocationFuture future = new ClientInvocation(getClient(), request, getName(), partitionId)
.invoke();

future.whenCompleteAsync((clientMessage, throwable) -> {
if (throwable != null) {
resultFuture.completeExceptionally(throwable);
return;
}
if (counter.decrementAndGet() == 0) {
finalizePutAll(
entries,
entriesByPartition
);
if (!resultFuture.isDone()) {
resultFuture.complete(null);
}
}
}, ConcurrencyUtil.getDefaultAsyncExecutor());
}

return resultFuture;
}

protected void finalizePutAll(
Collection<? extends EntryView<K, V>> entries, Map<Integer,
List<SimpleEntryView<Data, Data>>> entryMap) {
}

@Override
public void clear() {
ClientMessage request = MapClearCodec.encodeRequest(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.hazelcast.client.impl.spi.impl.ClientInvocationFuture;
import com.hazelcast.client.impl.spi.impl.ListenerMessageCodec;
import com.hazelcast.config.NearCacheConfig;
import com.hazelcast.core.EntryView;
import com.hazelcast.internal.adapter.IMapDataStructureAdapter;
import com.hazelcast.internal.monitor.impl.LocalMapStatsImpl;
import com.hazelcast.internal.nearcache.NearCache;
Expand All @@ -39,6 +40,7 @@
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.EntryProcessor;
import com.hazelcast.map.LocalMapStats;
import com.hazelcast.map.impl.SimpleEntryView;
import com.hazelcast.query.Predicate;
import com.hazelcast.spi.impl.InternalCompletableFuture;
import com.hazelcast.core.ReadOnly;
Expand Down Expand Up @@ -560,6 +562,24 @@ protected void finalizePutAll(Map<? extends K, ? extends V> map, Map<Integer, Li
}
}

@Override
protected void finalizePutAll(
Collection<? extends EntryView<K, V>> entries,
Map<Integer, List<SimpleEntryView<Data, Data>>> entriesByPartition
) {
if (serializeKeys) {
for (List<SimpleEntryView<Data, Data>> partitionEntries : entriesByPartition.values()) {
for (EntryView<Data, Data> entry : partitionEntries) {
invalidateNearCache(entry.getKey());
}
}
} else {
for (EntryView<K, V> entry : entries) {
invalidateNearCache(entry.getKey());
}
}
}

@Override
public void clear() {
nearCache.clear();
Expand Down

0 comments on commit 6dfbead

Please sign in to comment.