From cf09e76916c38f867cd5cb618e345c74672026db Mon Sep 17 00:00:00 2001 From: Ilya Lantukh Date: Tue, 19 Jun 2018 18:10:48 +0300 Subject: [PATCH] IGNITE-8554 Cache metrics: expose metrics with rebalance info about keys - Fixes #4094. Signed-off-by: Ivan Rakov --- .../org/apache/ignite/cache/CacheMetrics.java | 10 +++ .../cache/CacheAffinitySharedManager.java | 3 +- .../cache/CacheClusterMetricsMXBeanImpl.java | 10 +++ .../cache/CacheLocalMetricsMXBeanImpl.java | 8 ++ .../processors/cache/CacheMetricsImpl.java | 15 +++- .../cache/CacheMetricsSnapshot.java | 30 +++++++ .../GridCachePartitionExchangeManager.java | 13 ++- .../dht/GridClientPartitionTopology.java | 35 ++++++++ .../dht/GridDhtPartitionTopology.java | 11 +++ .../dht/GridDhtPartitionTopologyImpl.java | 34 ++++++++ .../preloader/GridDhtPartitionDemander.java | 28 ++++--- .../GridDhtPartitionSupplyMessage.java | 9 +-- .../GridDhtPartitionsExchangeFuture.java | 60 ++++++++++++-- .../GridDhtPartitionsFullMessage.java | 80 +++++++++++++++++-- .../GridDhtPartitionsSingleMessage.java | 24 +++--- .../platform/cache/PlatformCache.java | 2 + .../visor/cache/VisorCacheMetrics.java | 35 ++++++++ .../visor/node/VisorNodeDataCollectorJob.java | 20 ++++- .../CacheGroupsMetricsRebalanceTest.java | 50 +++++++----- .../PlatformCacheWriteMetricsTask.java | 10 +++ .../Apache.Ignite.Core/Cache/ICacheMetrics.cs | 18 +++++ .../Impl/Cache/CacheMetricsImpl.cs | 14 ++++ 22 files changed, 446 insertions(+), 73 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java index 99bf2c2f185d7..e3e64462bebce 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java @@ -508,6 +508,16 @@ public interface CacheMetrics { */ public int getRebalancingPartitionsCount(); + /** + * @return Number of already rebalanced keys. + */ + public long getRebalancedKeys(); + + /** + * @return Number estimated to rebalance keys. + */ + public long getEstimatedRebalancingKeys(); + /** * @return Estimated number of keys to be rebalanced on current node. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 92b8d3ea87019..08eb43f5291c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -472,6 +472,7 @@ void onCacheGroupCreated(CacheGroupContext grp) { clientTop.partitionMap(true), clientTop.fullUpdateCounters(), Collections.emptySet(), + null, null); } @@ -530,7 +531,7 @@ else if (!fetchFuts.containsKey(grp.groupId())) { grp.topology().updateTopologyVersion(topFut, discoCache, -1, false); - grp.topology().update(topVer, partMap, null, Collections.emptySet(), null); + grp.topology().update(topVer, partMap, null, Collections.emptySet(), null, null); topFut.validate(grp, discoCache.allNodes()); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java index 32603cbf940d5..3d5278cc96496 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java @@ -369,6 +369,16 @@ class CacheClusterMetricsMXBeanImpl implements CacheMetricsMXBean { return cache.clusterMetrics().getTotalPartitionsCount(); } + /** {@inheritDoc} */ + @Override public long getRebalancedKeys() { + return cache.clusterMetrics().getRebalancedKeys(); + } + + /** {@inheritDoc} */ + @Override public long getEstimatedRebalancingKeys() { + return cache.clusterMetrics().getEstimatedRebalancingKeys(); + } + /** {@inheritDoc} */ @Override public int getRebalancingPartitionsCount() { return cache.clusterMetrics().getRebalancingPartitionsCount(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java index d3060d3ab3835..212c7a07c4620 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java @@ -370,6 +370,14 @@ class CacheLocalMetricsMXBeanImpl implements CacheMetricsMXBean { return cache.metrics0().getTotalPartitionsCount(); } + @Override public long getRebalancedKeys() { + return cache.metrics0().getRebalancedKeys(); + } + + @Override public long getEstimatedRebalancingKeys() { + return cache.metrics0().getEstimatedRebalancingKeys(); + } + /** {@inheritDoc} */ @Override public int getRebalancingPartitionsCount() { return cache.metrics0().getRebalancingPartitionsCount(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index 96f40bfd72ea3..0f6d06f7edd56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -856,6 +856,16 @@ public EntriesStatMetrics getEntriesStat() { return getEntriesStat().rebalancingPartitionsCount(); } + /** {@inheritDoc} */ + @Override public long getRebalancedKeys() { + return rebalancedKeys.get(); + } + + /** {@inheritDoc} */ + @Override public long getEstimatedRebalancingKeys() { + return estimatedRebalancingKeys.get(); + } + /** {@inheritDoc} */ @Override public long getKeysToRebalanceLeft() { return Math.max(0, estimatedRebalancingKeys.get() - rebalancedKeys.get()); @@ -935,7 +945,10 @@ public void rebalanceClearingPartitions(int partitions) { * First rebalance supply message callback. * @param keysCnt Estimated number of keys. */ - public void onRebalancingKeysCountEstimateReceived(long keysCnt) { + public void onRebalancingKeysCountEstimateReceived(Long keysCnt) { + if (keysCnt == null) + return; + estimatedRebalancingKeys.addAndGet(keysCnt); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java index 539ad59a818a1..8a0f0e4b326ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java @@ -197,6 +197,12 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { /** Rebalancing partitions count. */ private int rebalancingPartitionsCnt; + /** Number of already rebalanced keys. */ + private long rebalancedKeys; + + /** Number estimated to rebalance keys. */ + private long estimatedRebalancingKeys; + /** Keys to rebalance left. */ private long keysToRebalanceLeft; @@ -331,6 +337,8 @@ public CacheMetricsSnapshot(CacheMetricsImpl m) { totalPartitionsCnt = entriesStat.totalPartitionsCount(); rebalancingPartitionsCnt = entriesStat.rebalancingPartitionsCount(); + rebalancedKeys = m.getRebalancedKeys(); + estimatedRebalancingKeys = m.getEstimatedRebalancingKeys(); keysToRebalanceLeft = m.getKeysToRebalanceLeft(); rebalancingBytesRate = m.getRebalancingBytesRate(); rebalancingKeysRate = m.getRebalancingKeysRate(); @@ -459,6 +467,8 @@ public CacheMetricsSnapshot(CacheMetrics loc, Collection metrics) else writeBehindErrorRetryCnt = -1; + rebalancedKeys += e.getRebalancedKeys(); + estimatedRebalancingKeys += e.getEstimatedRebalancingKeys(); totalPartitionsCnt += e.getTotalPartitionsCount(); rebalancingPartitionsCnt += e.getRebalancingPartitionsCount(); keysToRebalanceLeft += e.getKeysToRebalanceLeft(); @@ -733,6 +743,14 @@ public CacheMetricsSnapshot(CacheMetrics loc, Collection metrics) return totalPartitionsCnt; } + @Override public long getRebalancedKeys() { + return rebalancedKeys; + } + + @Override public long getEstimatedRebalancingKeys() { + return estimatedRebalancingKeys; + } + /** {@inheritDoc} */ @Override public int getRebalancingPartitionsCount() { return rebalancingPartitionsCnt; @@ -926,6 +944,12 @@ public CacheMetricsSnapshot(CacheMetrics loc, Collection metrics) out.writeLong(keysToRebalanceLeft); out.writeLong(rebalancingBytesRate); out.writeLong(rebalancingKeysRate); + + out.writeLong(rebalancedKeys); + out.writeLong(estimatedRebalancingKeys); + out.writeLong(rebalanceStartTime); + out.writeLong(rebalanceFinishTime); + out.writeLong(rebalanceClearingPartitionsLeft); } /** {@inheritDoc} */ @@ -981,5 +1005,11 @@ public CacheMetricsSnapshot(CacheMetrics loc, Collection metrics) keysToRebalanceLeft = in.readLong(); rebalancingBytesRate = in.readLong(); rebalancingKeysRate = in.readLong(); + + rebalancedKeys = in.readLong(); + estimatedRebalancingKeys = in.readLong(); + rebalanceStartTime = in.readLong(); + rebalanceFinishTime = in.readLong(); + rebalanceClearingPartitionsLeft = in.readLong(); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 38ddaf661de92..715c2907b2506 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1121,6 +1121,8 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage( affCache.similarAffinityKey()); } + m.addPartitionSizes(grp.groupId(), grp.topology().globalPartSizes()); + if (exchId != null) { CachePartitionFullCountersMap cntrsMap = grp.topology().fullUpdateCounters(); @@ -1154,6 +1156,8 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage( m.addPartitionUpdateCounters(top.groupId(), cntrsMap); else m.addPartitionUpdateCounters(top.groupId(), CachePartitionFullCountersMap.toCountersMap(cntrsMap)); + + m.addPartitionSizes(top.groupId(), top.globalPartSizes()); } } @@ -1264,9 +1268,9 @@ public GridDhtPartitionsSingleMessage createPartitionsSingleMessage( m.addPartitionUpdateCounters(grp.groupId(), newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); - - m.addPartitionSizes(grp.groupId(), grp.topology().partitionSizes()); } + + m.addPartitionSizes(grp.groupId(), grp.topology().partitionSizes()); } } @@ -1288,9 +1292,9 @@ public GridDhtPartitionsSingleMessage createPartitionsSingleMessage( m.addPartitionUpdateCounters(top.groupId(), newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); - - m.addPartitionSizes(top.groupId(), top.partitionSizes()); } + + m.addPartitionSizes(top.groupId(), top.partitionSizes()); } return m; @@ -1482,6 +1486,7 @@ else if (!grp.isLocal()) entry.getValue(), null, msg.partsToReload(cctx.localNodeId(), grpId), + msg.partitionSizes(grpId), msg.topologyVersion()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index b3652331b210b..fc80bbc0916c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -119,6 +119,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** */ private final int parts; + /** */ + private volatile Map globalPartSizes; + /** * @param cctx Context. * @param discoCache Discovery data cache. @@ -707,6 +710,7 @@ private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridD GridDhtPartitionFullMap partMap, @Nullable CachePartitionFullCountersMap cntrMap, Set partsToReload, + @Nullable Map partSizes, @Nullable AffinityTopologyVersion msgTopVer) { if (log.isDebugEnabled()) log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']'); @@ -810,6 +814,9 @@ private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridD if (cntrMap != null) this.cntrMap = new CachePartitionFullCountersMap(cntrMap); + if (partSizes != null) + this.globalPartSizes = partSizes; + consistencyCheck(); if (log.isDebugEnabled()) @@ -1222,6 +1229,34 @@ private void removeNode(UUID nodeId) { return Collections.emptyMap(); } + /** {@inheritDoc} */ + @Override @Nullable public Map globalPartSizes() { + lock.readLock().lock(); + + try { + if (globalPartSizes == null) + return Collections.emptyMap(); + + return Collections.unmodifiableMap(globalPartSizes); + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void globalPartSizes(@Nullable Map partSizes) { + lock.writeLock().lock(); + + try { + this.globalPartSizes = partSizes; + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) { assert false : "Should not be called on non-affinity node"; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index d6c54506e1208..b77dbd6e0d719 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -287,6 +287,7 @@ public boolean update( GridDhtPartitionFullMap partMap, @Nullable CachePartitionFullCountersMap cntrMap, Set partsToReload, + @Nullable Map partSizes, @Nullable AffinityTopologyVersion msgTopVer); /** @@ -382,6 +383,16 @@ public boolean update(@Nullable GridDhtPartitionExchangeId exchId, */ public void printMemoryStats(int threshold); + /** + * @return Sizes of up-to-date partition versions in topology. + */ + Map globalPartSizes(); + + /** + * @param partSizes Sizes of up-to-date partition versions in topology. + */ + void globalPartSizes(@Nullable Map partSizes); + /** * @param topVer Topology version. * @return {@code True} if rebalance process finished. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 0599c23f30fcb..cabb0b8e11604 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -137,6 +137,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** Partition update counter. */ private final CachePartitionFullCountersMap cntrMap; + /** */ + private volatile Map globalPartSizes; + /** */ private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE; @@ -1329,6 +1332,7 @@ private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridD GridDhtPartitionFullMap partMap, @Nullable CachePartitionFullCountersMap incomeCntrMap, Set partsToReload, + @Nullable Map partSizes, @Nullable AffinityTopologyVersion msgTopVer) { if (log.isDebugEnabled()) { log.debug("Updating full partition map [grp=" + grp.cacheOrGroupName() + ", exchVer=" + exchangeVer + @@ -1544,6 +1548,9 @@ else if (state == MOVING) { updateRebalanceVersion(aff.assignment()); } + if (partSizes != null) + this.globalPartSizes = partSizes; + consistencyCheck(); if (log.isDebugEnabled()) { @@ -2624,6 +2631,33 @@ private void removeNode(UUID nodeId) { } } + /** {@inheritDoc} */ + @Override public Map globalPartSizes() { + lock.readLock().lock(); + + try { + if (globalPartSizes == null) + return Collections.emptyMap(); + + return Collections.unmodifiableMap(globalPartSizes); + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void globalPartSizes(@Nullable Map partSizes) { + lock.writeLock().lock(); + + try { + this.globalPartSizes = partSizes; + } + finally { + lock.writeLock().unlock(); + } + } + /** {@inheritDoc} */ @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) { AffinityTopologyVersion curTopVer = this.readyTopVer; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index c94f511d91887..3cfc25f6b7fbf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -307,9 +307,22 @@ Runnable addAssignments( metrics.clearRebalanceCounters(); - metrics.startRebalance(0); + for (GridDhtPartitionDemandMessage msg : assignments.values()) { + for (Integer partId : msg.partitions().fullSet()) { + metrics.onRebalancingKeysCountEstimateReceived(grp.topology().globalPartSizes().get(partId)); + } + + CachePartitionPartialCountersMap histMap = msg.partitions().historicalMap(); - rebalanceFut.listen(f -> metrics.clearRebalanceCounters()); + for (int i = 0; i < histMap.size(); i++) { + long from = histMap.initialUpdateCounterAt(i); + long to = histMap.updateCounterAt(i); + + metrics.onRebalancingKeysCountEstimateReceived(to - from); + } + } + + metrics.startRebalance(0); } } @@ -714,8 +727,6 @@ public void handleSupplyMessage( try { AffinityAssignment aff = grp.affinity().cachedAffinity(topVer); - GridCacheContext cctx = grp.sharedGroup() ? null : grp.singleCacheContext(); - ctx.database().checkpointReadLock(); try { @@ -749,11 +760,10 @@ public void handleSupplyMessage( break; } - if (grp.sharedGroup() && (cctx == null || cctx.cacheId() != entry.cacheId())) - cctx = ctx.cacheContext(entry.cacheId()); - - if (cctx != null && cctx.statisticsEnabled()) - cctx.cache().metrics0().onRebalanceKeyReceived(); + for (GridCacheContext cctx : grp.caches()) { + if (cctx.statisticsEnabled()) + cctx.cache().metrics0().onRebalanceKeyReceived(); + } } // If message was last for this partition, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java index 77baa38cc52c3..4ecffc492c7b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java @@ -443,7 +443,7 @@ public int size() { * @return Estimated keys count. */ public long estimatedKeysCount() { - return estimatedKeysCnt; + return -1; } /** @@ -457,12 +457,7 @@ public void addEstimatedKeysCount(long cnt) { * @return Estimated keys count for a given cache ID. */ public long keysForCache(int cacheId) { - if (this.keysPerCache == null) - return -1; - - Long cnt = this.keysPerCache.get(cacheId); - - return cnt != null ? cnt : 0; + return -1; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 5b1034716b7e9..366d8ea47dec2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -897,6 +897,7 @@ private void updateTopologies(boolean crd) throws IgniteCheckedException { clientTop.partitionMap(true), clientTop.fullUpdateCounters(), Collections.emptySet(), + null, null); } } @@ -2362,6 +2363,37 @@ private void onAffinityInitialized(IgniteInternalFuture partSizes = new HashMap<>(); + + for (Map.Entry e : msgs.entrySet()) { + GridDhtPartitionsSingleMessage singleMsg = e.getValue(); + + GridDhtPartitionMap partMap = singleMsg.partitions().get(top.groupId()); + + if (partMap == null) + continue; + + for (Map.Entry e0 : partMap.entrySet()) { + int p = e0.getKey(); + GridDhtPartitionState state = e0.getValue(); + + if (state == GridDhtPartitionState.OWNING) + partSizes.put(p, singleMsg.partitionSizes(top.groupId()).get(p)); + } + } + + for (GridDhtLocalPartition locPart : top.currentLocalPartitions()) { + if (locPart.state() == GridDhtPartitionState.OWNING) + partSizes.put(locPart.id(), locPart.fullSize()); + } + + top.globalPartSizes(partSizes); + } + /** * Collects and determines new owners of partitions for all nodes for given {@code top}. * @@ -2402,7 +2434,7 @@ private void assignPartitionStates(GridDhtPartitionTopology top) { CounterWithNodes maxCntr = maxCntrs.get(p); if (maxCntr == null || cntr > maxCntr.cnt) - maxCntrs.put(p, new CounterWithNodes(cntr, uuid)); + maxCntrs.put(p, new CounterWithNodes(cntr, e.getValue().partitionSizes(top.groupId()).get(p), uuid)); else if (cntr == maxCntr.cnt) maxCntr.nodes.add(uuid); } @@ -2428,7 +2460,7 @@ else if (cntr == maxCntr.cnt) CounterWithNodes maxCntr = maxCntrs.get(part.id()); if (maxCntr == null && cntr == 0) { - CounterWithNodes cntrObj = new CounterWithNodes(0, cctx.localNodeId()); + CounterWithNodes cntrObj = new CounterWithNodes(0, 0L, cctx.localNodeId()); for (UUID nodeId : msgs.keySet()) { if (top.partitionState(nodeId, part.id()) == GridDhtPartitionState.OWNING) @@ -2438,7 +2470,7 @@ else if (cntr == maxCntr.cnt) maxCntrs.put(part.id(), cntrObj); } else if (maxCntr == null || cntr > maxCntr.cnt) - maxCntrs.put(part.id(), new CounterWithNodes(cntr, cctx.localNodeId())); + maxCntrs.put(part.id(), new CounterWithNodes(cntr, part.fullSize(), cctx.localNodeId())); else if (cntr == maxCntr.cnt) maxCntr.nodes.add(cctx.localNodeId()); } @@ -2490,6 +2522,12 @@ else if (cntr == maxCntr.cnt) for (Map.Entry e : maxCntrs.entrySet()) ownersByUpdCounters.put(e.getKey(), e.getValue().nodes); + Map partSizes = new HashMap<>(maxCntrs.size()); + for (Map.Entry e : maxCntrs.entrySet()) + partSizes.put(e.getKey(), e.getValue().size); + + top.globalPartSizes(partSizes); + Map> partitionsToRebalance = top.resetOwners(ownersByUpdCounters, haveHistory); for (Map.Entry> e : partitionsToRebalance.entrySet()) { @@ -2903,16 +2941,16 @@ private void assignPartitionsStates() { if (grpDesc.config().getCacheMode() == CacheMode.LOCAL) continue; - if (!CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration())) - continue; - CacheGroupContext grpCtx = cctx.cache().cacheGroup(e.getKey()); GridDhtPartitionTopology top = grpCtx != null ? grpCtx.topology() : cctx.exchange().clientTopology(e.getKey(), events().discoveryCache()); - assignPartitionStates(top); + if (!CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration())) + assignPartitionSizes(top); + else + assignPartitionStates(top); } } @@ -3303,6 +3341,7 @@ private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPa entry.getValue(), cntrMap, msg.partsToReload(cctx.localNodeId(), grpId), + msg.partitionSizes(grpId), null); } else { @@ -3318,6 +3357,7 @@ private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPa entry.getValue(), cntrMap, Collections.emptySet(), + null, null); } } @@ -3902,6 +3942,9 @@ private static class CounterWithNodes { /** */ private final long cnt; + /** */ + private final long size; + /** */ private final Set nodes = new HashSet<>(); @@ -3909,8 +3952,9 @@ private static class CounterWithNodes { * @param cnt Count. * @param firstNode Node ID. */ - private CounterWithNodes(long cnt, UUID firstNode) { + private CounterWithNodes(long cnt, @Nullable Long size, UUID firstNode) { this.cnt = cnt; + this.size = size != null ? size : 0; nodes.add(firstNode); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 4a449d18a09d8..59624687b9606 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -93,6 +93,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** Serialized partitions that must be cleared and re-loaded. */ private byte[] partsToReloadBytes; + /** Partitions sizes. */ + @GridToStringInclude + @GridDirectTransient + private Map> partsSizes; + + /** Serialized partitions sizes. */ + private byte[] partsSizesBytes; + /** Topology version. */ private AffinityTopologyVersion topVer; @@ -164,6 +172,8 @@ public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, cp.partHistSuppliersBytes = partHistSuppliersBytes; cp.partsToReload = partsToReload; cp.partsToReloadBytes = partsToReloadBytes; + cp.partsSizes = partsSizes; + cp.partsSizesBytes = partsSizesBytes; cp.topVer = topVer; cp.errs = errs; cp.errsBytes = errsBytes; @@ -331,6 +341,9 @@ public IgniteDhtPartitionHistorySuppliersMap partitionHistorySuppliers() { return partHistSuppliers; } + /** + * + */ public Set partsToReload(UUID nodeId, int grpId) { if (partsToReload == null) return Collections.emptySet(); @@ -338,6 +351,35 @@ public Set partsToReload(UUID nodeId, int grpId) { return partsToReload.get(nodeId, grpId); } + /** + * Adds partition sizes map for specified {@code grpId} to the current message. + * + * @param grpId Group id. + * @param partSizesMap Partition sizes map. + */ + public void addPartitionSizes(int grpId, Map partSizesMap) { + if (partSizesMap.isEmpty()) + return; + + if (partsSizes == null) + partsSizes = new HashMap<>(); + + partsSizes.put(grpId, partSizesMap); + } + + /** + * Returns partition sizes map for specified {@code grpId}. + * + * @param grpId Group id. + * @return Partition sizes map (partId, partSize). + */ + public Map partitionSizes(int grpId) { + if (partsSizes == null) + return Collections.emptyMap(); + + return partsSizes.getOrDefault(grpId, Collections.emptyMap()); + } + /** * @return Errors map. */ @@ -369,6 +411,7 @@ void setErrorsMap(Map errs) { byte[] partCntrsBytes20 = null; byte[] partHistSuppliersBytes0 = null; byte[] partsToReloadBytes0 = null; + byte[] partsSizesBytes0 = null; byte[] errsBytes0 = null; if (!F.isEmpty(parts) && partsBytes == null) @@ -386,6 +429,9 @@ void setErrorsMap(Map errs) { if (partsToReload != null && partsToReloadBytes == null) partsToReloadBytes0 = U.marshal(ctx, partsToReload); + if (partsSizes != null && partsSizesBytes == null) + partsSizesBytes0 = U.marshal(ctx, partsSizes); + if (!F.isEmpty(errs) && errsBytes == null) errsBytes0 = U.marshal(ctx, errs); @@ -398,6 +444,7 @@ void setErrorsMap(Map errs) { byte[] partCntrsBytes2Zip = U.zip(partCntrsBytes20); byte[] partHistSuppliersBytesZip = U.zip(partHistSuppliersBytes0); byte[] partsToReloadBytesZip = U.zip(partsToReloadBytes0); + byte[] partsSizesBytesZip = U.zip(partsSizesBytes0); byte[] exsBytesZip = U.zip(errsBytes0); partsBytes0 = partsBytesZip; @@ -405,6 +452,7 @@ void setErrorsMap(Map errs) { partCntrsBytes20 = partCntrsBytes2Zip; partHistSuppliersBytes0 = partHistSuppliersBytesZip; partsToReloadBytes0 = partsToReloadBytesZip; + partsSizesBytes0 = partsSizesBytesZip; errsBytes0 = exsBytesZip; compressed(true); @@ -419,6 +467,7 @@ void setErrorsMap(Map errs) { partCntrsBytes2 = partCntrsBytes20; partHistSuppliersBytes = partHistSuppliersBytes0; partsToReloadBytes = partsToReloadBytes0; + partsSizesBytes = partsSizesBytes0; errsBytes = errsBytes0; } } @@ -506,6 +555,13 @@ public void topologyVersion(AffinityTopologyVersion topVer) { partsToReload = U.unmarshal(ctx, partsToReloadBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } + if (partsSizesBytes != null && partsSizes == null) { + if (compressed()) + partsSizes = U.unmarshalZip(ctx.marshaller(), partsSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + else + partsSizes = U.unmarshal(ctx, partsSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + } + if (partCntrs == null) partCntrs = new IgniteDhtPartitionCountersMap(); @@ -584,18 +640,24 @@ public void topologyVersion(AffinityTopologyVersion topVer) { writer.incrementState(); case 13: - if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes)) + if (!writer.writeByteArray("partsSizesBytes", partsSizesBytes)) return false; writer.incrementState(); case 14: - if (!writer.writeMessage("resTopVer", resTopVer)) + if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes)) return false; writer.incrementState(); case 15: + if (!writer.writeMessage("resTopVer", resTopVer)) + return false; + + writer.incrementState(); + + case 16: if (!writer.writeMessage("topVer", topVer)) return false; @@ -682,7 +744,7 @@ public void topologyVersion(AffinityTopologyVersion topVer) { reader.incrementState(); case 13: - partsToReloadBytes = reader.readByteArray("partsToReloadBytes"); + partsSizesBytes = reader.readByteArray("partsSizesBytes"); if (!reader.isLastRead()) return false; @@ -690,7 +752,7 @@ public void topologyVersion(AffinityTopologyVersion topVer) { reader.incrementState(); case 14: - resTopVer = reader.readMessage("resTopVer"); + partsToReloadBytes = reader.readByteArray("partsToReloadBytes"); if (!reader.isLastRead()) return false; @@ -698,6 +760,14 @@ public void topologyVersion(AffinityTopologyVersion topVer) { reader.incrementState(); case 15: + resTopVer = reader.readMessage("resTopVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 16: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -717,7 +787,7 @@ public void topologyVersion(AffinityTopologyVersion topVer) { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 16; + return 17; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index b60070e6b4dd2..804cc030489f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -70,7 +70,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** Partitions sizes. */ @GridToStringInclude @GridDirectTransient - private Map> partSizes; + private Map> partsSizes; /** Serialized partitions counters. */ private byte[] partsSizesBytes; @@ -237,10 +237,10 @@ public void addPartitionSizes(int grpId, Map partSizesMap) { if (partSizesMap.isEmpty()) return; - if (partSizes == null) - partSizes = new HashMap<>(); + if (partsSizes == null) + partsSizes = new HashMap<>(); - partSizes.put(grpId, partSizesMap); + partsSizes.put(grpId, partSizesMap); } /** @@ -250,10 +250,10 @@ public void addPartitionSizes(int grpId, Map partSizesMap) { * @return Partition sizes map (partId, partSize). */ public Map partitionSizes(int grpId) { - if (partSizes == null) + if (partsSizes == null) return Collections.emptyMap(); - return partSizes.getOrDefault(grpId, Collections.emptyMap()); + return partsSizes.getOrDefault(grpId, Collections.emptyMap()); } /** @@ -324,7 +324,7 @@ public void setError(Exception ex) { boolean marshal = (parts != null && partsBytes == null) || (partCntrs != null && partCntrsBytes == null) || (partHistCntrs != null && partHistCntrsBytes == null) || - (partSizes != null && partsSizesBytes == null) || + (partsSizes != null && partsSizesBytes == null) || (err != null && errBytes == null); if (marshal) { @@ -343,8 +343,8 @@ public void setError(Exception ex) { if (partHistCntrs != null && partHistCntrsBytes == null) partHistCntrsBytes0 = U.marshal(ctx, partHistCntrs); - if (partSizes != null && partsSizesBytes == null) - partSizesBytes0 = U.marshal(ctx, partSizes); + if (partsSizes != null && partsSizesBytes == null) + partSizesBytes0 = U.marshal(ctx, partsSizes); if (err != null && errBytes == null) errBytes0 = U.marshal(ctx, err); @@ -405,11 +405,11 @@ public void setError(Exception ex) { partHistCntrs = U.unmarshal(ctx, partHistCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } - if (partsSizesBytes != null && partSizes == null) { + if (partsSizesBytes != null && partsSizes == null) { if (compressed()) - partSizes = U.unmarshalZip(ctx.marshaller(), partsSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + partsSizes = U.unmarshalZip(ctx.marshaller(), partsSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); else - partSizes = U.unmarshal(ctx, partsSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + partsSizes = U.unmarshal(ctx, partsSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } if (errBytes != null && err == null) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java index 818fd67ebe63d..d930c6bf89b3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java @@ -1504,6 +1504,8 @@ public static void writeCacheMetrics(BinaryRawWriter writer, CacheMetrics metric writer.writeLong(metrics.getRebalancingStartTime()); writer.writeLong(metrics.getRebalanceClearingPartitionsLeft()); writer.writeLong(metrics.getCacheSize()); + writer.writeLong(metrics.getRebalancedKeys()); + writer.writeLong(metrics.getEstimatedRebalancingKeys()); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java index 59f16b2a5e7cd..854bbd7ad1e2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java @@ -175,6 +175,12 @@ public class VisorCacheMetrics extends VisorDataTransferObject { /** Total number of partitions on current node. */ private int totalPartsCnt; + /** Number of already rebalanced keys. */ + private long rebalancedKeys; + + /** Number estimated to rebalance keys. */ + private long estimatedRebalancingKeys; + /** Number of currently rebalancing partitions on current node. */ private int rebalancingPartsCnt; @@ -276,6 +282,8 @@ public VisorCacheMetrics(IgniteEx ignite, String cacheName) { offHeapPrimaryEntriesCnt = m.getOffHeapPrimaryEntriesCount(); totalPartsCnt = m.getTotalPartitionsCount(); + rebalancedKeys = m.getRebalancedKeys(); + estimatedRebalancingKeys = m.getEstimatedRebalancingKeys(); rebalancingPartsCnt = m.getRebalancingPartitionsCount(); keysToRebalanceLeft = m.getKeysToRebalanceLeft(); rebalancingKeysRate = m.getRebalancingKeysRate(); @@ -622,6 +630,20 @@ public int getTotalPartitionsCount() { return totalPartsCnt; } + /** + * @return Number of already rebalanced keys. + */ + public long getRebalancedKeys() { + return rebalancedKeys; + } + + /** + * @return Number estimated to rebalance keys. + */ + public long getEstimatedRebalancingKeys() { + return estimatedRebalancingKeys; + } + /** * @return Number of currently rebalancing partitions on current node. */ @@ -650,6 +672,11 @@ public long getRebalancingBytesRate() { return rebalancingBytesRate; } + /** {@inheritDoc} */ + @Override public byte getProtocolVersion() { + return V2; + } + /** {@inheritDoc} */ @Override protected void writeExternalData(ObjectOutput out) throws IOException { U.writeString(out, name); @@ -708,6 +735,9 @@ public long getRebalancingBytesRate() { out.writeObject(qryMetrics); out.writeLong(cacheSize); + + out.writeLong(rebalancedKeys); + out.writeLong(estimatedRebalancingKeys); } /** {@inheritDoc} */ @@ -768,6 +798,11 @@ public long getRebalancingBytesRate() { if (in.available() > 0) cacheSize = in.readLong(); + + if (protoVer > V1) { + rebalancedKeys = in.readLong(); + estimatedRebalancingKeys = in.readLong(); + } } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java index fda23a2a917c6..14b928164978c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java @@ -183,8 +183,9 @@ protected void caches(VisorNodeDataCollectorJobResult res, VisorNodeDataCollecto List resCaches = res.getCaches(); + int partitions = 0; double total = 0; - double moving = 0; + double ready = 0; for (String cacheName : cacheProc.cacheNames()) { if (proxyCache(cacheName)) @@ -201,8 +202,16 @@ protected void caches(VisorNodeDataCollectorJobResult res, VisorNodeDataCollecto CacheMetrics cm = ca.localMetrics(); - total += cm.getTotalPartitionsCount(); - moving += cm.getRebalancingPartitionsCount(); + partitions += cm.getTotalPartitionsCount(); + + long partTotal = cm.getEstimatedRebalancingKeys(); + long partReady = cm.getRebalancedKeys(); + + if (partReady >= partTotal) + partReady = Math.max(partTotal - 1, 0); + + total += partTotal; + ready += partReady; resCaches.add(new VisorCache(ignite, ca, arg.isCollectCacheMetrics())); } @@ -217,7 +226,10 @@ protected void caches(VisorNodeDataCollectorJobResult res, VisorNodeDataCollecto } } - res.setRebalance(total > 0 ? (total - moving) / total : -1); + if (partitions == 0) + res.setRebalance(-1); + else + res.setRebalance(total > 0 ? ready / total : 1); } catch (Exception e) { res.setCachesEx(new VisorExceptionWrapper(e)); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java index a05590935d3bb..af2dc633d825e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java @@ -31,6 +31,7 @@ import org.apache.ignite.events.CacheRebalancingEvent; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; @@ -159,12 +160,12 @@ public void testRebalance() throws Exception { assertTrue(rate1 > 0); assertTrue(rate2 > 0); - // rate1 has to be roughly twice more than rate2. - double ratio = ((double)rate2 / rate1) * 100; + // rate1 has to be roughly the same as rate2 + double ratio = ((double)rate2 / rate1); log.info("Ratio: " + ratio); - assertTrue(ratio > 40 && ratio < 60); + assertTrue(ratio > 0.9 && ratio < 1.1); } /** @@ -225,29 +226,27 @@ public void testRebalanceEstimateFinishTime() throws Exception { log.info("Wait until keys left will be less than: " + keysLine); - try { - while (finishRebalanceLatch.getCount() != 0) { - CacheMetrics m = ig2.cache(CACHE1).localMetrics(); + while (true) { + CacheMetrics m = ig2.cache(CACHE1).localMetrics(); - long keyLeft = m.getKeysToRebalanceLeft(); + long keyLeft = m.getKeysToRebalanceLeft(); - if (keyLeft > 0 && keyLeft < keysLine) - latch.countDown(); + if (keyLeft > 0 && keyLeft < keysLine) { + latch.countDown(); - log.info("Keys left: " + m.getKeysToRebalanceLeft()); + break; + } - try { - Thread.sleep(1_000); - } - catch (InterruptedException e) { - log.warning("Interrupt thread", e); + log.info("Keys left: " + m.getKeysToRebalanceLeft()); - Thread.currentThread().interrupt(); - } + try { + Thread.sleep(1_000); + } + catch (InterruptedException e) { + log.warning("Interrupt thread", e); + + Thread.currentThread().interrupt(); } - } - finally { - latch.countDown(); } } }); @@ -270,8 +269,15 @@ public void testRebalanceEstimateFinishTime() throws Exception { long timePassed = currTime - startTime; long timeLeft = finishTime - currTime; - assertTrue("Got timeout while waiting for rebalancing. Estimated left time: " + timeLeft, - finishRebalanceLatch.await(timeLeft + 2_000L, TimeUnit.MILLISECONDS)); + // TODO: finishRebalanceLatch gets countdown much earlier because of ForceRebalanceExchangeTask triggered by cache with delay +// assertTrue("Got timeout while waiting for rebalancing. Estimated left time: " + timeLeft, +// finishRebalanceLatch.await(timeLeft + 10_000L, TimeUnit.MILLISECONDS)); + + waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return ig2.cache(CACHE1).localMetrics().getKeysToRebalanceLeft() == 0; + } + }, timeLeft + 10_000L); log.info("[timePassed=" + timePassed + ", timeLeft=" + timeLeft + ", Time to rebalance=" + (finishTime - startTime) + diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java index 44a15d500ca44..fe61e35b11dd1 100644 --- a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java +++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java @@ -473,5 +473,15 @@ private static class TestCacheMetrics implements CacheMetrics { @Override public long getCacheSize() { return 65; } + + /** {@inheritDoc} */ + @Override public long getRebalancedKeys() { + return 66; + } + + /** {@inheritDoc} */ + @Override public long getEstimatedRebalancingKeys() { + return 67; + } } } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICacheMetrics.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICacheMetrics.cs index d775c05691076..e0e7301c4d233 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICacheMetrics.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICacheMetrics.cs @@ -654,5 +654,23 @@ public interface ICacheMetrics /// Number of clearing partitions for rebalance. /// long RebalanceClearingPartitionsLeft { get; } + + /// + /// Gets number of already rebalanced keys. + /// need to be cleared before actual rebalance start. + /// + /// + /// Number of already rebalanced keys. + /// + long RebalancedKeys { get; } + + /// + /// Gets number of estimated keys to rebalance. + /// need to be cleared before actual rebalance start. + /// + /// + /// Number of estimated keys to rebalance. + /// + long EstimatedRebalancingKeys { get; } } } \ No newline at end of file diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheMetricsImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheMetricsImpl.cs index 1fdc8771f832e..be6980d7cdd97 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheMetricsImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheMetricsImpl.cs @@ -247,6 +247,12 @@ internal class CacheMetricsImpl : ICacheMetrics /** */ private readonly long _rebalancingClearingPartitionsLeft; + /** */ + private readonly long _rebalancedKeys; + + /** */ + private readonly long _estimatedRebalancedKeys; + /// /// Initializes a new instance of the class. /// @@ -327,6 +333,8 @@ public CacheMetricsImpl(IBinaryRawReader reader) _rebalancingStartTime = reader.ReadLong(); _rebalancingClearingPartitionsLeft = reader.ReadLong(); _cacheSize = reader.ReadLong(); + _rebalancedKeys = reader.ReadLong(); + _estimatedRebalancedKeys = reader.ReadLong(); } /** */ @@ -550,5 +558,11 @@ public CacheMetricsImpl(IBinaryRawReader reader) /** */ public long RebalanceClearingPartitionsLeft { get { return _rebalancingClearingPartitionsLeft; } } + + /** */ + public long RebalancedKeys { get { return _rebalancedKeys; } } + + /** */ + public long EstimatedRebalancingKeys { get { return _estimatedRebalancedKeys; } } } } \ No newline at end of file