Skip to content

Commit

Permalink
IGNITE-8554 Cache metrics: expose metrics with rebalance info about k…
Browse files Browse the repository at this point in the history
…eys - Fixes #4094.

Signed-off-by: Ivan Rakov <irakov@apache.org>
  • Loading branch information
ilantukh authored and glukos committed Jun 19, 2018
1 parent 49a565c commit cf09e76
Show file tree
Hide file tree
Showing 22 changed files with 446 additions and 73 deletions.
Expand Up @@ -508,6 +508,16 @@ public interface CacheMetrics {
*/ */
public int getRebalancingPartitionsCount(); public int getRebalancingPartitionsCount();


/**
* @return Number of already rebalanced keys.
*/
public long getRebalancedKeys();

/**
* @return Number estimated to rebalance keys.
*/
public long getEstimatedRebalancingKeys();

/** /**
* @return Estimated number of keys to be rebalanced on current node. * @return Estimated number of keys to be rebalanced on current node.
*/ */
Expand Down
Expand Up @@ -472,6 +472,7 @@ void onCacheGroupCreated(CacheGroupContext grp) {
clientTop.partitionMap(true), clientTop.partitionMap(true),
clientTop.fullUpdateCounters(), clientTop.fullUpdateCounters(),
Collections.<Integer>emptySet(), Collections.<Integer>emptySet(),
null,
null); null);
} }


Expand Down Expand Up @@ -530,7 +531,7 @@ else if (!fetchFuts.containsKey(grp.groupId())) {


grp.topology().updateTopologyVersion(topFut, discoCache, -1, false); grp.topology().updateTopologyVersion(topFut, discoCache, -1, false);


grp.topology().update(topVer, partMap, null, Collections.<Integer>emptySet(), null); grp.topology().update(topVer, partMap, null, Collections.<Integer>emptySet(), null, null);


topFut.validate(grp, discoCache.allNodes()); topFut.validate(grp, discoCache.allNodes());
} }
Expand Down
Expand Up @@ -369,6 +369,16 @@ class CacheClusterMetricsMXBeanImpl implements CacheMetricsMXBean {
return cache.clusterMetrics().getTotalPartitionsCount(); return cache.clusterMetrics().getTotalPartitionsCount();
} }


/** {@inheritDoc} */
@Override public long getRebalancedKeys() {
return cache.clusterMetrics().getRebalancedKeys();
}

/** {@inheritDoc} */
@Override public long getEstimatedRebalancingKeys() {
return cache.clusterMetrics().getEstimatedRebalancingKeys();
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public int getRebalancingPartitionsCount() { @Override public int getRebalancingPartitionsCount() {
return cache.clusterMetrics().getRebalancingPartitionsCount(); return cache.clusterMetrics().getRebalancingPartitionsCount();
Expand Down
Expand Up @@ -370,6 +370,14 @@ class CacheLocalMetricsMXBeanImpl implements CacheMetricsMXBean {
return cache.metrics0().getTotalPartitionsCount(); return cache.metrics0().getTotalPartitionsCount();
} }


@Override public long getRebalancedKeys() {
return cache.metrics0().getRebalancedKeys();
}

@Override public long getEstimatedRebalancingKeys() {
return cache.metrics0().getEstimatedRebalancingKeys();
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public int getRebalancingPartitionsCount() { @Override public int getRebalancingPartitionsCount() {
return cache.metrics0().getRebalancingPartitionsCount(); return cache.metrics0().getRebalancingPartitionsCount();
Expand Down
Expand Up @@ -856,6 +856,16 @@ public EntriesStatMetrics getEntriesStat() {
return getEntriesStat().rebalancingPartitionsCount(); return getEntriesStat().rebalancingPartitionsCount();
} }


/** {@inheritDoc} */
@Override public long getRebalancedKeys() {
return rebalancedKeys.get();
}

/** {@inheritDoc} */
@Override public long getEstimatedRebalancingKeys() {
return estimatedRebalancingKeys.get();
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public long getKeysToRebalanceLeft() { @Override public long getKeysToRebalanceLeft() {
return Math.max(0, estimatedRebalancingKeys.get() - rebalancedKeys.get()); return Math.max(0, estimatedRebalancingKeys.get() - rebalancedKeys.get());
Expand Down Expand Up @@ -935,7 +945,10 @@ public void rebalanceClearingPartitions(int partitions) {
* First rebalance supply message callback. * First rebalance supply message callback.
* @param keysCnt Estimated number of keys. * @param keysCnt Estimated number of keys.
*/ */
public void onRebalancingKeysCountEstimateReceived(long keysCnt) { public void onRebalancingKeysCountEstimateReceived(Long keysCnt) {
if (keysCnt == null)
return;

estimatedRebalancingKeys.addAndGet(keysCnt); estimatedRebalancingKeys.addAndGet(keysCnt);
} }


Expand Down
Expand Up @@ -197,6 +197,12 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
/** Rebalancing partitions count. */ /** Rebalancing partitions count. */
private int rebalancingPartitionsCnt; private int rebalancingPartitionsCnt;


/** Number of already rebalanced keys. */
private long rebalancedKeys;

/** Number estimated to rebalance keys. */
private long estimatedRebalancingKeys;

/** Keys to rebalance left. */ /** Keys to rebalance left. */
private long keysToRebalanceLeft; private long keysToRebalanceLeft;


Expand Down Expand Up @@ -331,6 +337,8 @@ public CacheMetricsSnapshot(CacheMetricsImpl m) {
totalPartitionsCnt = entriesStat.totalPartitionsCount(); totalPartitionsCnt = entriesStat.totalPartitionsCount();
rebalancingPartitionsCnt = entriesStat.rebalancingPartitionsCount(); rebalancingPartitionsCnt = entriesStat.rebalancingPartitionsCount();


rebalancedKeys = m.getRebalancedKeys();
estimatedRebalancingKeys = m.getEstimatedRebalancingKeys();
keysToRebalanceLeft = m.getKeysToRebalanceLeft(); keysToRebalanceLeft = m.getKeysToRebalanceLeft();
rebalancingBytesRate = m.getRebalancingBytesRate(); rebalancingBytesRate = m.getRebalancingBytesRate();
rebalancingKeysRate = m.getRebalancingKeysRate(); rebalancingKeysRate = m.getRebalancingKeysRate();
Expand Down Expand Up @@ -459,6 +467,8 @@ public CacheMetricsSnapshot(CacheMetrics loc, Collection<CacheMetrics> metrics)
else else
writeBehindErrorRetryCnt = -1; writeBehindErrorRetryCnt = -1;


rebalancedKeys += e.getRebalancedKeys();
estimatedRebalancingKeys += e.getEstimatedRebalancingKeys();
totalPartitionsCnt += e.getTotalPartitionsCount(); totalPartitionsCnt += e.getTotalPartitionsCount();
rebalancingPartitionsCnt += e.getRebalancingPartitionsCount(); rebalancingPartitionsCnt += e.getRebalancingPartitionsCount();
keysToRebalanceLeft += e.getKeysToRebalanceLeft(); keysToRebalanceLeft += e.getKeysToRebalanceLeft();
Expand Down Expand Up @@ -733,6 +743,14 @@ public CacheMetricsSnapshot(CacheMetrics loc, Collection<CacheMetrics> metrics)
return totalPartitionsCnt; return totalPartitionsCnt;
} }


@Override public long getRebalancedKeys() {
return rebalancedKeys;
}

@Override public long getEstimatedRebalancingKeys() {
return estimatedRebalancingKeys;
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public int getRebalancingPartitionsCount() { @Override public int getRebalancingPartitionsCount() {
return rebalancingPartitionsCnt; return rebalancingPartitionsCnt;
Expand Down Expand Up @@ -926,6 +944,12 @@ public CacheMetricsSnapshot(CacheMetrics loc, Collection<CacheMetrics> metrics)
out.writeLong(keysToRebalanceLeft); out.writeLong(keysToRebalanceLeft);
out.writeLong(rebalancingBytesRate); out.writeLong(rebalancingBytesRate);
out.writeLong(rebalancingKeysRate); out.writeLong(rebalancingKeysRate);

out.writeLong(rebalancedKeys);
out.writeLong(estimatedRebalancingKeys);
out.writeLong(rebalanceStartTime);
out.writeLong(rebalanceFinishTime);
out.writeLong(rebalanceClearingPartitionsLeft);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down Expand Up @@ -981,5 +1005,11 @@ public CacheMetricsSnapshot(CacheMetrics loc, Collection<CacheMetrics> metrics)
keysToRebalanceLeft = in.readLong(); keysToRebalanceLeft = in.readLong();
rebalancingBytesRate = in.readLong(); rebalancingBytesRate = in.readLong();
rebalancingKeysRate = in.readLong(); rebalancingKeysRate = in.readLong();

rebalancedKeys = in.readLong();
estimatedRebalancingKeys = in.readLong();
rebalanceStartTime = in.readLong();
rebalanceFinishTime = in.readLong();
rebalanceClearingPartitionsLeft = in.readLong();
} }
} }
Expand Up @@ -1121,6 +1121,8 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage(
affCache.similarAffinityKey()); affCache.similarAffinityKey());
} }


m.addPartitionSizes(grp.groupId(), grp.topology().globalPartSizes());

if (exchId != null) { if (exchId != null) {
CachePartitionFullCountersMap cntrsMap = grp.topology().fullUpdateCounters(); CachePartitionFullCountersMap cntrsMap = grp.topology().fullUpdateCounters();


Expand Down Expand Up @@ -1154,6 +1156,8 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage(
m.addPartitionUpdateCounters(top.groupId(), cntrsMap); m.addPartitionUpdateCounters(top.groupId(), cntrsMap);
else else
m.addPartitionUpdateCounters(top.groupId(), CachePartitionFullCountersMap.toCountersMap(cntrsMap)); m.addPartitionUpdateCounters(top.groupId(), CachePartitionFullCountersMap.toCountersMap(cntrsMap));

m.addPartitionSizes(top.groupId(), top.globalPartSizes());
} }
} }


Expand Down Expand Up @@ -1264,9 +1268,9 @@ public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(


m.addPartitionUpdateCounters(grp.groupId(), m.addPartitionUpdateCounters(grp.groupId(),
newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap));

m.addPartitionSizes(grp.groupId(), grp.topology().partitionSizes());
} }

m.addPartitionSizes(grp.groupId(), grp.topology().partitionSizes());
} }
} }


Expand All @@ -1288,9 +1292,9 @@ public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(


m.addPartitionUpdateCounters(top.groupId(), m.addPartitionUpdateCounters(top.groupId(),
newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap));

m.addPartitionSizes(top.groupId(), top.partitionSizes());
} }

m.addPartitionSizes(top.groupId(), top.partitionSizes());
} }


return m; return m;
Expand Down Expand Up @@ -1482,6 +1486,7 @@ else if (!grp.isLocal())
entry.getValue(), entry.getValue(),
null, null,
msg.partsToReload(cctx.localNodeId(), grpId), msg.partsToReload(cctx.localNodeId(), grpId),
msg.partitionSizes(grpId),
msg.topologyVersion()); msg.topologyVersion());
} }
} }
Expand Down
Expand Up @@ -119,6 +119,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** */ /** */
private final int parts; private final int parts;


/** */
private volatile Map<Integer, Long> globalPartSizes;

/** /**
* @param cctx Context. * @param cctx Context.
* @param discoCache Discovery data cache. * @param discoCache Discovery data cache.
Expand Down Expand Up @@ -707,6 +710,7 @@ private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridD
GridDhtPartitionFullMap partMap, GridDhtPartitionFullMap partMap,
@Nullable CachePartitionFullCountersMap cntrMap, @Nullable CachePartitionFullCountersMap cntrMap,
Set<Integer> partsToReload, Set<Integer> partsToReload,
@Nullable Map<Integer, Long> partSizes,
@Nullable AffinityTopologyVersion msgTopVer) { @Nullable AffinityTopologyVersion msgTopVer) {
if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']'); log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']');
Expand Down Expand Up @@ -810,6 +814,9 @@ private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridD
if (cntrMap != null) if (cntrMap != null)
this.cntrMap = new CachePartitionFullCountersMap(cntrMap); this.cntrMap = new CachePartitionFullCountersMap(cntrMap);


if (partSizes != null)
this.globalPartSizes = partSizes;

consistencyCheck(); consistencyCheck();


if (log.isDebugEnabled()) if (log.isDebugEnabled())
Expand Down Expand Up @@ -1222,6 +1229,34 @@ private void removeNode(UUID nodeId) {
return Collections.emptyMap(); return Collections.emptyMap();
} }


/** {@inheritDoc} */
@Override @Nullable public Map<Integer, Long> globalPartSizes() {
lock.readLock().lock();

try {
if (globalPartSizes == null)
return Collections.emptyMap();

return Collections.unmodifiableMap(globalPartSizes);
}
finally {
lock.readLock().unlock();
}
}

/** {@inheritDoc} */
@Override public void globalPartSizes(@Nullable Map<Integer, Long> partSizes) {
lock.writeLock().lock();

try {
this.globalPartSizes = partSizes;
}
finally {
lock.writeLock().unlock();
}
}


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) { @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
assert false : "Should not be called on non-affinity node"; assert false : "Should not be called on non-affinity node";
Expand Down
Expand Up @@ -287,6 +287,7 @@ public boolean update(
GridDhtPartitionFullMap partMap, GridDhtPartitionFullMap partMap,
@Nullable CachePartitionFullCountersMap cntrMap, @Nullable CachePartitionFullCountersMap cntrMap,
Set<Integer> partsToReload, Set<Integer> partsToReload,
@Nullable Map<Integer, Long> partSizes,
@Nullable AffinityTopologyVersion msgTopVer); @Nullable AffinityTopologyVersion msgTopVer);


/** /**
Expand Down Expand Up @@ -382,6 +383,16 @@ public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
*/ */
public void printMemoryStats(int threshold); public void printMemoryStats(int threshold);


/**
* @return Sizes of up-to-date partition versions in topology.
*/
Map<Integer, Long> globalPartSizes();

/**
* @param partSizes Sizes of up-to-date partition versions in topology.
*/
void globalPartSizes(@Nullable Map<Integer, Long> partSizes);

/** /**
* @param topVer Topology version. * @param topVer Topology version.
* @return {@code True} if rebalance process finished. * @return {@code True} if rebalance process finished.
Expand Down
Expand Up @@ -137,6 +137,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** Partition update counter. */ /** Partition update counter. */
private final CachePartitionFullCountersMap cntrMap; private final CachePartitionFullCountersMap cntrMap;


/** */
private volatile Map<Integer, Long> globalPartSizes;

/** */ /** */
private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE; private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE;


Expand Down Expand Up @@ -1329,6 +1332,7 @@ private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridD
GridDhtPartitionFullMap partMap, GridDhtPartitionFullMap partMap,
@Nullable CachePartitionFullCountersMap incomeCntrMap, @Nullable CachePartitionFullCountersMap incomeCntrMap,
Set<Integer> partsToReload, Set<Integer> partsToReload,
@Nullable Map<Integer, Long> partSizes,
@Nullable AffinityTopologyVersion msgTopVer) { @Nullable AffinityTopologyVersion msgTopVer) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Updating full partition map [grp=" + grp.cacheOrGroupName() + ", exchVer=" + exchangeVer + log.debug("Updating full partition map [grp=" + grp.cacheOrGroupName() + ", exchVer=" + exchangeVer +
Expand Down Expand Up @@ -1544,6 +1548,9 @@ else if (state == MOVING) {
updateRebalanceVersion(aff.assignment()); updateRebalanceVersion(aff.assignment());
} }


if (partSizes != null)
this.globalPartSizes = partSizes;

consistencyCheck(); consistencyCheck();


if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -2624,6 +2631,33 @@ private void removeNode(UUID nodeId) {
} }
} }


/** {@inheritDoc} */
@Override public Map<Integer, Long> globalPartSizes() {
lock.readLock().lock();

try {
if (globalPartSizes == null)
return Collections.emptyMap();

return Collections.unmodifiableMap(globalPartSizes);
}
finally {
lock.readLock().unlock();
}
}

/** {@inheritDoc} */
@Override public void globalPartSizes(@Nullable Map<Integer, Long> partSizes) {
lock.writeLock().lock();

try {
this.globalPartSizes = partSizes;
}
finally {
lock.writeLock().unlock();
}
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) { @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
AffinityTopologyVersion curTopVer = this.readyTopVer; AffinityTopologyVersion curTopVer = this.readyTopVer;
Expand Down

0 comments on commit cf09e76

Please sign in to comment.