Skip to content

Commit

Permalink
gg-25585 Rebalance logging extended.
Browse files Browse the repository at this point in the history
  • Loading branch information
sanpwc committed Mar 16, 2020
1 parent 780f049 commit b8f3600
Show file tree
Hide file tree
Showing 6 changed files with 301 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3207,24 +3207,44 @@ else if (cntr == maxCntr.cnt)
if (localReserved != null) {
Long localHistCntr = localReserved.get(p);

if (localHistCntr != null && localHistCntr <= minCntr && maxCntrObj.nodes.contains(cctx.localNodeId())) {
partHistSuppliers.put(cctx.localNodeId(), top.groupId(), p, localHistCntr);
if (localHistCntr != null && localHistCntr <= minCntr) {
if (maxCntrObj.nodes.contains(cctx.localNodeId())) {
partHistSuppliers.put(cctx.localNodeId(), top.groupId(), p, localHistCntr);

haveHistory.add(p);
haveHistory.add(p);

continue;
continue;
}
else {
if (log.isInfoEnabled()) {
log.info("Historical rebalance is not possible because no suitable supplier exists " +
"[nodeId=" + cctx.localNodeId() + ", grpId=" + top.groupId() +
", grpName=" + cctx.cache().cacheGroupDescriptor(top.groupId()).groupName() +
", part=" + p + ", localHistCntr=" + localHistCntr + ", minCntr=" + minCntr);
}
}
}
}

for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e0 : msgs.entrySet()) {
Long histCntr = e0.getValue().partitionHistoryCounters(top.groupId()).get(p);

if (histCntr != null && histCntr <= minCntr && maxCntrObj.nodes.contains(e0.getKey())) {
partHistSuppliers.put(e0.getKey(), top.groupId(), p, histCntr);
if (histCntr != null && histCntr <= minCntr) {
if (maxCntrObj.nodes.contains(e0.getKey())) {
partHistSuppliers.put(e0.getKey(), top.groupId(), p, histCntr);

haveHistory.add(p);
haveHistory.add(p);

break;
break;
}
else {
if (log.isInfoEnabled()) {
log.info("Historical rebalance is not possible because no suitable supplier exists " +
"[nodeId=" + e0.getKey() + ", grpId=" + top.groupId() +
", grpName=" + cctx.cache().cacheGroupDescriptor(top.groupId()).groupName() +
", part=" + p + ", histCntr=" + histCntr + ", minCntr=" + minCntr);
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
Expand All @@ -41,11 +43,13 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander.RebalanceFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
Expand All @@ -55,6 +59,7 @@
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;

Expand Down Expand Up @@ -208,6 +213,10 @@ public boolean disableRebalancingCancellationOptimization() {

CachePartitionFullCountersMap countersMap = grp.topology().fullUpdateCounters();

GridIntList skippedPartitionsLackHistSupplier = new GridIntList();

GridIntList skippedPartitionsCleared = new GridIntList();

for (int p = 0; p < partitions; p++) {
if (ctx.exchange().hasPendingServerExchange()) {
if (log.isDebugEnabled())
Expand Down Expand Up @@ -252,7 +261,10 @@ public boolean disableRebalancingCancellationOptimization() {
part.resetUpdateCounter();
}

// assert part.state() == MOVING : "Partition has invalid state for rebalance " + aff.topologyVersion() + " " + part;
if (part.state() != MOVING) {
throw new AssertionError("Partition has invalid state for rebalance "
+ aff.topologyVersion() + " " + part);
}

ClusterNode histSupplier = null;

Expand All @@ -278,9 +290,16 @@ public boolean disableRebalancingCancellationOptimization() {
}

// TODO FIXME https://issues.apache.org/jira/browse/IGNITE-11790
msg.partitions().addHistorical(p, part.initialUpdateCounter(), countersMap.updateCounter(p), partitions);
msg.partitions().
addHistorical(p, part.initialUpdateCounter(), countersMap.updateCounter(p), partitions);
}
else {
if (histSupplier == null)
skippedPartitionsLackHistSupplier.add(p);

if (histSupplier != null && exchFut.isClearingPartition(grp, p))
skippedPartitionsCleared.add(p);

List<ClusterNode> picked = remoteOwners(p, topVer);

if (picked.isEmpty()) {
Expand Down Expand Up @@ -315,6 +334,24 @@ public boolean disableRebalancingCancellationOptimization() {
}
}

if (log.isInfoEnabled()) {
if (!skippedPartitionsLackHistSupplier.isEmpty()) {
log.info("Unable to perform historical rebalance cause " +
"history supplier is not available [grpId=" + grp.groupId() + ", grpName=" + grp.name() +
", parts=" + S.compact(
Arrays.stream(skippedPartitionsLackHistSupplier.array()).boxed().collect(Collectors.toList())) +
", topVer=" + topVer + ']');
}

if (!skippedPartitionsCleared.isEmpty()) {
log.info("Unable to perform historical rebalance because clearing is required for partitions" +
"[grpId=" + grp.groupId() + ", grpName=" + grp.name() +
", parts=" + S.compact(
Arrays.stream(skippedPartitionsCleared.array()).boxed().collect(Collectors.toList())) +
", topVer=" + topVer + ']');
}
}

if (!assignments.isEmpty())
ctx.database().lastCheckpointInapplicableForWalRebalance(grp.groupId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,14 @@ private boolean safeToUpdatePageMemories() {
}
}

if (log.isInfoEnabled()) {
log.info("Following partitions were reserved for potential history rebalance [" +
grpPartsWithCnts.entrySet().stream().map(entry ->
"grpId=" + entry.getKey() +
", grpName=" + cctx.cache().cacheGroupDescriptor(entry.getKey()).groupName() +
", parts=" + S.compact(entry.getValue().keySet())).collect(Collectors.joining(", ")) + ']');
}

return grpPartsWithCnts;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Copyright 2019 GridGain Systems, Inc. and Contributors.
*
* Licensed under the GridGain Community Edition License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.persistence.db.wal;

import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

/**
 * Tests for checking rebalance log messages.
 * <p>
 * Each test overrides {@link IgniteSystemProperties#IGNITE_PDS_WAL_REBALANCE_THRESHOLD} through
 * {@link System#setProperty(String, String)}; the value that was in effect before the test is captured in
 * {@link #beforeTest()} and restored in {@link #afterTest()} so the override does not leak into other tests
 * executed in the same JVM.
 */
public class IgniteWalRebalanceLoggingTest extends GridCommonAbstractTest {
    /** Value passed to the storage configuration as checkpoint frequency; twice this value is slept before restart. */
    public static final int CHECKPOINT_FREQUENCY = 100;

    /** Test logger. */
    private final ListeningTestLogger srvLog = new ListeningTestLogger(false, log);

    /** Number of keys put into each cache while both nodes are running. */
    public static final int KEYS_LOW_BORDER = 100;

    /** Exclusive upper bound of the keys put into each cache after the second node is stopped. */
    public static final int KEYS_UPPER_BORDER = 200;

    /** WAL rebalance threshold property value observed before the test, {@code null} if it was not set. */
    private String prevWalRebalanceThreshold;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        // All nodes share the listening logger so that registered listeners see messages of the restarted node.
        cfg.setGridLogger(srvLog);

        DataStorageConfiguration storageCfg = new DataStorageConfiguration();

        storageCfg.getDefaultDataRegionConfiguration().setPersistenceEnabled(true);

        storageCfg.setWalMode(WALMode.LOG_ONLY).setWalCompactionEnabled(true).setWalCompactionLevel(1);

        storageCfg.setCheckpointFrequency(CHECKPOINT_FREQUENCY);

        cfg.setDataStorageConfiguration(storageCfg);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        super.beforeTest();

        // Remember the current value: the tests overwrite it and afterTest() has to put it back.
        prevWalRebalanceThreshold = System.getProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD);

        cleanPersistenceDir();
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        try {
            stopAllGrids();
        }
        finally {
            // Restore the property even if stopping the grids failed, otherwise the override set by a test
            // would stay visible to every test executed later in the same JVM.
            if (prevWalRebalanceThreshold == null)
                System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
            else {
                System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD,
                    prevWalRebalanceThreshold);
            }
        }

        super.afterTest();
    }

    /**
     * Check that in case of Historical rebalance we log appropriate messages.
     * <p>
     * <b>Steps:</b>
     * <ol>
     *     <li>Set IGNITE_PDS_WAL_REBALANCE_THRESHOLD to 1.</li>
     *     <li>Start two nodes.</li>
     *     <li>Create two caches each in its own cache group and populate them with some data.</li>
     *     <li>Stop second node and add more data to both caches.</li>
     *     <li>Wait checkpoint frequency * 2 to give a checkpoint a chance to be created.</li>
     *     <li>Start previously stopped node and await for PME.</li>
     * </ol>
     * <p>
     * <b>Expected result:</b>
     * Assert that (after restarting second node) log would contain following messages expected number of times:
     * <ul>
     *     <li>'Following partitions were reserved for potential history rebalance [grpId=1813188848,
     *     grpName=cache_group2, parts=[0-7], grpId=1813188847, grpName=cache_group1, parts=[0-7]]'
     *     Meaning that partitions were reserved for history rebalance.</li>
     *     <li>A message containing 'fullPartitions=[], histPartitions=[0-7]'
     *     Meaning that history rebalance started.</li>
     * </ul>
     * And assert that (after restarting second node) log would <b>not</b> contain following messages:
     * <ul>
     *     <li>Unable to perform historical rebalance...</li>
     * </ul>
     *
     * @throws Exception If failed.
     */
    @Test
    public void testHistoricalRebalanceLogMsg() throws Exception {
        System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "1");

        LogListener expMsgsLsnr = LogListener.
            matches("Following partitions were reserved for potential history rebalance [grpId=1813188848," +
                " grpName=cache_group2, parts=[0-7], grpId=1813188847, grpName=cache_group1, parts=[0-7]]").times(3).
            andMatches("fullPartitions=[], histPartitions=[0-7]").times(2).build();

        LogListener unexpectedMessagesLsnr =
            LogListener.matches("Unable to perform historical rebalance").build();

        checkFollowingPartitionsWereReservedForPotentialHistoryRebalanceMsg(expMsgsLsnr, unexpectedMessagesLsnr);

        assertTrue(expMsgsLsnr.check());
        assertFalse(unexpectedMessagesLsnr.check());
    }

    /**
     * Check that in case of Full rebalance we log appropriate messages.
     * <p>
     * <b>Steps:</b>
     * <ol>
     *     <li>Set IGNITE_PDS_WAL_REBALANCE_THRESHOLD to 500000.</li>
     *     <li>Start two nodes.</li>
     *     <li>Create two caches each in its own cache group and populate them with some data.</li>
     *     <li>Stop second node and add more data to both caches.</li>
     *     <li>Wait checkpoint frequency * 2 to give a checkpoint a chance to be created.</li>
     *     <li>Start previously stopped node and await for PME.</li>
     * </ol>
     * <p>
     * <b>Expected result:</b>
     * Assert that (after restarting second node) log would contain following messages expected number of times:
     * <ul>
     *     <li>'Following partitions were reserved for potential history rebalance []'
     *     Meaning that no partitions were reserved for history rebalance.</li>
     *     <li>'Unable to perform historical rebalance cause history supplier is not available [grpId=...,
     *     grpName=..., parts=[0-7], topVer=AffinityTopologyVersion [topVer=4, minorTopVer=0]]'
     *     for each of the two cache groups.</li>
     *     <li>A message containing 'fullPartitions=[0-7], histPartitions=[]'.</li>
     * </ul>
     *
     * @throws Exception If failed.
     */
    @Test
    public void testFullRebalanceLogMsgs() throws Exception {
        System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "500000");

        LogListener expMsgsLsnr = LogListener.
            matches("Following partitions were reserved for potential history rebalance []").times(4).
            andMatches("Unable to perform historical rebalance cause history supplier is not available [" +
                "grpId=1813188848, grpName=cache_group2, parts=[0-7]," +
                " topVer=AffinityTopologyVersion [topVer=4, minorTopVer=0]]").
            andMatches("Unable to perform historical rebalance cause history supplier is not available [" +
                "grpId=1813188847, grpName=cache_group1, parts=[0-7]," +
                " topVer=AffinityTopologyVersion [topVer=4, minorTopVer=0]]").
            andMatches("fullPartitions=[0-7], histPartitions=[]").times(2).build();

        checkFollowingPartitionsWereReservedForPotentialHistoryRebalanceMsg(expMsgsLsnr);

        assertTrue(expMsgsLsnr.check());
    }

    /**
     * Test utility method: runs the common scenario (two nodes, two cache groups, second node restarted after
     * additional updates) and registers the given listeners right before the second node is started again.
     *
     * @param lsnrs Listeners to register with server logger.
     * @throws Exception If failed.
     */
    private void checkFollowingPartitionsWereReservedForPotentialHistoryRebalanceMsg(LogListener... lsnrs)
        throws Exception {
        startGridsMultiThreaded(2).cluster().active(true);

        IgniteCache<Integer, String> cache1 = createCache("cache1", "cache_group1");
        IgniteCache<Integer, String> cache2 = createCache("cache2", "cache_group2");

        for (int i = 0; i < KEYS_LOW_BORDER; i++) {
            cache1.put(i, "abc" + i);
            cache2.put(i, "abc" + i);
        }

        stopGrid(1);

        // Updates made while node 1 is down are what it has to rebalance after restart.
        for (int i = KEYS_LOW_BORDER; i < KEYS_UPPER_BORDER; i++) {
            cache1.put(i, "abc" + i);
            cache2.put(i, "abc" + i);
        }

        Thread.sleep(CHECKPOINT_FREQUENCY * 2);

        // Drop listeners left from previous runs so only messages logged from this point on are counted.
        srvLog.clearListeners();

        for (LogListener lsnr : lsnrs)
            srvLog.registerListener(lsnr);

        startGrid(1);

        awaitPartitionMapExchange();
    }

    /**
     * Create cache with specific name and group name, 8 partitions and 1 backup.
     *
     * @param cacheName Cache name.
     * @param cacheGrpName Cache group name.
     * @return Created cache.
     */
    private IgniteCache<Integer, String> createCache(String cacheName, String cacheGrpName) {
        return ignite(0).createCache(
            new CacheConfiguration<Integer, String>(cacheName).
                setAffinity(new RendezvousAffinityFunction().setPartitions(8))
                .setGroupName(cacheGrpName).
                setBackups(1));
    }
}

0 comments on commit b8f3600

Please sign in to comment.