diff --git a/docs/_docs/cdc/change-data-capture-extensions.adoc b/docs/_docs/cdc/change-data-capture-extensions.adoc index 9f6a1a700..ac6b29d39 100644 --- a/docs/_docs/cdc/change-data-capture-extensions.adoc +++ b/docs/_docs/cdc/change-data-capture-extensions.adoc @@ -173,6 +173,19 @@ Conflict resolution field should contain user provided monotonically increasing . If `conflictResolveField` if provided then field values comparison used to determine order. . Conflict resolution failed. Update will be ignored. +=== Conflict Resolver Metrics + +The Ignite's built-in `CacheVersionConflictResolverPluginProvider` provides the following metrics: + +[cols="35%,65%",opts="header"] +|=== +|Name |Description +| `AcceptedCount` | Count of accepted entries. +| `RejectedCount` | Count of rejected entries. +|=== + +These metrics are registered under `conflict-resolver` registry for each node configured with this plugin. + === Configuration example Configuration is done via Ignite node plugin: diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java index 970a562c2..3d42d53fc 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java @@ -21,6 +21,7 @@ import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver; +import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; import org.apache.ignite.lang.IgniteFuture; /** @@ -30,6 +31,9 @@ * @see CacheVersionConflictResolver */ public class CacheConflictResolutionManagerImpl implements CacheConflictResolutionManager { + /** Conflict resolver metrics registry name. */ + public static final String CONFLICT_RESOLVER_METRICS_REGISTRY_NAME = "conflict-resolver"; + /** Logger. */ private IgniteLogger log; @@ -72,20 +76,24 @@ public CacheConflictResolutionManagerImpl( @Override public CacheVersionConflictResolver conflictResolver() { CacheVersionConflictResolver rslvr; + MetricRegistryImpl mreg = cctx.grid().context().metric().registry(CONFLICT_RESOLVER_METRICS_REGISTRY_NAME); + if (resolver != null) rslvr = resolver; else if (conflictResolverLog.isDebugEnabled()) { rslvr = new DebugCacheVersionConflictResolverImpl( clusterId, conflictResolveField, - conflictResolverLog + conflictResolverLog, + mreg ); } else { rslvr = new CacheVersionConflictResolverImpl( clusterId, conflictResolveField, - conflictResolverLog + conflictResolverLog, + mreg ); } diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java index 422d87016..010eb3d93 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java @@ -23,6 +23,8 @@ import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; +import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; +import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -42,6 +44,18 @@ * */ public class CacheVersionConflictResolverImpl implements CacheVersionConflictResolver { + /** Accepted entries count name. */ + public static final String ACCEPTED_EVENTS_CNT = "AcceptedCount"; + + /** Accepted entries count description. */ + public static final String ACCEPTED_EVENTS_CNT_DESC = "Count of accepted entries"; + + /** Rejected entries count name. */ + public static final String REJECTED_EVENTS_CNT = "RejectedCount"; + + /** Rejected entries count description. */ + public static final String REJECTED_EVENTS_CNT_DESC = "Count of rejected entries"; + /** * Cluster id. */ @@ -66,17 +80,32 @@ public class CacheVersionConflictResolverImpl implements CacheVersionConflictRes @GridToStringInclude protected final boolean conflictResolveFieldEnabled; + /** Accepted entries count. */ + private final LongAdderMetric acceptedCnt; + + /** Rejected entries count. */ + private final LongAdderMetric rejectedCnt; + /** * @param clusterId Data center id. * @param conflictResolveField Field to resolve conflicts. * @param log Logger. + * @param mreg Metric registry. */ - public CacheVersionConflictResolverImpl(byte clusterId, String conflictResolveField, IgniteLogger log) { + public CacheVersionConflictResolverImpl( + byte clusterId, + String conflictResolveField, + IgniteLogger log, + MetricRegistryImpl mreg + ) { this.clusterId = clusterId; this.conflictResolveField = conflictResolveField; this.log = log; conflictResolveFieldEnabled = conflictResolveField != null; + + acceptedCnt = mreg.longAdderMetric(ACCEPTED_EVENTS_CNT, ACCEPTED_EVENTS_CNT_DESC); + rejectedCnt = mreg.longAdderMetric(REJECTED_EVENTS_CNT, REJECTED_EVENTS_CNT_DESC); } /** {@inheritDoc} */ @@ -90,10 +119,14 @@ public CacheVersionConflictResolverImpl(byte clusterId, String conflictResolveFi boolean useNew = isUseNew(ctx, oldEntry, newEntry); - if (useNew) + if (useNew) { res.useNew(); - else + acceptedCnt.increment(); + } + else { res.useOld(); + rejectedCnt.increment(); + } return res; } diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java index 9b1b49081..b19585a43 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java @@ -20,6 +20,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; +import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; import org.apache.ignite.internal.util.typedef.internal.S; /** Debug aware resolver. */ @@ -28,9 +29,15 @@ public class DebugCacheVersionConflictResolverImpl extends CacheVersionConflictR * @param clusterId Data center id. * @param conflictResolveField Field to resolve conflicts. * @param log Logger. + * @param mreg Metric registry. */ - public DebugCacheVersionConflictResolverImpl(byte clusterId, String conflictResolveField, IgniteLogger log) { - super(clusterId, conflictResolveField, log); + public DebugCacheVersionConflictResolverImpl( + byte clusterId, + String conflictResolveField, + IgniteLogger log, + MetricRegistryImpl mreg + ) { + super(clusterId, conflictResolveField, log, mreg); } /** {@inheritDoc} */ diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java index 20a0e5f27..7131f49a4 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java @@ -41,6 +41,8 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; +import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; import org.junit.runner.RunWith; @@ -49,6 +51,9 @@ import static java.util.Collections.singletonMap; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cdc.conflictresolve.CacheConflictResolutionManagerImpl.CONFLICT_RESOLVER_METRICS_REGISTRY_NAME; +import static org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl.ACCEPTED_EVENTS_CNT; +import static org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl.REJECTED_EVENTS_CNT; /** * Cache conflict operations test. @@ -76,22 +81,25 @@ public static Collection parameters() { } /** */ - private static IgniteCache cache; + private static final byte FIRST_CLUSTER_ID = 1; /** */ - private static IgniteInternalCache cachex; + private static final byte SECOND_CLUSTER_ID = 2; /** */ - private static IgniteEx client; + private static final byte THIRD_CLUSTER_ID = 3; /** */ - private static final byte FIRST_CLUSTER_ID = 1; + private IgniteCache cache; /** */ - private static final byte SECOND_CLUSTER_ID = 2; + private IgniteInternalCache cachex; /** */ - private static final byte THIRD_CLUSTER_ID = 3; + private IgniteEx client; + + /** */ + private IgniteEx ign; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { @@ -104,24 +112,14 @@ public static Collection parameters() { return super.getConfiguration(igniteInstanceName).setPluginProviders(pluginCfg); } - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - startGrid(1); - - client = startClientGrid(2); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() { - cache = null; - cachex = null; - client = null; - } - /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); + ign = startGrid(1); + + client = startClientGrid(2); + if (cachex == null || cachex.configuration().getAtomicityMode() != cacheMode) { if (cachex != null) client.cache(DEFAULT_CACHE_NAME).destroy(); @@ -133,6 +131,11 @@ public static Collection parameters() { } } + /** {@inheritDoc} */ + @Override protected void afterTest() { + stopAllGrids(); + } + /** Tests that regular cache operations works with the conflict resolver when there is no update conflicts. */ @Test public void testSimpleUpdates() { @@ -197,6 +200,8 @@ public void testUpdatesReorderFromOtherCluster() throws Exception { // Remove with the higher topVer should succeed. putConflict(key, new GridCacheVersion(3, order, 1, otherClusterId), true); + checkMetrics(4, 8); + key = key("UpdateClusterUpdateReorder3", otherClusterId); int topVer = 1; @@ -207,12 +212,16 @@ public void testUpdatesReorderFromOtherCluster() throws Exception { putConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), false); putConflict(key, new GridCacheVersion(topVer, order, 1, otherClusterId), false); + checkMetrics(5, 10); + // Remove with the equal or lower nodeOrder should ignored. removeConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), false); removeConflict(key, new GridCacheVersion(topVer, order, 1, otherClusterId), false); // Remove with the higher nodeOrder should succeed. putConflict(key, new GridCacheVersion(topVer, order, 3, otherClusterId), true); + + checkMetrics(6, 12); } /** Tests cache operations for entry replicated from another cluster. */ @@ -334,4 +343,15 @@ private String key(String key, byte otherClusterId) { protected String conflictResolveField() { return null; } + + /** Checks metrics for conflict resolver. */ + protected void checkMetrics(int acceptedCnt, int rejectedCnt) { + MetricRegistryImpl mreg = ign.context().metric().registry(CONFLICT_RESOLVER_METRICS_REGISTRY_NAME); + + assertNotNull(mreg.findMetric(ACCEPTED_EVENTS_CNT)); + assertNotNull(mreg.findMetric(REJECTED_EVENTS_CNT)); + + assertEquals(acceptedCnt, ((LongAdderMetric)mreg.findMetric(ACCEPTED_EVENTS_CNT)).value()); + assertEquals(rejectedCnt, ((LongAdderMetric)mreg.findMetric(REJECTED_EVENTS_CNT)).value()); + } } diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java index ae48a2c21..621aaf605 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java @@ -68,4 +68,9 @@ private static final class LwwConflictResolver implements CacheVersionConflictRe return res; } } + + /** {@inheritDoc} */ + @Override protected void checkMetrics(int acceptedCnt, int rejectedCnt) { + // No op. + } }