From 5cc0906838f4916060149a62b51d6904f63a11a7 Mon Sep 17 00:00:00 2001 From: Dmitry Lukjanenko Date: Wed, 28 Feb 2018 03:14:36 +0300 Subject: [PATCH] IGNITE-5357 replicated cache reads load balancing was added --- .../dht/GridPartitionedSingleGetFuture.java | 61 +++++++----- ...cheAbstractReplicatedAffinityNodeTest.java | 99 +++++++++++++++++++ ...GridCacheReplicatedGetPrimaryNodeTest.java | 27 +++++ ...GridCacheReplicatedGetSameMacNodeTest.java | 39 ++++++++ 4 files changed, 204 insertions(+), 22 deletions(-) create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAbstractReplicatedAffinityNodeTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheReplicatedGetPrimaryNodeTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheReplicatedGetSameMacNodeTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index adb634a79f0e9..b0eef028a2253 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -17,31 +17,18 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import java.util.Collection; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteDiagnosticAware; import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.EntryGetResult; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; -import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; -import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.GridCacheFuture; -import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.GridCacheUtils.BackupPostProcessingClosure; -import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; @@ -58,6 +45,12 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; +import org.jsr166.ThreadLocalRandom8; + +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; @@ -705,17 +698,41 @@ private ClusterTopologyServerNotFoundException serverNotFoundError(AffinityTopol * @param affNodes All affinity nodes. * @return Affinity node to get key from. */ - @Nullable private ClusterNode affinityNode(List affNodes) { - if (!canRemap) { - for (ClusterNode node : affNodes) { - if (cctx.discovery().alive(node)) + @Nullable private ClusterNode affinityNode(final List affNodes) { + if (forcePrimary) { + return affNodes.get(0); + } + + final String mac = cctx.localNode().attribute(IgniteNodeAttributes.ATTR_MACS); + assert mac != null; + + int lastMatch = -1; + ClusterNode affinityNode = null; + final int nodesSize = affNodes.size(); + final int randNodeIndex = ThreadLocalRandom8.current().nextInt(nodesSize); + + for (int i = 0; i < nodesSize; i++) { + final ClusterNode node = affNodes.get(i); + + if (canRemap || cctx.discovery().alive(node)) { + // Prefer collocated node + if (mac.equals(node.attribute(IgniteNodeAttributes.ATTR_MACS))) { return node; + } + + if (i == randNodeIndex) { + affinityNode = node; + } + + lastMatch = i; } + } - return null; + if (affinityNode == null && lastMatch != -1) { + return affNodes.get(lastMatch); } - else - return affNodes.get(0); + + return affinityNode; } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAbstractReplicatedAffinityNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAbstractReplicatedAffinityNodeTest.java new file mode 100644 index 0000000000000..86fdd50264377 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAbstractReplicatedAffinityNodeTest.java @@ -0,0 +1,99 @@ +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Assert; + +/** + * Abstract test to check for new + * {@link org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture#affinityNode(java.util.List) affinityNode} + * method for replicated and partitioned caches + */ +public abstract class GridCacheAbstractReplicatedAffinityNodeTest extends GridCommonAbstractTest { + private static final int KEY = 1; + private static final int NODE_COUNT = 4; + + @Override + protected void beforeTestsStarted() throws Exception { + startGrids(NODE_COUNT); + } + + @Override + protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + @Override + protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration configuration = super.getConfiguration(igniteInstanceName); + configuration.setCommunicationSpi(new CheckNodeCommunicationSpi()); + + if (igniteInstanceName.equals(getTestIgniteInstanceName(NODE_COUNT - 1))) { + configuration.setClientMode(true); + } else { + CacheConfiguration cacheConfiguration = new CacheConfiguration(DEFAULT_CACHE_NAME); + cacheConfiguration.setCacheMode(CacheMode.REPLICATED); + cacheConfiguration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + cacheConfiguration.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheConfiguration.setReadFromBackup(readFromBackup()); + + configuration.setCacheConfiguration(cacheConfiguration); + } + + return configuration; + } + + protected int getKey() { + return KEY; + } + + protected Ignite getClientNode() { + return ignite(NODE_COUNT - 1); + } + + protected CheckNodeCommunicationSpi communication(Ignite ignite) { + return (CheckNodeCommunicationSpi) ignite.configuration().getCommunicationSpi(); + } + + protected abstract boolean readFromBackup(); + + /** + * Custom TcpCommunicationSpi to check for expected node + */ + protected static class CheckNodeCommunicationSpi extends TcpCommunicationSpi { + /** */ + private volatile ClusterNode expectedNode; + + /** + * @param expectedNode expected node + */ + public void setExpectedNode(final ClusterNode expectedNode) { + this.expectedNode = expectedNode; + } + + @Override + public void sendMessage(ClusterNode node, Message msg, IgniteInClosure ackC) throws IgniteSpiException { + if (msg instanceof GridIoMessage) { + final Message realMessage = ((GridIoMessage) msg).message(); + if (realMessage instanceof GridNearSingleGetRequest) { + Assert.assertEquals(expectedNode, node); + } + } + + super.sendMessage(node, msg, ackC); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheReplicatedGetPrimaryNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheReplicatedGetPrimaryNodeTest.java new file mode 100644 index 0000000000000..a26348861bc14 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheReplicatedGetPrimaryNodeTest.java @@ -0,0 +1,27 @@ +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cluster.ClusterNode; + +/** + * + */ +public class GridCacheReplicatedGetPrimaryNodeTest extends GridCacheAbstractReplicatedAffinityNodeTest { + @Override + protected boolean readFromBackup() { + return false; + } + + public void testGetPrimaryNode() { + final Ignite ignite = getClientNode(); + final Affinity affinity = ignite.affinity(DEFAULT_CACHE_NAME); + final IgniteCache cache = ignite.getOrCreateCache(DEFAULT_CACHE_NAME); + + cache.put(getKey(), getKey()); + final ClusterNode primaryNode = affinity.mapKeyToNode(getKey()); + communication(ignite).setExpectedNode(primaryNode); + cache.get(getKey()); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheReplicatedGetSameMacNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheReplicatedGetSameMacNodeTest.java new file mode 100644 index 0000000000000..1b0598749ecf8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheReplicatedGetSameMacNodeTest.java @@ -0,0 +1,39 @@ +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; + +import java.util.HashMap; +import java.util.Map; + +/** + * + */ +public class GridCacheReplicatedGetSameMacNodeTest extends GridCacheAbstractReplicatedAffinityNodeTest { + private static final String CLIENT_MAC = "CLIENT_MAC"; + + @Override + protected boolean readFromBackup() { + return true; + } + + public void testGetSameMacNode() { + final Map userAttributes = new HashMap<>(); + userAttributes.put(IgniteNodeAttributes.ATTR_MACS, CLIENT_MAC); + + final Ignite ignite = getClientNode(); + final ClusterNode clientNode = ignite.cluster().forClients().node(); + ((TcpDiscoveryNode)clientNode).setAttributes(userAttributes); + ClusterNode randomServerNode = ignite.cluster().forServers().forRandom().node(); + ((TcpDiscoveryNode)randomServerNode).setAttributes(userAttributes); + + final IgniteCache cache = ignite.getOrCreateCache(DEFAULT_CACHE_NAME); + + cache.put(getKey(), getKey()); + communication(ignite).setExpectedNode(randomServerNode); + cache.get(getKey()); + } +}