Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom Metrics : + LongValueMetric example #11332

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,8 @@ public void runX() throws Exception {
cdcModeState = state.loadCdcMode();

if (walState != null) {
committedSegmentIdx.value(walState.get1().index());
committedSegmentOffset.value(walState.get1().fileOffset());
committedSegmentIdx.set(walState.get1().index());
committedSegmentOffset.set(walState.get1().fileOffset());
}

consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")));
Expand Down Expand Up @@ -572,9 +572,9 @@ private boolean consumeSegment(Path segment) {

long segmentIdx = segmentIndex(segment);

lastSegmentConsumptionTs.value(System.currentTimeMillis());
lastSegmentConsumptionTs.set(System.currentTimeMillis());

curSegmentIdx.value(segmentIdx);
curSegmentIdx.set(segmentIdx);

if (cdcModeState == CdcMode.IGNITE_NODE_ACTIVE) {
if (consumeSegmentPassively(builder))
Expand Down Expand Up @@ -847,8 +847,8 @@ private void saveStateAndRemoveProcessed(T2<WALPointer, Integer> curState) throw

state.saveWal(curState);

committedSegmentIdx.value(curState.get1().index());
committedSegmentOffset.value(curState.get1().fileOffset());
committedSegmentIdx.set(curState.get1().index());
committedSegmentOffset.set(curState.get1().fileOffset());

Iterator<Path> rmvIter = processedSegments.iterator();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public boolean onRecords(

evtsCnt.increment();

lastEvtTs.value(System.currentTimeMillis());
lastEvtTs.set(System.currentTimeMillis());

return next;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public IoStatisticsHolderCache(String grpName, int grpId, GridMetricManager mmgr

MetricRegistryImpl mreg = mmgr.registry(metricRegistryName());

mreg.longMetric("startTime", null).value(U.currentTimeMillis());
mreg.longMetric("startTime", null).set(U.currentTimeMillis());
mreg.objectMetric("name", String.class, null).value(grpName);
mreg.intMetric("grpId", null).value(grpId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public IoStatisticsHolderIndex(

MetricRegistryImpl mreg = mmgr.registry(metricRegistryName());

mreg.longMetric("startTime", null).value(U.currentTimeMillis());
mreg.longMetric("startTime", null).set(U.currentTimeMillis());
mreg.objectMetric("name", String.class, null).value(grpName);
mreg.objectMetric("indexName", String.class, null).value(idxName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMetrics;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.metric.LongValueMetric;
import org.apache.ignite.metric.MetricRegistry;
import org.jetbrains.annotations.Nullable;

Expand All @@ -59,7 +58,7 @@ public class CacheGroupMetricsImpl {
private final CacheGroupContext ctx;

/** Number of local partitions initialized on current node. */
private final AtomicLongMetric initLocPartitionsNum;
private final LongValueMetric initLocPartitionsNum;

/** Memory page metrics. Will be {@code null} on client nodes. */
@Nullable
Expand Down Expand Up @@ -91,7 +90,7 @@ public CacheGroupMetricsImpl(CacheGroupContext ctx) {

persistenceEnabled = !kernalCtx.clientNode() && CU.isPersistentCache(cacheCfg, dsCfg);

MetricRegistryImpl mreg = kernalCtx.metric().registry(metricGroupName());
MetricRegistry mreg = kernalCtx.metric().registry(metricGroupName());

mreg.register("Caches", this::getCaches, List.class, null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ public CacheMetricsImpl(GridCacheContext<?, ?> cctx, boolean isNear) {
rebalanceStartTime = mreg.longMetric("RebalanceStartTime",
"Rebalance start time");

rebalanceStartTime.value(-1);
rebalanceStartTime.set(-1);

estimatedRebalancingKeys = mreg.longMetric("EstimatedRebalancingKeys",
"Number estimated to rebalance keys.");
Expand Down Expand Up @@ -1456,14 +1456,14 @@ public void clearRebalanceCounters() {

rebalancingKeysRate.reset();

rebalanceStartTime.value(-1L);
rebalanceStartTime.set(-1L);
}

/**
*
*/
public void startRebalance(long delay) {
rebalanceStartTime.value(delay + U.currentTimeMillis());
rebalanceStartTime.set(delay + U.currentTimeMillis());
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,10 @@ public DataRegionMetricsImpl(
"dirty pages during the ongoing checkpoint.");

mreg.longMetric("InitialSize", "Initial memory region size in bytes defined by its data region.")
.value(dataRegionCfg.getInitialSize());
.set(dataRegionCfg.getInitialSize());

mreg.longMetric("MaxSize", "Maximum memory region size in bytes defined by its data region.")
.value(dataRegionCfg.getMaxSize());
.set(dataRegionCfg.getMaxSize());

if (persistenceEnabled) {
// Reserve 1 sec, page ts can be slightly lower than currentTimeMillis, due to applied to ts mask. This
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ public void onWallRollOver() {
if (!metricsEnabled)
return;

lastWalSegmentRollOverTime.value(U.currentTimeMillis());
lastWalSegmentRollOverTime.set(U.currentTimeMillis());
}

/**
Expand Down Expand Up @@ -688,23 +688,23 @@ public void onCheckpoint(
if (!metricsEnabled)
return;

lastCpBeforeLockDuration.value(beforeLockDuration);
lastCpLockWaitDuration.value(lockWaitDuration);
lastCpListenersExecuteDuration.value(listenersExecuteDuration);
lastCpMarkDuration.value(markDuration);
lastCpLockHoldDuration.value(lockHoldDuration);
lastCpPagesWriteDuration.value(pagesWriteDuration);
lastCpFsyncDuration.value(fsyncDuration);
lastCpWalRecordFsyncDuration.value(walRecordFsyncDuration);
lastCpWriteEntryDuration.value(writeEntryDuration);
lastCpSplitAndSortPagesDuration.value(splitAndSortPagesDuration);
lastCpDuration.value(duration);
lastCpStart.value(start);
lastCpTotalPages.value(totalPages);
lastCpDataPages.value(dataPages);
lastCpCowPages.value(cowPages);
this.storageSize.value(storageSize);
this.sparseStorageSize.value(sparseStorageSize);
lastCpBeforeLockDuration.set(beforeLockDuration);
lastCpLockWaitDuration.set(lockWaitDuration);
lastCpListenersExecuteDuration.set(listenersExecuteDuration);
lastCpMarkDuration.set(markDuration);
lastCpLockHoldDuration.set(lockHoldDuration);
lastCpPagesWriteDuration.set(pagesWriteDuration);
lastCpFsyncDuration.set(fsyncDuration);
lastCpWalRecordFsyncDuration.set(walRecordFsyncDuration);
lastCpWriteEntryDuration.set(writeEntryDuration);
lastCpSplitAndSortPagesDuration.set(splitAndSortPagesDuration);
lastCpDuration.set(duration);
lastCpStart.set(start);
lastCpTotalPages.set(totalPages);
lastCpDataPages.set(dataPages);
lastCpCowPages.set(cowPages);
this.storageSize.set(storageSize);
this.sparseStorageSize.set(sparseStorageSize);

totalCheckpointTime.add(duration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public GridCacheQueryMetricsAdapter(GridMetricManager mmgr, String cacheName, bo
MetricRegistryImpl mreg = mmgr.registry(MetricUtils.cacheMetricsRegistryName(cacheName, isNear));

minTime = mreg.longMetric("QueryMinimalTime", null);
minTime.value(Long.MAX_VALUE);
minTime.set(Long.MAX_VALUE);

maxTime = mreg.longMetric("QueryMaximumTime", null);
sumTime = mreg.longAdderMetric("QuerySumTime", null);
Expand Down Expand Up @@ -134,7 +134,7 @@ public QueryMetrics snapshot() {

/** Resets query metrics. */
public void reset() {
minTime.value(Long.MAX_VALUE);
minTime.set(Long.MAX_VALUE);
maxTime.reset();
sumTime.reset();
execs.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public void onTxManagerStarted() {
* Transaction commit callback.
*/
public void onTxCommit() {
commitTime.value(U.currentTimeMillis());
commitTime.set(U.currentTimeMillis());

txCommits.increment();
}
Expand All @@ -212,7 +212,7 @@ public void onTxCommit() {
* Transaction rollback callback.
*/
public void onTxRollback() {
rollbackTime.value(U.currentTimeMillis());
rollbackTime.set(U.currentTimeMillis());

txRollbacks.increment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void onLocalJoin(long topVer) {

last = new GridCacheVersion(0, order.get(), 0, dataCenterId);

lastDataVer.value(last.order());
lastDataVer.set(last.order());

isolatedStreamerVer = new GridCacheVersion(1 + offset, 0, 1, dataCenterId);
}
Expand Down Expand Up @@ -301,7 +301,7 @@ private GridCacheVersion next(long topVer, long nodeOrder, byte dataCenterId) {

last = next;

lastDataVer.value(ord);
lastDataVer.set(ord);

return next;
}
Expand All @@ -322,7 +322,7 @@ private GridCacheVersion nextForLoad(long topVer, long nodeOrder, byte dataCente

last = next;

lastDataVer.value(ord);
lastDataVer.set(ord);

return next;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.configuration.distributed.DistributedLongProperty;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsSnapshot;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
Expand All @@ -92,6 +90,8 @@
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.metric.LongValueMetric;
import org.apache.ignite.metric.MetricRegistry;
import org.apache.ignite.spi.collision.CollisionSpi;
import org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi;
import org.apache.ignite.spi.metric.DoubleMetric;
Expand Down Expand Up @@ -249,28 +249,28 @@ public class GridJobProcessor extends GridProcessorAdapter {
private final AtomicLong metricsLastUpdateTstamp = new AtomicLong();

/** Number of started jobs. */
final AtomicLongMetric startedJobsMetric;
private final LongValueMetric startedJobsMetric;

/** Number of active jobs currently executing. */
final AtomicLongMetric activeJobsMetric;
private final LongValueMetric activeJobsMetric;

/** Number of currently queued jobs waiting to be executed. */
final AtomicLongMetric waitingJobsMetric;
private final LongValueMetric waitingJobsMetric;

/** Number of cancelled jobs that are still running. */
final AtomicLongMetric canceledJobsMetric;
private final LongValueMetric canceledJobsMetric;

/** Number of jobs rejected after more recent collision resolution operation. */
final AtomicLongMetric rejectedJobsMetric;
private final LongValueMetric rejectedJobsMetric;

/** Number of finished jobs. */
final AtomicLongMetric finishedJobsMetric;
private final LongValueMetric finishedJobsMetric;

/** Total job execution time. */
final AtomicLongMetric totalExecutionTimeMetric;
private final LongValueMetric totalExecutionTimeMetric;

/** Total time jobs spent on waiting queue. */
final AtomicLongMetric totalWaitTimeMetric;
private final LongValueMetric totalWaitTimeMetric;

/** */
private boolean stopping;
Expand Down Expand Up @@ -349,7 +349,7 @@ public GridJobProcessor(GridKernalContext ctx) {

cpuLoadMetric = ctx.metric().registry(SYS_METRICS).findMetric(CPU_LOAD);

MetricRegistryImpl mreg = ctx.metric().registry(JOBS_METRICS);
MetricRegistry mreg = ctx.metric().registry(JOBS_METRICS);

startedJobsMetric = mreg.longMetric(STARTED, "Number of started jobs.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
import org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.processors.metric.impl.DoubleMetricImpl;
import org.apache.ignite.internal.processors.metric.impl.HitRateMetric;
import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
Expand All @@ -53,6 +52,7 @@
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.metric.IgniteMetrics;
import org.apache.ignite.metric.LongValueMetric;
import org.apache.ignite.metric.MetricRegistry;
import org.apache.ignite.spi.metric.HistogramMetric;
import org.apache.ignite.spi.metric.Metric;
Expand Down Expand Up @@ -723,23 +723,23 @@ private double getCpuLoad() {
/** Memory usage metrics. */
public class MemoryUsageMetrics {
/** @see MemoryUsage#getInit() */
private final AtomicLongMetric init;
private final LongValueMetric init;

/** @see MemoryUsage#getUsed() */
private final AtomicLongMetric used;
private final LongValueMetric used;

/** @see MemoryUsage#getCommitted() */
private final AtomicLongMetric committed;
private final LongValueMetric committed;

/** @see MemoryUsage#getMax() */
private final AtomicLongMetric max;
private final LongValueMetric max;

/**
* @param grp Metric registry.
* @param metricNamePrefix Metric name prefix.
*/
public MemoryUsageMetrics(String grp, String metricNamePrefix) {
MetricRegistryImpl mreg = registry(grp);
MetricRegistry mreg = registry(grp);

this.init = mreg.longMetric(metricName(metricNamePrefix, "init"), null);
this.used = mreg.longMetric(metricName(metricNamePrefix, "used"), null);
Expand All @@ -749,10 +749,10 @@ public MemoryUsageMetrics(String grp, String metricNamePrefix) {

/** Updates metric to the provided values. */
public void update(MemoryUsage usage) {
init.value(usage.getInit());
used.value(usage.getUsed());
committed.value(usage.getCommitted());
max.value(usage.getMax());
init.set(usage.getInit());
used.set(usage.getUsed());
committed.set(usage.getCommitted());
max.set(usage.getMax());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,8 @@ public IntMetricImpl intMetric(String name, @Nullable String desc) {
return addMetric(name, new IntMetricImpl(metricName(regName, name), desc));
}

/**
* Creates and register named metric.
* Returned instance are thread safe.
*
* @param name Name.
* @param desc Description.
* @return {@link AtomicLongMetric}.
*/
public AtomicLongMetric longMetric(String name, @Nullable String desc) {
/** {@inheritDoc} */
@Override public AtomicLongMetric longMetric(String name, @Nullable String desc) {
return addMetric(name, new AtomicLongMetric(metricName(regName, name), desc));
}

Expand Down
Loading