Skip to content
Permalink
Browse files
IGNITE-16386 Set clusterId on node restart (#98)
  • Loading branch information
nizhikov committed Jan 26, 2022
1 parent aa2a168 commit 3c2f8613111a49bcc86813468cd9016c598def79
Showing 10 changed files with 272 additions and 51 deletions.
@@ -76,7 +76,7 @@ protected int apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {

for (CdcEvent evt : evts) {
if (log().isDebugEnabled())
log().debug("Event received [key=" + evt.key() + ']');
log().debug("Event received [evt=" + evt + ']');

IgniteInternalCache<BinaryObject, BinaryObject> cache = ignCaches.computeIfAbsent(evt.cacheId(), cacheId -> {
for (String cacheName : ignite().cacheNames()) {
@@ -17,6 +17,7 @@

package org.apache.ignite.cdc.conflictresolve;

import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
@@ -29,6 +30,12 @@
* @see CacheVersionConflictResolver
*/
public class CacheConflictResolutionManagerImpl<K, V> implements CacheConflictResolutionManager<K, V> {
/** Logger. */
private IgniteLogger log;

/** Logger for {@link CacheVersionConflictResolverImpl}. */
private IgniteLogger conflictResolverLog;

/**
* Field for conflict resolve.
* Value of this field will be used to compare two entries in case of conflicting changes.
@@ -38,28 +45,49 @@
*/
private final String conflictResolveField;

/** CLuster Id. */
private final byte clusterId;

/** Grid cache context. */
private GridCacheContext<K, V> cctx;

/**
* @param conflictResolveField Field to resolve conflicts.
*/
public CacheConflictResolutionManagerImpl(String conflictResolveField) {
public CacheConflictResolutionManagerImpl(String conflictResolveField, byte clusterId) {
this.conflictResolveField = conflictResolveField;
this.clusterId = clusterId;
}

/** {@inheritDoc} */
@Override public CacheVersionConflictResolver conflictResolver() {
return new CacheVersionConflictResolverImpl(
cctx.versions().dataCenterId(),
conflictResolveField,
cctx.logger(CacheVersionConflictResolverImpl.class)
);
CacheVersionConflictResolver rslvr;

if (conflictResolverLog.isDebugEnabled()) {
rslvr = new DebugCacheVersionConflictResolverImpl(
clusterId,
conflictResolveField,
conflictResolverLog
);
}
else {
rslvr = new CacheVersionConflictResolverImpl(
clusterId,
conflictResolveField,
conflictResolverLog
);
}

log.info("Conflict resolver created [rslvr=" + rslvr + ']');

return rslvr;
}

/** {@inheritDoc} */
@Override public void start(GridCacheContext<K, V> cctx) {
this.cctx = cctx;
this.log = cctx.logger(CacheConflictResolutionManagerImpl.class);
this.conflictResolverLog = cctx.logger(CacheVersionConflictResolverImpl.class);
}

/** {@inheritDoc} */
@@ -44,18 +44,23 @@
*/
private final String conflictResolveField;

/** Cluster Id. */
private final byte clusterId;

/**
* @param conflictResolveField Field to resolve conflicts.
* @param clusterId Cluster ID.
*/
public CacheVersionConflictResolverCachePluginProvider(String conflictResolveField) {
public CacheVersionConflictResolverCachePluginProvider(String conflictResolveField, byte clusterId) {
this.conflictResolveField = conflictResolveField;
this.clusterId = clusterId;
}

/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Nullable @Override public <T> T createComponent(Class<T> cls) {
if (cls.equals(CacheConflictResolutionManager.class))
return (T)new CacheConflictResolutionManagerImpl<>(conflictResolveField);
return (T)new CacheConflictResolutionManagerImpl<>(conflictResolveField, clusterId);

return null;
}
@@ -23,6 +23,8 @@
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;

/**
@@ -43,7 +45,8 @@ public class CacheVersionConflictResolverImpl implements CacheVersionConflictRes
/**
* Cluster id.
*/
private final byte clusterId;
@GridToStringInclude
protected final byte clusterId;

/**
* Field for conflict resolve.
@@ -53,13 +56,15 @@ public class CacheVersionConflictResolverImpl implements CacheVersionConflictRes
*
* @see CacheVersionConflictResolverImpl
*/
@GridToStringInclude
private final String conflictResolveField;

/** Logger. */
private final IgniteLogger log;
protected final IgniteLogger log;

/** If {@code true} then conflict resolving with the value field enabled. */
private final boolean conflictResolveFieldEnabled;
@GridToStringInclude
protected final boolean conflictResolveFieldEnabled;

/**
* @param clusterId Data center id.
@@ -100,7 +105,7 @@ public CacheVersionConflictResolverImpl(byte clusterId, String conflictResolveFi
* @return {@code True} is should use new entry.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private <K, V> boolean isUseNew(
protected <K, V> boolean isUseNew(
CacheObjectValueContext ctx,
GridCacheVersionedEntryEx<K, V> oldEntry,
GridCacheVersionedEntryEx<K, V> newEntry
@@ -119,20 +124,8 @@ private <K, V> boolean isUseNew(
Object newVal = newEntry.value(ctx);

if (oldVal != null && newVal != null) {
Comparable oldResolveField;
Comparable newResolveField;

try {
if (oldVal instanceof BinaryObject) {
oldResolveField = ((BinaryObject)oldVal).field(conflictResolveField);
newResolveField = ((BinaryObject)newVal).field(conflictResolveField);
}
else {
oldResolveField = U.field(oldVal, conflictResolveField);
newResolveField = U.field(newVal, conflictResolveField);
}

return oldResolveField.compareTo(newResolveField) < 0;
return value(oldVal).compareTo(value(newVal)) < 0;
}
catch (Exception e) {
log.error(
@@ -143,10 +136,22 @@ private <K, V> boolean isUseNew(
}
}

log.error("Conflict can't be resolved, update ignored [key=" + newEntry.key() + ", fromCluster=" + newEntry.dataCenterId()
+ ", toCluster=" + oldEntry.dataCenterId() + ']');
log.error("Conflict can't be resolved, " + (newEntry.value(ctx) == null ? "remove" : "update") + " ignored " +
"[key=" + newEntry.key() + ", fromCluster=" + newEntry.dataCenterId() + ", toCluster=" + oldEntry.dataCenterId() + ']');

// Ignoring update.
return false;
}

/** @return Conflict resolve field value. */
protected Comparable value(Object val) {
return (val instanceof BinaryObject)
? ((BinaryObject)val).field(conflictResolveField)
: U.field(val, conflictResolveField);
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheVersionConflictResolverImpl.class, this);
}
}
@@ -20,6 +20,7 @@
import java.io.Serializable;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
@@ -40,15 +41,15 @@
* @see CacheVersionConflictResolver
*/
public class CacheVersionConflictResolverPluginProvider<C extends PluginConfiguration> implements PluginProvider<C> {
/** Plugin context. */
private PluginContext ctx;

/** Cluster id. */
private byte clusterId;

/** Cache names. */
private Set<String> caches;

/** Plugin name. */
private String name = "Cache version conflict resolver";

/**
* Field for conflict resolve.
* Value of this field will be used to compare two entries in case of conflicting changes.
@@ -61,14 +62,17 @@
/** Cache plugin provider. */
private CachePluginProvider<?> provider;

/** Log. */
private IgniteLogger log;

/** */
public CacheVersionConflictResolverPluginProvider() {
// No-op.
}

/** {@inheritDoc} */
@Override public String name() {
return "Cache version conflict resolver";
return name + " [clusterId=" + clusterId + ", conflictResolveField=" + conflictResolveField + ", caches=" + caches + ']';
}

/** {@inheritDoc} */
@@ -83,24 +87,28 @@ public CacheVersionConflictResolverPluginProvider() {

/** {@inheritDoc} */
@Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) {
this.ctx = ctx;

this.provider = new CacheVersionConflictResolverCachePluginProvider<>(conflictResolveField);
this.log = ctx.log(CacheVersionConflictResolverPluginProvider.class);
this.provider = new CacheVersionConflictResolverCachePluginProvider<>(conflictResolveField, clusterId);
}

/** {@inheritDoc} */
@Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
if (caches.contains(ctx.igniteCacheConfiguration().getName()))
String cacheName = ctx.igniteCacheConfiguration().getName();

if (caches.contains(cacheName)) {
log.info("ConflictResolver provider set for cache [cacheName=" + cacheName + ']');

return provider;
}

log.info("Skip ConflictResolver provider for cache [cacheName=" + cacheName + ']');

return null;
}

/** {@inheritDoc} */
@Override public void onIgniteStart() {
IgniteEx ign = (IgniteEx)ctx.grid();

ign.context().cache().context().versions().dataCenterId(clusterId);
// No-op.
}

/** {@inheritDoc} */
@@ -123,9 +131,14 @@ public void setConflictResolveField(String conflictResolveField) {
this.conflictResolveField = conflictResolveField;
}

/** @param name Plugin name. */
public void setName(String name) {
this.name = name;
}

/** {@inheritDoc} */
@Override public void start(PluginContext ctx) {
// No-op.
((IgniteEx)ctx.grid()).context().cache().context().versions().dataCenterId(clusterId);
}

/** {@inheritDoc} */
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.cdc.conflictresolve;

import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.util.typedef.internal.S;

/** Debug aware resolver. */
public class DebugCacheVersionConflictResolverImpl extends CacheVersionConflictResolverImpl {
/**
* @param clusterId Data center id.
* @param conflictResolveField Field to resolve conflicts.
* @param log Logger.
*/
public DebugCacheVersionConflictResolverImpl(byte clusterId, String conflictResolveField, IgniteLogger log) {
super(clusterId, conflictResolveField, log);
}

/** {@inheritDoc} */
@Override protected <K, V> boolean isUseNew(
CacheObjectValueContext ctx,
GridCacheVersionedEntryEx<K, V> oldEntry,
GridCacheVersionedEntryEx<K, V> newEntry
) {
boolean res = super.isUseNew(ctx, oldEntry, newEntry);

Object oldVal = conflictResolveFieldEnabled ? oldEntry.value(ctx) : null;
Object newVal = conflictResolveFieldEnabled ? newEntry.value(ctx) : null;

if (oldVal != null)
oldVal = value(oldVal);

if (newVal != null)
newVal = value(newVal);

log.debug("isUseNew[" +
"start=" + oldEntry.isStartVersion() +
", oldVer=" + oldEntry.version() +
", newVer=" + newEntry.version() +
", old=" + oldVal +
", new=" + newVal +
", res=" + res + ']');

return res;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheVersionConflictResolverImpl.class, this);
}
}
@@ -27,7 +27,6 @@
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import javax.management.DynamicMBean;
import org.apache.ignite.IgniteCache;
@@ -176,12 +175,6 @@ private enum WaitDataMode {
cfgPlugin1.setCaches(new HashSet<>(Arrays.asList(ACTIVE_PASSIVE_CACHE, ACTIVE_ACTIVE_CACHE)));
cfgPlugin1.setConflictResolveField("reqId");

CacheVersionConflictResolverPluginProvider<?> cfgPlugin2 = new CacheVersionConflictResolverPluginProvider<>();

cfgPlugin2.setClusterId(clusterId);
cfgPlugin2.setCaches(new HashSet<>(Arrays.asList("T1")));
cfgPlugin2.setConflictResolveField("ID");

cfg.setPluginProviders(cfgPlugin1);

cfg.setDataStorageConfiguration(new DataStorageConfiguration()

0 comments on commit 3c2f861

Please sign in to comment.