From 9b46c24b334864ab25a3b95579cd4345c2e9ecf6 Mon Sep 17 00:00:00 2001 From: Maksim Davydov Date: Fri, 21 Jun 2024 17:49:22 +0300 Subject: [PATCH 1/7] IGNITE-22536 conflict resolver metric --- .../CacheConflictResolutionManagerImpl.java | 12 +++++- .../CacheVersionConflictResolverImpl.java | 43 +++++++++++++++++-- ...DebugCacheVersionConflictResolverImpl.java | 10 ++++- 3 files changed, 58 insertions(+), 7 deletions(-) diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java index 970a562c..8cac8ef4 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java @@ -21,6 +21,7 @@ import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver; +import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; import org.apache.ignite.lang.IgniteFuture; /** @@ -54,6 +55,9 @@ public class CacheConflictResolutionManagerImpl implements CacheConflictRe /** Grid cache context. */ private GridCacheContext cctx; + /** Conflict Resolver metric registry. */ + private MetricRegistryImpl mreg; + /** * @param conflictResolveField Field to resolve conflicts. * @param clusterId Cluster id. @@ -78,14 +82,16 @@ else if (conflictResolverLog.isDebugEnabled()) { rslvr = new DebugCacheVersionConflictResolverImpl( clusterId, conflictResolveField, - conflictResolverLog + conflictResolverLog, + mreg ); } else { rslvr = new CacheVersionConflictResolverImpl( clusterId, conflictResolveField, - conflictResolverLog + conflictResolverLog, + mreg ); } @@ -99,6 +105,8 @@ else if (conflictResolverLog.isDebugEnabled()) { this.cctx = cctx; this.log = cctx.logger(CacheConflictResolutionManagerImpl.class); this.conflictResolverLog = cctx.logger(CacheVersionConflictResolverImpl.class); + + mreg = cctx.grid().context().metric().registry("conflictResolver"); } /** {@inheritDoc} */ diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java index 422d8701..4ed16197 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java @@ -23,6 +23,8 @@ import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; +import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; +import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -66,17 +68,48 @@ public class CacheVersionConflictResolverImpl implements CacheVersionConflictRes @GridToStringInclude protected final boolean conflictResolveFieldEnabled; + /** Conflict Resolver metric registry. */ + private final MetricRegistryImpl mreg; + + /** Counter of new entry selected. */ + private final AtomicLongMetric newCnt; + + /** Counter of old entry selected. */ + private final AtomicLongMetric oldCnt; + + /** Count of the new version used name. */ + private final String NEW_EVENTS_CNT = "newSelectedCount"; + + /** Count of the new version used description. */ + private final String NEW_EVENTS_CNT_DESC = "Count of new version used"; + + /** Count of the old version used name. */ + private final String OLD_EVENTS_CNT = "oldSelectedCount"; + + /** Count of the old version used description. */ + private final String OLD_EVENTS_CNT_DESC = "Count of old version used"; + /** * @param clusterId Data center id. * @param conflictResolveField Field to resolve conflicts. * @param log Logger. */ - public CacheVersionConflictResolverImpl(byte clusterId, String conflictResolveField, IgniteLogger log) { + public CacheVersionConflictResolverImpl( + byte clusterId, + String conflictResolveField, + IgniteLogger log, + MetricRegistryImpl mreg + ) { this.clusterId = clusterId; this.conflictResolveField = conflictResolveField; this.log = log; conflictResolveFieldEnabled = conflictResolveField != null; + + this.mreg = mreg; + + newCnt = mreg.longMetric(NEW_EVENTS_CNT, NEW_EVENTS_CNT_DESC); + oldCnt = mreg.longMetric(OLD_EVENTS_CNT, OLD_EVENTS_CNT_DESC); } /** {@inheritDoc} */ @@ -90,10 +123,14 @@ public CacheVersionConflictResolverImpl(byte clusterId, String conflictResolveFi boolean useNew = isUseNew(ctx, oldEntry, newEntry); - if (useNew) + if (useNew) { + newCnt.increment(); res.useNew(); - else + } + else { + oldCnt.increment(); res.useOld(); + } return res; } diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java index 9b1b4908..5258f707 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java @@ -20,6 +20,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; +import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; import org.apache.ignite.internal.util.typedef.internal.S; /** Debug aware resolver. */ @@ -29,8 +30,13 @@ public class DebugCacheVersionConflictResolverImpl extends CacheVersionConflictR * @param conflictResolveField Field to resolve conflicts. * @param log Logger. */ - public DebugCacheVersionConflictResolverImpl(byte clusterId, String conflictResolveField, IgniteLogger log) { - super(clusterId, conflictResolveField, log); + public DebugCacheVersionConflictResolverImpl( + byte clusterId, + String conflictResolveField, + IgniteLogger log, + MetricRegistryImpl mreg + ) { + super(clusterId, conflictResolveField, log, mreg); } /** {@inheritDoc} */ From dda4605bf073fb27c0c7a4f32cde2e88e9ca428f Mon Sep 17 00:00:00 2001 From: Maksim Davydov Date: Wed, 26 Jun 2024 19:07:48 +0300 Subject: [PATCH 2/7] IGNITE-22536 test --- .../CacheVersionConflictResolverImpl.java | 8 +-- .../cdc/CacheConflictOperationsTest.java | 65 ++++++++++++++----- ...flictOperationsWithCustomResolverTest.java | 5 ++ 3 files changed, 57 insertions(+), 21 deletions(-) diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java index 4ed16197..4796745d 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java @@ -78,16 +78,16 @@ public class CacheVersionConflictResolverImpl implements CacheVersionConflictRes private final AtomicLongMetric oldCnt; /** Count of the new version used name. */ - private final String NEW_EVENTS_CNT = "newSelectedCount"; + public static final String NEW_EVENTS_CNT = "newSelectedCount"; /** Count of the new version used description. */ - private final String NEW_EVENTS_CNT_DESC = "Count of new version used"; + public static final String NEW_EVENTS_CNT_DESC = "Count of new version used"; /** Count of the old version used name. */ - private final String OLD_EVENTS_CNT = "oldSelectedCount"; + public static final String OLD_EVENTS_CNT = "oldSelectedCount"; /** Count of the old version used description. */ - private final String OLD_EVENTS_CNT_DESC = "Count of old version used"; + public static final String OLD_EVENTS_CNT_DESC = "Count of old version used"; /** * @param clusterId Data center id. diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java index 20a0e5f2..3a0584e3 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java @@ -24,8 +24,11 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.function.Function; +import javax.management.DynamicMBean; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheEntry; @@ -49,6 +52,8 @@ import static java.util.Collections.singletonMap; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl.NEW_EVENTS_CNT; +import static org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl.OLD_EVENTS_CNT; /** * Cache conflict operations test. @@ -76,13 +81,16 @@ public static Collection parameters() { } /** */ - private static IgniteCache cache; + private IgniteCache cache; /** */ - private static IgniteInternalCache cachex; + private IgniteInternalCache cachex; /** */ - private static IgniteEx client; + private IgniteEx client; + + /** */ + private IgniteEx ign; /** */ private static final byte FIRST_CLUSTER_ID = 1; @@ -104,24 +112,14 @@ public static Collection parameters() { return super.getConfiguration(igniteInstanceName).setPluginProviders(pluginCfg); } - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - startGrid(1); - - client = startClientGrid(2); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() { - cache = null; - cachex = null; - client = null; - } - /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); + ign = startGrid(1); + + client = startClientGrid(2); + if (cachex == null || cachex.configuration().getAtomicityMode() != cacheMode) { if (cachex != null) client.cache(DEFAULT_CACHE_NAME).destroy(); @@ -133,6 +131,11 @@ public static Collection parameters() { } } + /** {@inheritDoc} */ + @Override protected void afterTest() { + stopAllGrids(); + } + /** Tests that regular cache operations works with the conflict resolver when there is no update conflicts. */ @Test public void testSimpleUpdates() { @@ -197,6 +200,8 @@ public void testUpdatesReorderFromOtherCluster() throws Exception { // Remove with the higher topVer should succeed. putConflict(key, new GridCacheVersion(3, order, 1, otherClusterId), true); + checkMetrics(4, 8); + key = key("UpdateClusterUpdateReorder3", otherClusterId); int topVer = 1; @@ -207,12 +212,16 @@ public void testUpdatesReorderFromOtherCluster() throws Exception { putConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), false); putConflict(key, new GridCacheVersion(topVer, order, 1, otherClusterId), false); + checkMetrics(5, 10); + // Remove with the equal or lower nodeOrder should ignored. removeConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), false); removeConflict(key, new GridCacheVersion(topVer, order, 1, otherClusterId), false); // Remove with the higher nodeOrder should succeed. putConflict(key, new GridCacheVersion(topVer, order, 3, otherClusterId), true); + + checkMetrics(6, 12); } /** Tests cache operations for entry replicated from another cluster. */ @@ -334,4 +343,26 @@ private String key(String key, byte otherClusterId) { protected String conflictResolveField() { return null; } + + /** Checks metrics for conflict resolver. */ + protected void checkMetrics(int newCnt, int oldCnt) { + Function> jmxVal = mxBean -> m -> { + try { + return mxBean.getAttribute(m); + } + catch (Exception e) { + throw new IgniteException(e); + } + }; + + DynamicMBean jmxCdcReg = metricRegistry(ign.name(), null, "conflictResolver"); + + checkResolverMetrics((Function)jmxVal.apply(jmxCdcReg), newCnt, oldCnt); + } + + /** */ + private void checkResolverMetrics(Function longMetric, int newCnt, int oldCnt) { + assertEquals(newCnt, (long)longMetric.apply(NEW_EVENTS_CNT)); + assertEquals(oldCnt, (long)longMetric.apply(OLD_EVENTS_CNT)); + } } diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java index ae48a2c2..b1fa79a5 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java @@ -68,4 +68,9 @@ private static final class LwwConflictResolver implements CacheVersionConflictRe return res; } } + + /** {@inheritDoc} */ + @Override protected void checkMetrics(int newCnt, int oldCnt) { + // No op. + } } From 8dbd9a42d465628a02bd457194389cc216971ae5 Mon Sep 17 00:00:00 2001 From: Maksim Davydov Date: Thu, 27 Jun 2024 11:02:46 +0300 Subject: [PATCH 3/7] IGNITE-22536 docs --- docs/_docs/cdc/change-data-capture-extensions.adoc | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/_docs/cdc/change-data-capture-extensions.adoc b/docs/_docs/cdc/change-data-capture-extensions.adoc index 9f6a1a70..3e5e8ed9 100644 --- a/docs/_docs/cdc/change-data-capture-extensions.adoc +++ b/docs/_docs/cdc/change-data-capture-extensions.adoc @@ -173,6 +173,16 @@ Conflict resolution field should contain user provided monotonically increasing . If `conflictResolveField` if provided then field values comparison used to determine order. . Conflict resolution failed. Update will be ignored. +=== Metrics +Metrics are registered for each node configured with `CacheVersionConflictResolverPluginProvider`. + +[cols="35%,65%",opts="header"] +|=== +|Name |Description +| `newSelectedCount` | Count of new version used. +| `oldSelectedCount` | Count of old version used. +|=== + === Configuration example Configuration is done via Ignite node plugin: From dc7b923cc72274e6464fd7ef13b0b3ef226fde8d Mon Sep 17 00:00:00 2001 From: Maksim Davydov Date: Wed, 3 Jul 2024 11:54:31 +0300 Subject: [PATCH 4/7] IGNITE-22536 metrics naming fixed --- .../cdc/change-data-capture-extensions.adoc | 6 ++-- .../CacheConflictResolutionManagerImpl.java | 10 +++--- .../CacheVersionConflictResolverImpl.java | 18 ++++------ ...DebugCacheVersionConflictResolverImpl.java | 1 + .../cdc/CacheConflictOperationsTest.java | 35 ++++++++++++------- 5 files changed, 38 insertions(+), 32 deletions(-) diff --git a/docs/_docs/cdc/change-data-capture-extensions.adoc b/docs/_docs/cdc/change-data-capture-extensions.adoc index 3e5e8ed9..d8a27ee5 100644 --- a/docs/_docs/cdc/change-data-capture-extensions.adoc +++ b/docs/_docs/cdc/change-data-capture-extensions.adoc @@ -174,13 +174,13 @@ Conflict resolution field should contain user provided monotonically increasing . Conflict resolution failed. Update will be ignored. === Metrics -Metrics are registered for each node configured with `CacheVersionConflictResolverPluginProvider`. +Metrics are registered for each node configured with `CacheVersionConflictResolverPluginProvider`. With that provider the corresponding metrics MBean is registered under `org.apache.ignite-instance-name.conflict-resolver`. [cols="35%,65%",opts="header"] |=== |Name |Description -| `newSelectedCount` | Count of new version used. -| `oldSelectedCount` | Count of old version used. +| `AcceptedCount` | Counter of accepted entries. +| `RejectedCount` | Counter of rejected entries. |=== === Configuration example diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java index 8cac8ef4..61ca3e29 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java @@ -31,6 +31,9 @@ * @see CacheVersionConflictResolver */ public class CacheConflictResolutionManagerImpl implements CacheConflictResolutionManager { + /** */ + public static final String CONFLICT_RESOLVER_METRICS_REGISTRY_NAME = "conflict-resolver"; + /** Logger. */ private IgniteLogger log; @@ -55,9 +58,6 @@ public class CacheConflictResolutionManagerImpl implements CacheConflictRe /** Grid cache context. */ private GridCacheContext cctx; - /** Conflict Resolver metric registry. */ - private MetricRegistryImpl mreg; - /** * @param conflictResolveField Field to resolve conflicts. * @param clusterId Cluster id. @@ -76,6 +76,8 @@ public CacheConflictResolutionManagerImpl( @Override public CacheVersionConflictResolver conflictResolver() { CacheVersionConflictResolver rslvr; + MetricRegistryImpl mreg = cctx.grid().context().metric().registry(CONFLICT_RESOLVER_METRICS_REGISTRY_NAME); + if (resolver != null) rslvr = resolver; else if (conflictResolverLog.isDebugEnabled()) { @@ -105,8 +107,6 @@ else if (conflictResolverLog.isDebugEnabled()) { this.cctx = cctx; this.log = cctx.logger(CacheConflictResolutionManagerImpl.class); this.conflictResolverLog = cctx.logger(CacheVersionConflictResolverImpl.class); - - mreg = cctx.grid().context().metric().registry("conflictResolver"); } /** {@inheritDoc} */ diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java index 4796745d..3536a3d1 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java @@ -68,9 +68,6 @@ public class CacheVersionConflictResolverImpl implements CacheVersionConflictRes @GridToStringInclude protected final boolean conflictResolveFieldEnabled; - /** Conflict Resolver metric registry. */ - private final MetricRegistryImpl mreg; - /** Counter of new entry selected. */ private final AtomicLongMetric newCnt; @@ -78,21 +75,22 @@ public class CacheVersionConflictResolverImpl implements CacheVersionConflictRes private final AtomicLongMetric oldCnt; /** Count of the new version used name. */ - public static final String NEW_EVENTS_CNT = "newSelectedCount"; + public static final String ACCEPTED_EVENTS_CNT = "AcceptedCount"; /** Count of the new version used description. */ - public static final String NEW_EVENTS_CNT_DESC = "Count of new version used"; + public static final String ACCEPTED_EVENTS_CNT_DESC = "Counter of accepted entries"; /** Count of the old version used name. */ - public static final String OLD_EVENTS_CNT = "oldSelectedCount"; + public static final String REJECTED_EVENTS_CNT = "RejectedCount"; /** Count of the old version used description. */ - public static final String OLD_EVENTS_CNT_DESC = "Count of old version used"; + public static final String REJECTED_EVENTS_CNT_DESC = "Counter of rejected entries"; /** * @param clusterId Data center id. * @param conflictResolveField Field to resolve conflicts. * @param log Logger. + * @param mreg Metric registry. */ public CacheVersionConflictResolverImpl( byte clusterId, @@ -106,10 +104,8 @@ public CacheVersionConflictResolverImpl( conflictResolveFieldEnabled = conflictResolveField != null; - this.mreg = mreg; - - newCnt = mreg.longMetric(NEW_EVENTS_CNT, NEW_EVENTS_CNT_DESC); - oldCnt = mreg.longMetric(OLD_EVENTS_CNT, OLD_EVENTS_CNT_DESC); + newCnt = mreg.longMetric(ACCEPTED_EVENTS_CNT, ACCEPTED_EVENTS_CNT_DESC); + oldCnt = mreg.longMetric(REJECTED_EVENTS_CNT, REJECTED_EVENTS_CNT_DESC); } /** {@inheritDoc} */ diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java index 5258f707..b19585a4 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java @@ -29,6 +29,7 @@ public class DebugCacheVersionConflictResolverImpl extends CacheVersionConflictR * @param clusterId Data center id. * @param conflictResolveField Field to resolve conflicts. * @param log Logger. + * @param mreg Metric registry. */ public DebugCacheVersionConflictResolverImpl( byte clusterId, diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java index 3a0584e3..3b7394e0 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java @@ -52,8 +52,9 @@ import static java.util.Collections.singletonMap; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl.NEW_EVENTS_CNT; -import static org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl.OLD_EVENTS_CNT; +import static org.apache.ignite.cdc.conflictresolve.CacheConflictResolutionManagerImpl.CONFLICT_RESOLVER_METRICS_REGISTRY_NAME; +import static org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl.ACCEPTED_EVENTS_CNT; +import static org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl.REJECTED_EVENTS_CNT; /** * Cache conflict operations test. @@ -346,7 +347,25 @@ protected String conflictResolveField() { /** Checks metrics for conflict resolver. */ protected void checkMetrics(int newCnt, int oldCnt) { - Function> jmxVal = mxBean -> m -> { + DynamicMBean jmxCdcReg = metricRegistry(ign.name(), null, CONFLICT_RESOLVER_METRICS_REGISTRY_NAME); + + checkResolverMetrics((Function)metricExtractor(jmxCdcReg), newCnt, oldCnt); + } + + /** */ + private void checkResolverMetrics(Function longMetric, int newCnt, int oldCnt) { + assertEquals(newCnt, (long)longMetric.apply(ACCEPTED_EVENTS_CNT)); + assertEquals(oldCnt, (long)longMetric.apply(REJECTED_EVENTS_CNT)); + } + + /** + * Creates lambda function, that converts metric name to its value. In other words it extracts jmx attribute for + * proposed MBean. + * @param mxBean {@link DynamicMBean} for which lambda should be created. + * @return {@link Function} which converts metric specified name to its value. + */ + private static Function metricExtractor(DynamicMBean mxBean) { + return m -> { try { return mxBean.getAttribute(m); } @@ -354,15 +373,5 @@ protected void checkMetrics(int newCnt, int oldCnt) { throw new IgniteException(e); } }; - - DynamicMBean jmxCdcReg = metricRegistry(ign.name(), null, "conflictResolver"); - - checkResolverMetrics((Function)jmxVal.apply(jmxCdcReg), newCnt, oldCnt); - } - - /** */ - private void checkResolverMetrics(Function longMetric, int newCnt, int oldCnt) { - assertEquals(newCnt, (long)longMetric.apply(NEW_EVENTS_CNT)); - assertEquals(oldCnt, (long)longMetric.apply(OLD_EVENTS_CNT)); } } From 5fc2bcf13cc347557bc2e84cfc0505ce17a33848 Mon Sep 17 00:00:00 2001 From: Maksim Davydov Date: Thu, 4 Jul 2024 15:17:34 +0300 Subject: [PATCH 5/7] IGNITE-22536 Metrics refactor and docs --- .../cdc/change-data-capture-extensions.adoc | 7 ++-- .../CacheVersionConflictResolverImpl.java | 14 ++++---- .../cdc/CacheConflictOperationsTest.java | 36 +++++-------------- ...flictOperationsWithCustomResolverTest.java | 2 +- 4 files changed, 21 insertions(+), 38 deletions(-) diff --git a/docs/_docs/cdc/change-data-capture-extensions.adoc b/docs/_docs/cdc/change-data-capture-extensions.adoc index d8a27ee5..8254de17 100644 --- a/docs/_docs/cdc/change-data-capture-extensions.adoc +++ b/docs/_docs/cdc/change-data-capture-extensions.adoc @@ -173,8 +173,9 @@ Conflict resolution field should contain user provided monotonically increasing . If `conflictResolveField` if provided then field values comparison used to determine order. . Conflict resolution failed. Update will be ignored. -=== Metrics -Metrics are registered for each node configured with `CacheVersionConflictResolverPluginProvider`. With that provider the corresponding metrics MBean is registered under `org.apache.ignite-instance-name.conflict-resolver`. +=== Conflict Resolver Metrics + +The Ignite's built-in `CacheVersionConflictResolverPluginProvider` provides with usefull metrics. [cols="35%,65%",opts="header"] |=== @@ -183,6 +184,8 @@ Metrics are registered for each node configured with `CacheVersionConflictResolv | `RejectedCount` | Counter of rejected entries. |=== +These metrics are registered under `conflict-resolver` registry for each node configured with this plugin. + === Configuration example Configuration is done via Ignite node plugin: diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java index 3536a3d1..f7f94582 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java @@ -24,7 +24,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; -import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric; +import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -69,10 +69,10 @@ public class CacheVersionConflictResolverImpl implements CacheVersionConflictRes protected final boolean conflictResolveFieldEnabled; /** Counter of new entry selected. */ - private final AtomicLongMetric newCnt; + private final LongAdderMetric newCnt; /** Counter of old entry selected. */ - private final AtomicLongMetric oldCnt; + private final LongAdderMetric oldCnt; /** Count of the new version used name. */ public static final String ACCEPTED_EVENTS_CNT = "AcceptedCount"; @@ -104,8 +104,8 @@ public CacheVersionConflictResolverImpl( conflictResolveFieldEnabled = conflictResolveField != null; - newCnt = mreg.longMetric(ACCEPTED_EVENTS_CNT, ACCEPTED_EVENTS_CNT_DESC); - oldCnt = mreg.longMetric(REJECTED_EVENTS_CNT, REJECTED_EVENTS_CNT_DESC); + newCnt = mreg.longAdderMetric(ACCEPTED_EVENTS_CNT, ACCEPTED_EVENTS_CNT_DESC); + oldCnt = mreg.longAdderMetric(REJECTED_EVENTS_CNT, REJECTED_EVENTS_CNT_DESC); } /** {@inheritDoc} */ @@ -120,12 +120,12 @@ public CacheVersionConflictResolverImpl( boolean useNew = isUseNew(ctx, oldEntry, newEntry); if (useNew) { - newCnt.increment(); res.useNew(); + newCnt.increment(); } else { - oldCnt.increment(); res.useOld(); + oldCnt.increment(); } return res; diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java index 3b7394e0..3c2bceb3 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java @@ -24,11 +24,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.function.Function; -import javax.management.DynamicMBean; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheEntry; @@ -44,6 +41,8 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; +import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; import org.junit.runner.RunWith; @@ -346,32 +345,13 @@ protected String conflictResolveField() { } /** Checks metrics for conflict resolver. */ - protected void checkMetrics(int newCnt, int oldCnt) { - DynamicMBean jmxCdcReg = metricRegistry(ign.name(), null, CONFLICT_RESOLVER_METRICS_REGISTRY_NAME); + protected void checkMetrics(int acceptedCnt, int rejectedCnt) { + MetricRegistryImpl mreg = ign.context().metric().registry(CONFLICT_RESOLVER_METRICS_REGISTRY_NAME); - checkResolverMetrics((Function)metricExtractor(jmxCdcReg), newCnt, oldCnt); - } - - /** */ - private void checkResolverMetrics(Function longMetric, int newCnt, int oldCnt) { - assertEquals(newCnt, (long)longMetric.apply(ACCEPTED_EVENTS_CNT)); - assertEquals(oldCnt, (long)longMetric.apply(REJECTED_EVENTS_CNT)); - } + assertNotNull(mreg.findMetric(ACCEPTED_EVENTS_CNT)); + assertNotNull(mreg.findMetric(REJECTED_EVENTS_CNT)); - /** - * Creates lambda function, that converts metric name to its value. In other words it extracts jmx attribute for - * proposed MBean. - * @param mxBean {@link DynamicMBean} for which lambda should be created. - * @return {@link Function} which converts metric specified name to its value. - */ - private static Function metricExtractor(DynamicMBean mxBean) { - return m -> { - try { - return mxBean.getAttribute(m); - } - catch (Exception e) { - throw new IgniteException(e); - } - }; + assertEquals(acceptedCnt, ((LongAdderMetric)mreg.findMetric(ACCEPTED_EVENTS_CNT)).value()); + assertEquals(rejectedCnt, ((LongAdderMetric)mreg.findMetric(REJECTED_EVENTS_CNT)).value()); } } diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java index b1fa79a5..621aaf60 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java @@ -70,7 +70,7 @@ private static final class LwwConflictResolver implements CacheVersionConflictRe } /** {@inheritDoc} */ - @Override protected void checkMetrics(int newCnt, int oldCnt) { + @Override protected void checkMetrics(int acceptedCnt, int rejectedCnt) { // No op. } } From f7a778cd8b965fbfc8cc8b160f11f95d0c6fa4f5 Mon Sep 17 00:00:00 2001 From: Maksim Davydov Date: Thu, 4 Jul 2024 15:48:20 +0300 Subject: [PATCH 6/7] IGNITE-22536 Code style fixes --- .../cdc/change-data-capture-extensions.adoc | 6 +-- .../CacheConflictResolutionManagerImpl.java | 2 +- .../CacheVersionConflictResolverImpl.java | 40 +++++++++---------- .../cdc/CacheConflictOperationsTest.java | 14 +++---- 4 files changed, 31 insertions(+), 31 deletions(-) diff --git a/docs/_docs/cdc/change-data-capture-extensions.adoc b/docs/_docs/cdc/change-data-capture-extensions.adoc index 8254de17..ac6b29d3 100644 --- a/docs/_docs/cdc/change-data-capture-extensions.adoc +++ b/docs/_docs/cdc/change-data-capture-extensions.adoc @@ -175,13 +175,13 @@ Conflict resolution field should contain user provided monotonically increasing === Conflict Resolver Metrics -The Ignite's built-in `CacheVersionConflictResolverPluginProvider` provides with usefull metrics. +The Ignite's built-in `CacheVersionConflictResolverPluginProvider` provides the following metrics: [cols="35%,65%",opts="header"] |=== |Name |Description -| `AcceptedCount` | Counter of accepted entries. -| `RejectedCount` | Counter of rejected entries. +| `AcceptedCount` | Count of accepted entries. +| `RejectedCount` | Count of rejected entries. |=== These metrics are registered under `conflict-resolver` registry for each node configured with this plugin. diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java index 61ca3e29..3d42d53f 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java @@ -31,7 +31,7 @@ * @see CacheVersionConflictResolver */ public class CacheConflictResolutionManagerImpl implements CacheConflictResolutionManager { - /** */ + /** Conflict resolver metrics registry name. */ public static final String CONFLICT_RESOLVER_METRICS_REGISTRY_NAME = "conflict-resolver"; /** Logger. */ diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java index f7f94582..fb14932e 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java @@ -44,6 +44,18 @@ * */ public class CacheVersionConflictResolverImpl implements CacheVersionConflictResolver { + /** Count of the new version used name. */ + public static final String ACCEPTED_EVENTS_CNT = "AcceptedCount"; + + /** Count of the new version used description. */ + public static final String ACCEPTED_EVENTS_CNT_DESC = "Count of accepted entries"; + + /** Count of the old version used name. */ + public static final String REJECTED_EVENTS_CNT = "RejectedCount"; + + /** Count of the old version used description. */ + public static final String REJECTED_EVENTS_CNT_DESC = "Count of rejected entries"; + /** * Cluster id. */ @@ -68,23 +80,11 @@ public class CacheVersionConflictResolverImpl implements CacheVersionConflictRes @GridToStringInclude protected final boolean conflictResolveFieldEnabled; - /** Counter of new entry selected. */ - private final LongAdderMetric newCnt; + /** Count of the new version used. */ + private final LongAdderMetric acceptedCnt; - /** Counter of old entry selected. */ - private final LongAdderMetric oldCnt; - - /** Count of the new version used name. */ - public static final String ACCEPTED_EVENTS_CNT = "AcceptedCount"; - - /** Count of the new version used description. */ - public static final String ACCEPTED_EVENTS_CNT_DESC = "Counter of accepted entries"; - - /** Count of the old version used name. */ - public static final String REJECTED_EVENTS_CNT = "RejectedCount"; - - /** Count of the old version used description. */ - public static final String REJECTED_EVENTS_CNT_DESC = "Counter of rejected entries"; + /** Count of the old version used. */ + private final LongAdderMetric rejectedCnt; /** * @param clusterId Data center id. @@ -104,8 +104,8 @@ public CacheVersionConflictResolverImpl( conflictResolveFieldEnabled = conflictResolveField != null; - newCnt = mreg.longAdderMetric(ACCEPTED_EVENTS_CNT, ACCEPTED_EVENTS_CNT_DESC); - oldCnt = mreg.longAdderMetric(REJECTED_EVENTS_CNT, REJECTED_EVENTS_CNT_DESC); + acceptedCnt = mreg.longAdderMetric(ACCEPTED_EVENTS_CNT, ACCEPTED_EVENTS_CNT_DESC); + rejectedCnt = mreg.longAdderMetric(REJECTED_EVENTS_CNT, REJECTED_EVENTS_CNT_DESC); } /** {@inheritDoc} */ @@ -121,11 +121,11 @@ public CacheVersionConflictResolverImpl( if (useNew) { res.useNew(); - newCnt.increment(); + acceptedCnt.increment(); } else { res.useOld(); - oldCnt.increment(); + rejectedCnt.increment(); } return res; diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java index 3c2bceb3..7131f49a 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java @@ -81,25 +81,25 @@ public static Collection parameters() { } /** */ - private IgniteCache cache; + private static final byte FIRST_CLUSTER_ID = 1; /** */ - private IgniteInternalCache cachex; + private static final byte SECOND_CLUSTER_ID = 2; /** */ - private IgniteEx client; + private static final byte THIRD_CLUSTER_ID = 3; /** */ - private IgniteEx ign; + private IgniteCache cache; /** */ - private static final byte FIRST_CLUSTER_ID = 1; + private IgniteInternalCache cachex; /** */ - private static final byte SECOND_CLUSTER_ID = 2; + private IgniteEx client; /** */ - private static final byte THIRD_CLUSTER_ID = 3; + private IgniteEx ign; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { From 082cf367328e4b5510bd6861a3658f5186acbc5b Mon Sep 17 00:00:00 2001 From: Maksim Davydov Date: Thu, 4 Jul 2024 16:00:58 +0300 Subject: [PATCH 7/7] IGNITE-22536 javadocs --- .../CacheVersionConflictResolverImpl.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java index fb14932e..010eb3d9 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java @@ -44,16 +44,16 @@ * */ public class CacheVersionConflictResolverImpl implements CacheVersionConflictResolver { - /** Count of the new version used name. */ + /** Accepted entries count name. */ public static final String ACCEPTED_EVENTS_CNT = "AcceptedCount"; - /** Count of the new version used description. */ + /** Accepted entries count description. */ public static final String ACCEPTED_EVENTS_CNT_DESC = "Count of accepted entries"; - /** Count of the old version used name. */ + /** Rejected entries count name. */ public static final String REJECTED_EVENTS_CNT = "RejectedCount"; - /** Count of the old version used description. */ + /** Rejected entries count description. */ public static final String REJECTED_EVENTS_CNT_DESC = "Count of rejected entries"; /** @@ -80,10 +80,10 @@ public class CacheVersionConflictResolverImpl implements CacheVersionConflictRes @GridToStringInclude protected final boolean conflictResolveFieldEnabled; - /** Count of the new version used. */ + /** Accepted entries count. */ private final LongAdderMetric acceptedCnt; - /** Count of the old version used. */ + /** Rejected entries count. */ private final LongAdderMetric rejectedCnt; /**