diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 9465eddfe1084..4ca0f021f9c10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -82,8 +82,8 @@ import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static java.util.Objects.nonNull; -import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toSet; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED; @@ -1561,13 +1561,13 @@ private void sendRebalanceFinishedEvent() { * * @return List demanders. * */ - private List demanders(){ + private Set demanders(){ return ctx.cacheContexts().stream() .map(GridCacheContext::preloader) .filter(GridDhtPreloader.class::isInstance) .map(GridDhtPreloader.class::cast) .map(GridDhtPreloader::demander) - .collect(toList()); + .collect(toSet()); } /** @@ -1610,10 +1610,10 @@ private void printRebalanceStatistics() throws IgniteCheckedException { return; } - List demanders = demanders(); + Set demanders = demanders(); - Map> rebFutrs = - demanders.stream().collect(toMap(demander -> demander.grp, demander -> demander.lastStatFutures)); + Map> rebFutrs = demanders.stream() + .collect(toMap(demander -> demander.grp, demander -> demander.lastStatFutures)); try { log.info(rebalanceStatistics(true, rebFutrs)); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/RebalanceStatisticsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/RebalanceStatisticsTest.java index 79704f1da3f23..0523fabfa859a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/RebalanceStatisticsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/RebalanceStatisticsTest.java @@ -16,16 +16,25 @@ package org.apache.ignite.internal.processors.cache.distributed.rebalancing; +import java.io.ByteArrayOutputStream; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.testframework.ListeningTestLogger; import org.apache.ignite.testframework.junits.SystemPropertiesRule; import org.apache.ignite.testframework.junits.WithSystemProperty; @@ -34,30 +43,24 @@ import org.junit.Test; import org.junit.rules.TestRule; -import java.io.ByteArrayOutputStream; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.regex.Matcher; - import static java.lang.Integer.parseInt; import static java.util.Objects.nonNull; -import static java.util.function.Function.identity; import static java.util.regex.Pattern.compile; import static java.util.stream.Collectors.toMap; import static java.util.stream.IntStream.range; +import static java.util.stream.IntStream.rangeClosed; import static java.util.stream.Stream.of; import static org.apache.ignite.IgniteSystemProperties.IGNITE_QUIET; import static org.apache.ignite.IgniteSystemProperties.IGNITE_WRITE_REBALANCE_PARTITION_STATISTICS; import static org.apache.ignite.IgniteSystemProperties.IGNITE_WRITE_REBALANCE_STATISTICS; import static org.apache.ignite.testframework.GridTestUtils.assertNotContains; +/** + * For testing of rebalance statistics. + */ @WithSystemProperty(key = IGNITE_QUIET, value = "false") @WithSystemProperty(key = IGNITE_WRITE_REBALANCE_STATISTICS, value = "true") @WithSystemProperty(key = IGNITE_WRITE_REBALANCE_PARTITION_STATISTICS, value = "true") -/** For testing of rebalance statistics. */ public class RebalanceStatisticsTest extends GridCommonAbstractTest { /** Class rule. */ @ClassRule public static final TestRule classRule = new SystemPropertiesRule(); @@ -104,6 +107,9 @@ public class RebalanceStatisticsTest extends GridCommonAbstractTest { /** Coordinator. */ private IgniteEx crd; + /** Cache group name. */ + private String grpName; + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { stopAllGrids(); @@ -139,15 +145,15 @@ private CacheConfiguration cacheConfiguration(final String cacheName, final int ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); ccfg.setAffinity(new RendezvousAffinityFunction(false, parts)); ccfg.setBackups(backups); + ccfg.setGroupName(grpName); return ccfg; } /** - * Test check that not present statistics in log output, if we not set - * system properties {@code IGNITE_QUIET}, + * Test check that not present statistics in log output, if we not set system properties {@code IGNITE_QUIET}, * {@code IGNITE_WRITE_REBALANCE_STATISTICS}. * - * @throws Exception + * @throws Exception if any error occurs. * @see IgniteSystemProperties#IGNITE_QUIET * @see IgniteSystemProperties#IGNITE_WRITE_REBALANCE_STATISTICS */ @@ -155,11 +161,7 @@ private CacheConfiguration cacheConfiguration(final String cacheName, final int @WithSystemProperty(key = IGNITE_QUIET, value = "true") @WithSystemProperty(key = IGNITE_WRITE_REBALANCE_STATISTICS, value = "false") public void testNotPrintStat() throws Exception { - cacheCfgs = defaultCacheConfigurations(10, 0); - - crd = startGrids(DEFAULT_NODE_CNT); - - fillCaches(100); + createCluster(); log.registerListener(pw::write); @@ -173,21 +175,16 @@ public void testNotPrintStat() throws Exception { } /** - * Test check that not present partition distribution in log output, - * if we not set system properties - * {@code IGNITE_WRITE_REBALANCE_PARTITION_STATISTICS}. + * Test check that not present partition distribution in log output, if we not set system properties {@code + * IGNITE_WRITE_REBALANCE_PARTITION_STATISTICS}. * - * @throws Exception + * @throws Exception if any error occurs. * @see IgniteSystemProperties#IGNITE_WRITE_REBALANCE_PARTITION_STATISTICS */ @Test @WithSystemProperty(key = IGNITE_WRITE_REBALANCE_PARTITION_STATISTICS, value = "false") public void testNotPrintPartitionDistribution() throws Exception { - cacheCfgs = defaultCacheConfigurations(10, 0); - - crd = startGrids(DEFAULT_NODE_CNT); - - fillCaches(100); + createCluster(); log.registerListener(pw::write); @@ -197,91 +194,83 @@ public void testNotPrintPartitionDistribution() throws Exception { /** * The test checks the correctness of the output rebalance statistics. * - * @throws Exception - * */ + * @throws Exception if any error occurs. + */ @Test public void testPrintCorrectStatistic() throws Exception { - cacheCfgs = defaultCacheConfigurations(10,2); - - crd = startGrids(DEFAULT_NODE_CNT); + createCluster(); - fillCaches(100); - - List statPerCacheGrps = new ArrayList<>(); - List totalStats = new ArrayList<>(); - - log.registerListener(logStr -> { - if (!logStr.contains(INFORMATION_PER_CACHE_GROUP_TEXT)) - return; - - (logStr.contains(TOTAL_INFORMATION_TEXT) ? totalStats : statPerCacheGrps).add(logStr); - }); - - IgniteEx newNode = startGrid(DEFAULT_NODE_CNT); - - awaitPartitionMapExchange(); - - //+1 - because ignite-sys-cache - assertEquals(cacheCfgs.length + 1, statPerCacheGrps.size()); - assertEquals(1, totalStats.size()); + checkOutputRebalanceStatistics(DEFAULT_NODE_CNT); + } - Map partDistribution = perCacheGroupPartitionDistribution(newNode); + /** + * The test checks the correctness of the statistics output for two cache groups. + * + * @throws Exception if any error occurs. + */ + @Test + public void testPrintCorrectStatisticTwoCacheGroups() throws Exception { + grpName = "Test"; - Map topicStats = perCacheGroupTopicStatistics(totalStats.get(0)).entrySet().stream() - .collect(toMap(Map.Entry::getKey, entry -> sumNum(entry.getValue(), "p=([0-9]+)"))); + createCluster(); - partDistribution.forEach((cacheName, partCnt) -> assertEquals(partCnt, topicStats.get(cacheName))); + checkOutputRebalanceStatistics(DEFAULT_NODE_CNT); } /** - * The test checks the correctness of the output rebalance statistics - * in multi jvm mode. + * The test checks the correctness of the output rebalance statistics in multi jvm mode. * - * @throws Exception - * */ + * @throws Exception if any error occurs. + */ @Test - public void testPrintCorrectStatisticInMultiJvm() throws Exception{ + public void testPrintCorrectStatisticInMultiJvm() throws Exception { multiJvm = true; - cacheCfgs = defaultCacheConfigurations(100,2); + createCluster(); - crd = startGrids(3); + stopGrid(0); - fillCaches(100); + awaitPartitionMapExchange(); - Map partDistribution = perCacheGroupPartitionDistribution(crd); + checkOutputRebalanceStatistics(0); + } - stopGrid(0); + /** + * Creating a cluster and populating caches. + * + * @throws Exception if any error occurs. + * */ + private void createCluster() throws Exception{ + cacheCfgs = defaultCacheConfigurations(10, 2); - awaitPartitionMapExchange(); + crd = startGrids(DEFAULT_NODE_CNT); - List statPerCacheGrps = new ArrayList<>(); - List totalStats = new ArrayList<>(); + fillCaches(100); + } - log.registerListener(logStr -> { - if (!logStr.contains(INFORMATION_PER_CACHE_GROUP_TEXT)) - return; + /** + * Starting a node with checking rebalance statistics. + * + * @param nodeId ID of the new node. + * @throws Exception if any error occurs. + */ + private void checkOutputRebalanceStatistics(int nodeId) throws Exception { + LogListener logLsnr = new LogListener(); - (logStr.contains(TOTAL_INFORMATION_TEXT) ? totalStats : statPerCacheGrps).add(logStr); - }); + log.registerListener(logLsnr); - IgniteEx newNode = startGrid(0); + IgniteEx newNode = startGrid(nodeId); awaitPartitionMapExchange(); - //+1 - because ignite-sys-cache - assertEquals(cacheCfgs.length + 1, statPerCacheGrps.size()); - assertEquals(1, totalStats.size()); - - Map newPartDistribution = perCacheGroupPartitionDistribution(newNode); + assertEquals(newNode.context().cache().cacheGroups().size(), logLsnr.statPerCacheGrps.size()); + assertEquals(1, logLsnr.totalStats.size()); - Map topicStats = perCacheGroupTopicStatistics(totalStats.get(0)).entrySet().stream() + Map topicStats = perCacheGroupTopicStatistics(logLsnr.totalStats.get(0)).entrySet().stream() .collect(toMap(Map.Entry::getKey, entry -> sumNum(entry.getValue(), "p=([0-9]+)"))); - newPartDistribution.forEach((cacheName, partCnt) -> { - assertEquals(partCnt, topicStats.get(cacheName)); - assertEquals(partCnt, partDistribution.get(cacheName)); - }); + logLsnr.cacheGrpRebParts + .forEach((cacheGrpName, parts) -> assertEquals(parts.size(), topicStats.get(cacheGrpName).intValue())); } /** @@ -324,30 +313,13 @@ private Map perCacheGroupTopicStatistics(final String s) { return perCacheGroupTopicStatistics; } - /** - * Return partition distribution per cache groups use internal api. - * - * @param node Require not null. - * @return Partition distribution per cache groups - * */ - private Map perCacheGroupPartitionDistribution(final IgniteEx node) { - assert nonNull(node); - - ClusterNode localNode = node.localNode(); - - return node.context().cache().cacheGroups().stream() - .map(CacheGroupContext::config) - .map(CacheConfiguration::getName) - .collect(toMap(identity(), cacheName -> node.affinity(cacheName).allPartitions(localNode).length)); - } - /** * Create {@link #DEFAULT_CACHE_NAMES} cache configurations. * * @param parts Count of partitions. * @param backups Count backup. * @return Cache group configurations. - * */ + */ private CacheConfiguration[] defaultCacheConfigurations(final int parts, final int backups) { return of(DEFAULT_CACHE_NAMES) .map(cacheName -> cacheConfiguration(cacheName, parts, backups)) @@ -374,7 +346,7 @@ private void fillCaches(final int cnt) { * * @param idx New node index. * @param notContainsStr String for assertNotContains in log output. - * @throws Exception + * @throws Exception if any error occurs. */ private void assertNotContainsAfterCreateNewNode(final int idx, final String notContainsStr) throws Exception { baos.reset(); @@ -405,4 +377,80 @@ private int sumNum(final String s, final String pattern) { return num; } + + /** + * Log listener for testing rebalance statistics. + */ + private class LogListener implements Consumer { + /** Started rebalance routine text. */ + static final String STARTED_REBALANCE_ROUTINE_TEXT = "Started rebalance routine"; + + /** Output statistics per cache group. */ + List statPerCacheGrps = new ArrayList<>(); + + /** Output total statistics. */ + List totalStats = new ArrayList<>(); + + /** Rebalanced partitions by cache groups. */ + Map> cacheGrpRebParts = new HashMap<>(); + + /** Pattern for extracting the name of a cache group. */ + Pattern cacheGrpExtractor = compile(STARTED_REBALANCE_ROUTINE_TEXT + " \\[(.+?)\\,"); + + /** Pattern for extracting fullPartitions for a cache group. */ + Pattern fullPartsExtractor = compile("fullPartitions=\\[(.+?)\\]"); + + /** {@inheritDoc} */ + @Override public void accept(String logStr) { + if (logStr.contains(INFORMATION_PER_CACHE_GROUP_TEXT)) + (logStr.contains(TOTAL_INFORMATION_TEXT) ? totalStats : statPerCacheGrps).add(logStr); + + if (logStr.contains(STARTED_REBALANCE_ROUTINE_TEXT)) { + cacheGrpRebParts.computeIfAbsent(extractValue(cacheGrpExtractor, logStr), s -> new HashSet<>()) + .addAll(parseParts(extractValue(fullPartsExtractor, logStr))); + } + } + + /** + * Parsing partition. + * + * @param s Partition string. + * @return Parsed partition. + */ + private Set parseParts(String s) { + assert nonNull(s); + + Set parts = new HashSet<>(); + + for (String num : s.split(", ")) { + if (num.contains("-")) { + String[] range = num.split("-"); + + rangeClosed(parseInt(range[0]), parseInt(range[1])).forEach(parts::add); + } + else + parts.add(parseInt(num)); + } + + return parts; + } + + /** + * Extracting a value from a string by pattern. + * + * @param extractor Pattern for extracting value. + * @param s String to extract the value. + * @return Extracted value. + */ + private String extractValue(Pattern extractor, String s) { + assert nonNull(extractor); + assert nonNull(s); + + Matcher matcher = extractor.matcher(s); + + assert matcher.find(); + + return matcher.group(1); + } + } }