IMap.putAll() performance improvements #8023

Merged
merged 3 commits into from May 23, 2016

4 participants

@Donnerbart
Contributor
Donnerbart commented Apr 26, 2016 edited
  • Cleanup of MapEntries.
  • Grouped putAll() calls per member.
  • Added batch support.

Please see the TDD for details and benchmarks.
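To illustrate the "grouped per member" idea from the list above, here is a minimal sketch. It is not the PR's implementation. The partition function (key hash modulo partition count) and the round-robin ownership table are hypothetical stand-ins for Hazelcast's partition service, and all class and method names are made up for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PutAllGroupingSketch {

    // Hypothetical partition function; the real one works on the serialized key.
    static int partitionId(Object key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Hypothetical ownership table: partitions are assigned to members round-robin.
    static int ownerOf(int partitionId, int memberCount) {
        return partitionId % memberCount;
    }

    /** Groups entries as member -> partition -> entries, so one invocation per member is enough. */
    static <K, V> Map<Integer, Map<Integer, List<Map.Entry<K, V>>>> groupPerMember(
            Map<K, V> input, int partitionCount, int memberCount) {
        Map<Integer, Map<Integer, List<Map.Entry<K, V>>>> perMember = new HashMap<>();
        for (Map.Entry<K, V> entry : input.entrySet()) {
            int partition = partitionId(entry.getKey(), partitionCount);
            int member = ownerOf(partition, memberCount);
            perMember.computeIfAbsent(member, m -> new HashMap<>())
                    .computeIfAbsent(partition, p -> new ArrayList<>())
                    .add(entry);
        }
        return perMember;
    }

    public static void main(String[] args) {
        Map<Integer, String> input = new HashMap<>();
        for (int i = 0; i < 1000; i++) {
            input.put(i, "value-" + i);
        }
        Map<Integer, Map<Integer, List<Map.Entry<Integer, String>>>> grouped =
                groupPerMember(input, 271, 3);
        int partitions = 0;
        for (Map<Integer, List<Map.Entry<Integer, String>>> perPartition : grouped.values()) {
            partitions += perPartition.size();
        }
        System.out.println("member-level groups: " + grouped.size());
        System.out.println("partitions covered: " + partitions);
    }
}
```

In this toy setup, 1000 integer keys touch all 271 partitions, yet only 3 member-level groups have to be sent instead of one invocation per partition.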

@Donnerbart Donnerbart self-assigned this Apr 26, 2016
@Donnerbart Donnerbart added this to the 3.7 milestone Apr 26, 2016
@Donnerbart Donnerbart changed the title from [DONT MERGE] Cleanup of MapEntries. Grouped putAll() calls per member. Added batch support. to IMap.putAll() performance improvements May 10, 2016
@pveentjer pveentjer commented on an outdated diff May 19, 2016
...cast/map/impl/operation/PutAllPerMemberOperation.java
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.DataSerializable;
+import com.hazelcast.spi.NodeEngine;
+import com.hazelcast.spi.Operation;
+import com.hazelcast.spi.OperationAccessor;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+/**
+ * Inserts the {@link MapEntries} for all partitions of a member via locally invoked {@link PutAllOperation}.
+ * <p/>
+ * Used to reduce the number of remote invocations of an {@link com.hazelcast.core.IMap#putAll(Map)} call.
+ */
+public class PutAllPerMemberOperation extends MapOperation implements DataSerializable {
@pveentjer
pveentjer May 19, 2016 Member

I would make this operation IdentifiedDataSerializable. All operations are automatically DataSerializable, so there is no need to repeat it.

@pveentjer pveentjer commented on an outdated diff May 19, 2016
...cast/map/impl/operation/PutAllPerMemberOperation.java
+    public PutAllPerMemberOperation(String name, int[] partitions, MapEntries[] mapEntries) {
+        super(name);
+        this.partitions = partitions;
+        this.mapEntries = mapEntries;
+    }
+
+    @Override
+    public void run() throws Exception {
+        NodeEngine nodeEngine = getNodeEngine();
+        Future[] futures = new Future[partitions.length];
+        for (int i = 0; i < partitions.length; i++) {
+            Operation op = new PutAllOperation(name, mapEntries[i]);
+            op.setNodeEngine(nodeEngine)
+                    .setPartitionId(partitions[i])
+                    .setReplicaIndex(getReplicaIndex())
+                    .setServiceName(getServiceName())
@pveentjer
pveentjer May 19, 2016 Member

Map operations already have the service name set, so there is no need to set it (the call is ignored anyway).

@pveentjer pveentjer commented on an outdated diff May 19, 2016
...cast/map/impl/operation/PutAllPerMemberOperation.java
+    @Override
+    public void run() throws Exception {
+        NodeEngine nodeEngine = getNodeEngine();
+        Future[] futures = new Future[partitions.length];
+        for (int i = 0; i < partitions.length; i++) {
+            Operation op = new PutAllOperation(name, mapEntries[i]);
+            op.setNodeEngine(nodeEngine)
+                    .setPartitionId(partitions[i])
+                    .setReplicaIndex(getReplicaIndex())
+                    .setServiceName(getServiceName())
+                    .setService(getService())
+                    .setCallerUuid(getCallerUuid());
+            OperationAccessor.setCallerAddress(op, getCallerAddress());
+            futures[i] = nodeEngine.getOperationService().invokeOnPartition(op);
+        }
+        for (Future future : futures) {
@pveentjer
pveentjer May 19, 2016 Member

Can you add a line of comment saying that this method eventually needs to become non-blocking? Currently the generic thread is blocked until all putAll operations complete.
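As a rough illustration of what "non-blocking" could mean here, the sketch below contrasts waiting on every future with registering a completion callback. It uses java.util.concurrent.CompletableFuture as a stand-in; the futures returned by Hazelcast's operation service are a different type, and the names runBlocking, runNonBlocking and sendResponse are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class NonBlockingFanOutSketch {

    /** Blocking variant: the calling thread waits until every sub-operation is done. */
    static void runBlocking(List<CompletableFuture<Void>> futures) {
        for (CompletableFuture<Void> future : futures) {
            future.join();
        }
    }

    /**
     * Non-blocking variant: returns immediately. The sendResponse callback runs
     * once all sub-operations have completed successfully.
     */
    static CompletableFuture<Void> runNonBlocking(List<CompletableFuture<Void>> futures,
                                                  Runnable sendResponse) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenRun(sendResponse);
    }
}
```

With the second variant, the thread that started the fan-out is free to process other operations while the per-partition work is still in flight.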

@pveentjer pveentjer commented on the diff May 19, 2016
...cast/map/impl/operation/PutAllPerMemberOperation.java
+                    .setService(getService())
+                    .setCallerUuid(getCallerUuid());
+            OperationAccessor.setCallerAddress(op, getCallerAddress());
+            futures[i] = nodeEngine.getOperationService().invokeOnPartition(op);
+        }
+        for (Future future : futures) {
+            future.get();
+        }
+    }
+
+    @Override
+    public void afterRun() throws Exception {
+    }
+
+    @Override
+    public Object getResponse() {
@pveentjer
pveentjer May 19, 2016 Member

I would return null since you don't care about the value.

@Donnerbart
Donnerbart May 19, 2016 Contributor

Changed, but this comment won't go away since it was attached to the method line.

@pveentjer pveentjer and 1 other commented on an outdated diff May 19, 2016
...ava/com/hazelcast/map/impl/proxy/MapProxySupport.java
protected static final String NULL_VALUE_IS_NOT_ALLOWED = "Null value is not allowed!";
protected static final String NULL_PREDICATE_IS_NOT_ALLOWED = "Predicate should not be null!";
protected static final String NULL_LISTENER_IS_NOT_ALLOWED = "Null listener is not allowed!";
private static final int CHECK_IF_LOADED_TIMEOUT_SECONDS = 60;
+ /**
+ * Defines the batch size for operations of {@link IMap#putAll(Map)} calls.
+ *
+ * A value of {@code 0} disables the batching and will send a single operation per member with all map entries.
+ *
+ * If you set this value too high, you may run into OOME or blocked network pipelines due to huge operations.
+ * If you set this value too low, you will lower the performance of the putAll() operation.
+ */
+ private static final HazelcastProperty MAP_PUT_ALL_BATCH_SIZE
+ = new HazelcastProperty("hazelcast.map.put.all.batch.size", 0);
@pveentjer
pveentjer May 19, 2016 Member

Would it not make sense to have a default other than 0? As it stands, a user could run into a fat packet and probably would not quickly figure out what is happening.

@Donnerbart
Donnerbart May 19, 2016 Contributor

This will most likely kill the performance (see the TDD).
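For readers unfamiliar with the trade-off discussed in this thread, here is a small sketch of what a batch size does. The useBatching check mirrors the condition in getPutAllUseBatching from this PR's diff; the split helper is purely illustrative and assumes nothing about how the PR actually cuts entries into batches.

```java
import java.util.ArrayList;
import java.util.List;

final class PutAllBatchingSketch {

    /** Batching applies only if it is enabled and the map exceeds one batch per member. */
    static boolean useBatching(int batchSize, int mapSize, int clusterSize) {
        return batchSize > 0 && mapSize > batchSize * clusterSize;
    }

    /** Splits entries into chunks of at most batchSize; a batchSize of 0 yields one chunk. */
    static <T> List<List<T>> split(List<T> entries, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        if (batchSize <= 0 || entries.size() <= batchSize) {
            batches.add(new ArrayList<>(entries));
            return batches;
        }
        for (int from = 0; from < entries.size(); from += batchSize) {
            int to = Math.min(from + batchSize, entries.size());
            batches.add(new ArrayList<>(entries.subList(from, to)));
        }
        return batches;
    }
}
```

A default of 0 keeps everything in one operation per member, which avoids the per-batch overhead but allows arbitrarily large operations; a positive value bounds the operation size at the cost of more invocations.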

@pveentjer pveentjer and 1 other commented on an outdated diff May 19, 2016
...ava/com/hazelcast/map/impl/proxy/MapProxySupport.java
+ * The default value of {@code 0} uses an educated guess, depending on the map size, which is a good overall strategy.
+ * If you insert entries which don't match a normal partition distribution you should configure this factor.
+ * The initial size is calculated by this formula:
+ * {@code initialSize = ceil(MAP_PUT_ALL_INITIAL_SIZE_FACTOR * map.size() / PARTITION_COUNT)}
+ *
+ * As a rule of thumb you can try the following values:
+ * <ul>
+ * <li>{@code 10} for map sizes about 100 entries</li>
+ * <li>{@code 5} for map sizes between 500 and 5000 entries</li>
+ * <li>{@code 1.5} for map sizes of about 50000 entries</li>
+ * </ul>
+ *
+ * If you set this value too high, you will waste memory.
+ * If you set this value too low, you will suffer from expensive {@link java.util.Arrays#copyOf} calls.
+ */
+ private static final HazelcastProperty MAP_PUT_ALL_INITIAL_SIZE_FACTOR
@pveentjer
pveentjer May 19, 2016 Member

What does 0 mean? How big is the initial array? I'm also confused about the values. Is this a float or an int?

@pveentjer
pveentjer May 19, 2016 Member

These properties are in a somewhat hidden location. Normally, user-modifiable properties are part of GroupProperties (even though the class is internal), and those are included in the documentation; your properties are not. This could be good: exposing more than strictly necessary could lead to problems.

@Donnerbart
Donnerbart May 19, 2016 Contributor

I did this on purpose, as mentioned in the TDD. I want to be sure that those properties make sense before we add them "forever" to the canon (and have to provide legacy support for them). Maybe I'll add the @Beta annotation to them as well to underline the intent.

@Donnerbart
Donnerbart May 19, 2016 edited Contributor

The default value of {@code 0} uses an educated guess...

Doesn't this explain what 0 means?

The value is a float; I made this clearer by changing the "rule of thumb" examples to floats. The constructor uses an int because we don't have a float constructor.

@Donnerbart
Donnerbart May 19, 2016 Contributor

I just added the @Beta annotation to the properties. I also added more JavaDoc:

{@link IMap#putAll(Map)} splits up the entries of the user input map per partition,
to eventually send the entries to the correct target nodes.
So the method creates multiple arrays with map entries per partition.
This value determines how the initial size of these arrays is calculated.
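The formula from the JavaDoc can be worked through with the "rule of thumb" values. This is a sketch under assumptions: the partition count of 271 is Hazelcast's default and may differ per cluster, and the class and method names are invented for the example.

```java
final class PutAllInitialSizeSketch {

    /** Hazelcast's default partition count; a cluster may be configured differently. */
    static final int DEFAULT_PARTITION_COUNT = 271;

    /** initialSize = ceil(factor * mapSize / partitionCount), as stated in the quoted JavaDoc. */
    static int initialSize(float factor, int mapSize, int partitionCount) {
        return (int) Math.ceil(factor * mapSize / partitionCount);
    }

    public static void main(String[] args) {
        System.out.println(initialSize(10f, 100, DEFAULT_PARTITION_COUNT));    // prints 4
        System.out.println(initialSize(5f, 5000, DEFAULT_PARTITION_COUNT));    // prints 93
        System.out.println(initialSize(1.5f, 50000, DEFAULT_PARTITION_COUNT)); // prints 277
    }
}
```

A factor of 1 would size each array for a perfectly even distribution; larger factors leave headroom for partitions that receive more than their share, so fewer arrays have to grow.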

@pveentjer pveentjer commented on an outdated diff May 19, 2016
...ava/com/hazelcast/map/impl/proxy/MapProxySupport.java
+ private static final HazelcastProperty MAP_PUT_ALL_BATCH_SIZE
+ = new HazelcastProperty("hazelcast.map.put.all.batch.size", 0);
+
+ /**
+ * Defines the initial size of arrays per partition for {@link IMap#putAll(Map)} calls.
+ *
+ * The default value of {@code 0} uses an educated guess, depending on the map size, which is a good overall strategy.
+ * If you insert entries which don't match a normal partition distribution you should configure this factor.
+ * The initial size is calculated by this formula:
+ * {@code initialSize = ceil(MAP_PUT_ALL_INITIAL_SIZE_FACTOR * map.size() / PARTITION_COUNT)}
+ *
+ * As a rule of thumb you can try the following values:
+ * <ul>
+ * <li>{@code 10} for map sizes about 100 entries</li>
+ * <li>{@code 5} for map sizes between 500 and 5000 entries</li>
+ * <li>{@code 1.5} for map sizes of about 50000 entries</li>
@pveentjer
pveentjer May 19, 2016 Member

And what about huge maps?

@pveentjer pveentjer commented on an outdated diff May 19, 2016
...ava/com/hazelcast/map/impl/proxy/MapProxySupport.java
 try {
-    List<Future> futures = new ArrayList<Future>(partitionCount);
-    MapEntries[] entriesPerPartition = new MapEntries[partitionCount];
+    int mapSize = map.size();
+    if (mapSize == 0) {
+        return;
+    }
+
+    boolean useBatching = getPutAllUseBatching(mapSize);
@pveentjer
pveentjer May 19, 2016 Member

"is" instead of "get" makes more sense.

@pveentjer pveentjer commented on the diff May 19, 2016
...ava/com/hazelcast/map/impl/proxy/MapProxySupport.java
+    private boolean getPutAllUseBatching(int mapSize) {
+        // we check if the feature is enabled and if the map size is bigger than a single batch per member
+        return (putAllBatchSize > 0 && mapSize > putAllBatchSize * getNodeEngine().getClusterService().getSize());
+    }
+
+    @SuppressWarnings("checkstyle:magicnumber")
+    private int getPutAllInitialSize(boolean useBatching, int mapSize, int partitionCount) {
+        if (mapSize == 1) {
+            return 1;
+        }
+        if (useBatching) {
+            return putAllBatchSize;
+        }
+        if (putAllInitialSizeFactor < 1) {
+            // this is an educated guess for the initial size of the entries per partition, depending on the map size
+            return (int) ceil(20f * mapSize / partitionCount / log10(mapSize));
@pveentjer
pveentjer May 19, 2016 Member

Should 20f not be a constant?

@Donnerbart
Donnerbart May 19, 2016 Contributor

Since the value is so arbitrary, I thought keeping it in the formula would give it more context than pulling it out into a named constant.

@Donnerbart
Donnerbart May 19, 2016 edited Contributor

I elaborated on the formula a bit more in the TDD. Maybe a small summary in the code won't hurt either, once the TDD is approved.
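To make the "educated guess" easier to follow without the TDD, the sketch below evaluates the formula from the diff for a few map sizes. The partition count of 271 used in main is Hazelcast's default and is an assumption here, as are the class and method names. Comparing the formula with the documented one shows that it behaves like a size factor of 20 / log10(mapSize), so the factor shrinks slowly as the map grows.

```java
final class EducatedGuessSketch {

    /** Evaluates ceil(20 * mapSize / partitionCount / log10(mapSize)) from the diff. */
    static int educatedGuess(int mapSize, int partitionCount) {
        if (mapSize == 1) {
            // log10(1) is 0, so the formula below would divide by zero
            return 1;
        }
        return (int) Math.ceil(20f * mapSize / partitionCount / Math.log10(mapSize));
    }

    /** The size factor that the educated guess is equivalent to for a given map size. */
    static double effectiveFactor(int mapSize) {
        return 20 / Math.log10(mapSize);
    }

    public static void main(String[] args) {
        System.out.println(educatedGuess(100, 271));    // prints 4
        System.out.println(educatedGuess(1000, 271));   // prints 25
        System.out.println(educatedGuess(10000, 271));  // prints 185
        System.out.println(educatedGuess(100000, 271)); // prints 1477
        System.out.println(effectiveFactor(100));       // prints 10.0
    }
}
```

For 100 entries the equivalent factor is exactly 10, which matches the first "rule of thumb" value in the JavaDoc. The mapSize == 1 guard in the diff also keeps the logarithm from producing a division by zero.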

@pveentjer
Member

👍

@ahmetmircik
Member

needs rebase

@ahmetmircik
Member

👍

@Donnerbart Donnerbart merged commit bc69d2a into hazelcast:master May 23, 2016

1 check passed

default Build finished. 13777 tests run, 176 skipped, 0 failed.
@Donnerbart Donnerbart deleted the Donnerbart:putAllPerMemberWithBatching branch May 23, 2016
@ahmetmircik ahmetmircik commented on the diff May 26, 2016
...ava/com/hazelcast/map/impl/proxy/MapProxySupport.java
final long time = System.currentTimeMillis();
- InternalCompletableFuture<Object> future = operationService.invokeOnPartition(SERVICE_NAME, op, partitionId);
+ InternalCompletableFuture<Object> future = operationService.invokeOnTarget(SERVICE_NAME, op, address);
@ahmetmircik
ahmetmircik May 26, 2016 Member

Would it be more appropriate to use logic similar to invokeOnAllPartitions here? It internally relies on InvokeOnPartitions, which has a retry mechanism in case of failures. By the way, I wrote a small test here, and it fails on master and passes on maintenance.

@Donnerbart
Donnerbart May 30, 2016 Contributor

Will work on this issue in this PR: #8275
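The concern raised in this thread is that a direct invocation on a target has no retry if the call fails. As a generic illustration only, the sketch below shows a bounded retry wrapper; it does not reproduce the retry logic inside Hazelcast, and invokeWithRetry and its parameters are hypothetical names.

```java
import java.util.function.Supplier;

final class RetrySketch {

    /** Calls the invocation up to maxAttempts times and rethrows the last failure. */
    static <T> T invokeWithRetry(Supplier<T> invocation, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return invocation.get();
            } catch (RuntimeException e) {
                // remember the failure and try again, e.g. after the target has changed
                last = e;
            }
        }
        throw last;
    }
}
```

A real implementation would typically also re-resolve the target between attempts, since the owner of a partition can change while the call is in flight.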
