Commit

GG-21151 Add extended logging for rebalance.
ktkalenko committed Aug 14, 2019
1 parent e5609aa commit 60941c1
Showing 5 changed files with 1,210 additions and 11 deletions.
@@ -1281,6 +1281,13 @@ public final class IgniteSystemProperties {
*/
public static final String INDEX_REBUILDING_PARALLELISM = "INDEX_REBUILDING_PARALLELISM";

/** Enables writing rebalance statistics into the log. Default: false. */
public static final String IGNITE_WRITE_REBALANCE_STATISTICS = "IGNITE_WRITE_REBALANCE_STATISTICS";

/** Enables writing per-partition rebalance statistics into the log. Default: false. */
public static final String IGNITE_WRITE_REBALANCE_PARTITION_STATISTICS =
"IGNITE_WRITE_REBALANCE_PARTITION_STATISTICS";

/**
* Threshold timeout for long transactions, if transaction exceeds it, it will be dumped in log with
* information about how much time did it spent in system time (time while aquiring locks, preparing,
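
The two IGNITE_WRITE_REBALANCE_* constants added above are ordinary JVM system properties. A minimal sketch of enabling both flags before node start-up; the property names come from this diff, while the launcher class itself is hypothetical:

import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;

/** Hypothetical launcher that enables the new rebalance-statistics flags before node start. */
public class RebalanceStatsEnabledNode {
    public static void main(String[] args) {
        // Equivalent to passing -DIGNITE_WRITE_REBALANCE_STATISTICS=true
        // and -DIGNITE_WRITE_REBALANCE_PARTITION_STATISTICS=true on the JVM command line.
        System.setProperty("IGNITE_WRITE_REBALANCE_STATISTICS", "true");
        System.setProperty("IGNITE_WRITE_REBALANCE_PARTITION_STATISTICS", "true");

        try (Ignite ignored = Ignition.start()) {
            // Rebalance statistics added by this commit should appear in the node log
            // once rebalancing completes.
        }
    }
}
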
@@ -26,6 +26,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -52,6 +53,7 @@
import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceStatisticsUtils.RebalanceFutureStatistics;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
@@ -76,10 +78,17 @@
import org.apache.ignite.spi.IgniteSpiException;
import org.jetbrains.annotations.Nullable;

import static java.lang.System.currentTimeMillis;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static java.util.Objects.nonNull;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceStatisticsUtils.rebalanceStatistics;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
@@ -117,6 +126,10 @@ public class GridDhtPartitionDemander {
/** Cached rebalance topics. */
private final Map<Integer, Object> rebalanceTopics;

/** Futures involved in the last rebalance. For statistics. */
@GridToStringExclude
private final Collection<RebalanceFuture> lastStatFutures = new ConcurrentLinkedQueue<>();

/**
* @param grp Cache group.
*/
@@ -363,17 +376,18 @@ Runnable addAssignments(
}

return () -> {
if (next != null)
fut.listen(f -> {
try {
if (f.get()) // Not cancelled.
next.run(); // Starts next cache rebalancing (according to the order).
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
log.debug(e.getMessage());
}
});
fut.listen(f -> {
try {
printRebalanceStatistics();

if (f.get() && nonNull(next))
next.run();
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
log.debug(e.getMessage());
}
});

requestPartitions(fut, assignments);
};
@@ -511,6 +525,8 @@ private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssign
return;

try {
fut.stat.addMessageStatistics(topicId, node);

ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId),
demandMsg.convertIfNeeded(node.version()), grp.ioPolicy(), demandMsg.timeout());

@@ -737,6 +753,8 @@ public void handleSupplyMessage(
}

try {
fut.stat.addReceivePartitionStatistics(topicId, ctx.node(nodeId), supplyMsg);

AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);

// Preload.
@@ -1224,6 +1242,10 @@ public static class RebalanceFuture extends GridFutureAdapter<Boolean> {
* partition in OWNING state. */
private final ReentrantReadWriteLock cancelLock;

/** Rebalance statistics. */
@GridToStringExclude
final RebalanceFutureStatistics stat = new RebalanceFutureStatistics();

/**
* @param grp Cache group.
* @param assignments Assignments.
@@ -1532,4 +1554,74 @@ private void sendRebalanceFinishedEvent() {
return S.toString(RebalanceFuture.class, this);
}
}

/**
* Collects the demanders of all cache groups, for printing statistics.
*
* @return List of demanders.
*/
private List<GridDhtPartitionDemander> demanders() {
return ctx.cacheContexts().stream()
.map(GridCacheContext::preloader)
.filter(GridDhtPreloader.class::isInstance)
.map(GridDhtPreloader.class::cast)
.map(GridDhtPreloader::demander)
.collect(toList());
}

/**
* Prints rebalance statistics into the log.
* Statistics are printed only if
* {@link RebalanceStatisticsUtils#printRebalanceStatistics()
* printRebalanceStatistics()} returns {@code true}.
* To use it correctly, call this method exactly once, right after the
* {@code RebalanceFuture} has completed (successfully or not).
* <p/>
* If {@link #rebalanceFut} completed successfully, prints statistics
* for the current cache group.
* <p/>
* If the whole rebalance is over, prints statistics for all cache groups.
* The end of the rebalance is determined by the successful completion of
* all {@code RebalanceFuture}s.
*
* @throws IgniteCheckedException If getting the result of a {@code RebalanceFuture} fails.
* @see RebalanceFuture RebalanceFuture
*/
private void printRebalanceStatistics() throws IgniteCheckedException {
if (!RebalanceStatisticsUtils.printRebalanceStatistics())
return;

RebalanceFuture currRebFut = rebalanceFut;
assert currRebFut.isDone() : "RebalanceFuture should be done.";

currRebFut.stat.endTime(currentTimeMillis());
lastStatFutures.add(currRebFut);

if (currRebFut.get()) // Rebalance of the current cache group succeeded.
log.info(rebalanceStatistics(false, singletonMap(grp, singletonList(currRebFut))));
else
return;

for (GridCacheContext gridCacheContext : ctx.cacheContexts()) {
IgniteInternalFuture<Boolean> rebalanceFuture = gridCacheContext.preloader().rebalanceFuture();

if (!rebalanceFuture.isDone() || !rebalanceFuture.get()) // Rebalance is not finished yet or failed.
return;
}

List<GridDhtPartitionDemander> demanders = demanders();

Map<CacheGroupContext, Collection<RebalanceFuture>> rebFutrs =
demanders.stream().collect(toMap(demander -> demander.grp, demander -> demander.lastStatFutures));

try {
log.info(rebalanceStatistics(true, rebFutrs));
}
finally {
demanders.forEach(demander -> {
demander.rebalanceFut.stat.clear();
demander.lastStatFutures.clear();
});
}
}
}
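
The RebalanceFutureStatistics and RebalanceStatisticsUtils helpers used above live in RebalanceStatisticsUtils.java, one of the five changed files not shown in this excerpt. Below is a minimal sketch of the API surface implied by the call sites visible in this diff; the signatures and bodies are assumptions, not the actual implementation:

package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;

import java.util.Collection;
import java.util.Map;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander.RebalanceFuture;

/** Sketch (not the real class) of the statistics surface this diff relies on. */
class RebalanceStatisticsUtilsSketch {
    /** Whether statistics should be written at all; plausibly driven by the new system property. */
    static boolean printRebalanceStatistics() {
        return IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_WRITE_REBALANCE_STATISTICS);
    }

    /** Renders accumulated statistics; {@code full} selects whole-rebalance vs. single-group output. */
    static String rebalanceStatistics(boolean full, Map<CacheGroupContext, Collection<RebalanceFuture>> futs) {
        return "rebalance statistics for " + futs.size() + " cache group(s)"; // Placeholder rendering.
    }

    /** Per-future accumulator mirroring how RebalanceFutureStatistics is used by the demander. */
    static class RebalanceFutureStatisticsSketch {
        /** Called when a demand message is sent to a supplier node. */
        void addMessageStatistics(int topicId, ClusterNode supplier) { /* accumulate send info */ }

        /** Called when a supply message with partition data arrives. */
        void addReceivePartitionStatistics(int topicId, ClusterNode supplier, GridDhtPartitionSupplyMessage msg) { /* accumulate receive info */ }

        /** Records the completion time of the rebalance future. */
        void endTime(long time) { /* store the end timestamp */ }

        /** Drops accumulated data after statistics have been printed. */
        void clear() { /* reset accumulators */ }
    }
}
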
@@ -632,4 +632,13 @@ private GridDhtFuture<Object> request0(GridCacheContext cctx, Collection<KeyCach
@Override public void dumpDebugInfo() {
// No-op
}

/**
* Returns the demander.
*
* @return Demander.
*/
public GridDhtPartitionDemander demander() {
return demander;
}
}
