Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-22536 CDC metrics for rejected entries by conflict resolver #278

Merged
merged 7 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/_docs/cdc/change-data-capture-extensions.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,19 @@ Conflict resolution field should contain user provided monotonically increasing
. If `conflictResolveField` if provided then field values comparison used to determine order.
. Conflict resolution failed. Update will be ignored.

=== Conflict Resolver Metrics

The Ignite's built-in `CacheVersionConflictResolverPluginProvider` provides the following metrics:

[cols="35%,65%",opts="header"]
|===
|Name |Description
| `AcceptedCount` | Count of accepted entries.
| `RejectedCount` | Count of rejected entries.
|===

These metrics are registered under `conflict-resolver` registry for each node configured with this plugin.

=== Configuration example
Configuration is done via Ignite node plugin:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.lang.IgniteFuture;

/**
Expand All @@ -30,6 +31,9 @@
* @see CacheVersionConflictResolver
*/
public class CacheConflictResolutionManagerImpl<K, V> implements CacheConflictResolutionManager<K, V> {
/** Conflict resolver metrics registry name. */
public static final String CONFLICT_RESOLVER_METRICS_REGISTRY_NAME = "conflict-resolver";

/** Logger. */
private IgniteLogger log;

Expand Down Expand Up @@ -72,20 +76,24 @@ public CacheConflictResolutionManagerImpl(
@Override public CacheVersionConflictResolver conflictResolver() {
CacheVersionConflictResolver rslvr;

MetricRegistryImpl mreg = cctx.grid().context().metric().registry(CONFLICT_RESOLVER_METRICS_REGISTRY_NAME);

if (resolver != null)
rslvr = resolver;
else if (conflictResolverLog.isDebugEnabled()) {
rslvr = new DebugCacheVersionConflictResolverImpl(
clusterId,
conflictResolveField,
conflictResolverLog
conflictResolverLog,
mreg
);
}
else {
rslvr = new CacheVersionConflictResolverImpl(
clusterId,
conflictResolveField,
conflictResolverLog
conflictResolverLog,
mreg
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand All @@ -42,6 +44,18 @@
* </ul>
*/
public class CacheVersionConflictResolverImpl implements CacheVersionConflictResolver {
/** Accepted entries count name. */
public static final String ACCEPTED_EVENTS_CNT = "AcceptedCount";

/** Accepted entries count description. */
public static final String ACCEPTED_EVENTS_CNT_DESC = "Count of accepted entries";

/** Rejected entries count name. */
public static final String REJECTED_EVENTS_CNT = "RejectedCount";

/** Rejected entries count description. */
public static final String REJECTED_EVENTS_CNT_DESC = "Count of rejected entries";

/**
* Cluster id.
*/
Expand All @@ -66,17 +80,32 @@ public class CacheVersionConflictResolverImpl implements CacheVersionConflictRes
@GridToStringInclude
protected final boolean conflictResolveFieldEnabled;

/** Accepted entries count. */
private final LongAdderMetric acceptedCnt;

/** Rejected entries count. */
private final LongAdderMetric rejectedCnt;

/**
* @param clusterId Data center id.
* @param conflictResolveField Field to resolve conflicts.
* @param log Logger.
* @param mreg Metric registry.
*/
public CacheVersionConflictResolverImpl(byte clusterId, String conflictResolveField, IgniteLogger log) {
public CacheVersionConflictResolverImpl(
byte clusterId,
String conflictResolveField,
IgniteLogger log,
MetricRegistryImpl mreg
) {
this.clusterId = clusterId;
this.conflictResolveField = conflictResolveField;
this.log = log;

conflictResolveFieldEnabled = conflictResolveField != null;

acceptedCnt = mreg.longAdderMetric(ACCEPTED_EVENTS_CNT, ACCEPTED_EVENTS_CNT_DESC);
rejectedCnt = mreg.longAdderMetric(REJECTED_EVENTS_CNT, REJECTED_EVENTS_CNT_DESC);
}

/** {@inheritDoc} */
Expand All @@ -90,10 +119,14 @@ public CacheVersionConflictResolverImpl(byte clusterId, String conflictResolveFi

boolean useNew = isUseNew(ctx, oldEntry, newEntry);

if (useNew)
if (useNew) {
res.useNew();
else
acceptedCnt.increment();
}
else {
res.useOld();
rejectedCnt.increment();
}

return res;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.util.typedef.internal.S;

/** Debug aware resolver. */
Expand All @@ -28,9 +29,15 @@ public class DebugCacheVersionConflictResolverImpl extends CacheVersionConflictR
* @param clusterId Data center id.
* @param conflictResolveField Field to resolve conflicts.
* @param log Logger.
* @param mreg Metric registry.
*/
public DebugCacheVersionConflictResolverImpl(byte clusterId, String conflictResolveField, IgniteLogger log) {
super(clusterId, conflictResolveField, log);
public DebugCacheVersionConflictResolverImpl(
byte clusterId,
String conflictResolveField,
IgniteLogger log,
MetricRegistryImpl mreg
) {
super(clusterId, conflictResolveField, log, mreg);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -49,6 +51,9 @@
import static java.util.Collections.singletonMap;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cdc.conflictresolve.CacheConflictResolutionManagerImpl.CONFLICT_RESOLVER_METRICS_REGISTRY_NAME;
import static org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl.ACCEPTED_EVENTS_CNT;
import static org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl.REJECTED_EVENTS_CNT;

/**
* Cache conflict operations test.
Expand Down Expand Up @@ -76,22 +81,25 @@ public static Collection<?> parameters() {
}

/** */
private static IgniteCache<String, ConflictResolvableTestData> cache;
private static final byte FIRST_CLUSTER_ID = 1;

/** */
private static IgniteInternalCache<BinaryObject, BinaryObject> cachex;
private static final byte SECOND_CLUSTER_ID = 2;

/** */
private static IgniteEx client;
private static final byte THIRD_CLUSTER_ID = 3;

/** */
private static final byte FIRST_CLUSTER_ID = 1;
private IgniteCache<String, ConflictResolvableTestData> cache;

/** */
private static final byte SECOND_CLUSTER_ID = 2;
private IgniteInternalCache<BinaryObject, BinaryObject> cachex;

/** */
private static final byte THIRD_CLUSTER_ID = 3;
private IgniteEx client;

/** */
private IgniteEx ign;

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
Expand All @@ -104,24 +112,14 @@ public static Collection<?> parameters() {
return super.getConfiguration(igniteInstanceName).setPluginProviders(pluginCfg);
}

/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
startGrid(1);

client = startClientGrid(2);
}

/** {@inheritDoc} */
@Override protected void afterTestsStopped() {
cache = null;
cachex = null;
client = null;
}

/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();

ign = startGrid(1);

client = startClientGrid(2);

if (cachex == null || cachex.configuration().getAtomicityMode() != cacheMode) {
if (cachex != null)
client.cache(DEFAULT_CACHE_NAME).destroy();
Expand All @@ -133,6 +131,11 @@ public static Collection<?> parameters() {
}
}

/** {@inheritDoc} */
@Override protected void afterTest() {
stopAllGrids();
}

/** Tests that regular cache operations works with the conflict resolver when there is no update conflicts. */
@Test
public void testSimpleUpdates() {
Expand Down Expand Up @@ -197,6 +200,8 @@ public void testUpdatesReorderFromOtherCluster() throws Exception {
// Remove with the higher topVer should succeed.
putConflict(key, new GridCacheVersion(3, order, 1, otherClusterId), true);

checkMetrics(4, 8);

key = key("UpdateClusterUpdateReorder3", otherClusterId);

int topVer = 1;
Expand All @@ -207,12 +212,16 @@ public void testUpdatesReorderFromOtherCluster() throws Exception {
putConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), false);
putConflict(key, new GridCacheVersion(topVer, order, 1, otherClusterId), false);

checkMetrics(5, 10);

// Remove with the equal or lower nodeOrder should ignored.
removeConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), false);
removeConflict(key, new GridCacheVersion(topVer, order, 1, otherClusterId), false);

// Remove with the higher nodeOrder should succeed.
putConflict(key, new GridCacheVersion(topVer, order, 3, otherClusterId), true);

checkMetrics(6, 12);
}

/** Tests cache operations for entry replicated from another cluster. */
Expand Down Expand Up @@ -334,4 +343,15 @@ private String key(String key, byte otherClusterId) {
protected String conflictResolveField() {
return null;
}

/** Checks metrics for conflict resolver. */
protected void checkMetrics(int acceptedCnt, int rejectedCnt) {
MetricRegistryImpl mreg = ign.context().metric().registry(CONFLICT_RESOLVER_METRICS_REGISTRY_NAME);

assertNotNull(mreg.findMetric(ACCEPTED_EVENTS_CNT));
assertNotNull(mreg.findMetric(REJECTED_EVENTS_CNT));

assertEquals(acceptedCnt, ((LongAdderMetric)mreg.findMetric(ACCEPTED_EVENTS_CNT)).value());
assertEquals(rejectedCnt, ((LongAdderMetric)mreg.findMetric(REJECTED_EVENTS_CNT)).value());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ private static final class LwwConflictResolver implements CacheVersionConflictRe
return res;
}
}

/** {@inheritDoc} */
@Override protected void checkMetrics(int acceptedCnt, int rejectedCnt) {
// No op.
}
}