Skip to content
Permalink
Browse files
IGNITE-14449 CDC Metadata replication added.
  • Loading branch information
nizhikov committed Apr 19, 2022
1 parent 981070a commit 57e14df79b44de6fe9c69984042179acf9e2d252
Showing 13 changed files with 623 additions and 87 deletions.
@@ -21,18 +21,27 @@
import java.util.Map;
import java.util.function.BooleanSupplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration;
import org.apache.ignite.cdc.kafka.KafkaToIgniteMetadataUpdater;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.binary.BinaryTypeImpl;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.lang.IgniteInClosureX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;

/**
@@ -57,6 +66,34 @@ public abstract class CdcEventsApplier {
/** */
private final BooleanSupplier hasRemoves = () -> !F.isEmpty(rmvBatch);

/**
* Update closure.
* @see #applyWithRetry(IgniteInClosureX, IgniteInternalCache)
*/
private final IgniteInClosureX<IgniteInternalCache<BinaryObject, BinaryObject>> updClo =
new IgniteInClosureX<IgniteInternalCache<BinaryObject, BinaryObject>>() {
/** {@inheritDoc} */
@Override public void applyx(
IgniteInternalCache<BinaryObject, BinaryObject> cache
) throws IgniteCheckedException {
cache.putAllConflict(updBatch);
}
};

/**
* Remove closure.
* @see #applyWithRetry(IgniteInClosureX, IgniteInternalCache)
*/
private final IgniteInClosureX<IgniteInternalCache<BinaryObject, BinaryObject>> rmvClo =
new IgniteInClosureX<IgniteInternalCache<BinaryObject, BinaryObject>>() {
/** {@inheritDoc} */
@Override public void applyx(
IgniteInternalCache<BinaryObject, BinaryObject> cache
) throws IgniteCheckedException {
cache.removeAllConflict(rmvBatch);
}
};

/**
* @param maxBatchSize Maximum batch size.
*/
@@ -84,7 +121,11 @@ protected int apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
// IgniteEx#cachex(String) will return null if cache not initialized with regular Ignite#cache(String) call.
ignite().cache(cacheName);

return ignite().cachex(cacheName).keepBinary();
IgniteInternalCache<Object, Object> cache0 = ignite().cachex(cacheName);

assert cache0 != null;

return cache0.keepBinary();
}
}

@@ -140,20 +181,19 @@ protected int apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
* @param applyUpd Apply update batch flag supplier.
* @param applyRmv Apply remove batch flag supplier.
* @return Number of applied events.
* @throws IgniteCheckedException In case of error.
*/
private int applyIf(
IgniteInternalCache<BinaryObject, BinaryObject> cache,
BooleanSupplier applyUpd,
BooleanSupplier applyRmv
) throws IgniteCheckedException {
) {
int evtsApplied = 0;

if (applyUpd.getAsBoolean()) {
if (log().isDebugEnabled())
log().debug("Applying put batch [cache=" + cache.name() + ']');

cache.putAllConflict(updBatch);
applyWithRetry(updClo, cache);

evtsApplied += updBatch.size();

@@ -164,7 +204,7 @@ private int applyIf(
if (log().isDebugEnabled())
log().debug("Applying remove batch [cache=" + cache.name() + ']');

cache.removeAllConflict(rmvBatch);
applyWithRetry(rmvClo, cache);

evtsApplied += rmvBatch.size();

@@ -174,11 +214,94 @@ private int applyIf(
return evtsApplied;
}

/**
* Executes closure with retry logic.
* Metadata update thread polls metadata asynchronously with {@link KafkaToIgniteCdcStreamerConfiguration#getMetaUpdateInterval()} interval.
* This means metadata updates can be seen later than data updates.
* In this case {@link BinaryObjectException} can point to absence of metadata.
* To overcome lack of metadata invoke {@link #updateMetadata()} and retry closure.
*
* @param clo Closure to apply.
* @param cache Cache for closure.
* @see KafkaToIgniteMetadataUpdater
* @see KafkaToIgniteCdcStreamerConfiguration#getMetaUpdateInterval()
*/
private void applyWithRetry(
IgniteInClosureX<IgniteInternalCache<BinaryObject, BinaryObject>> clo,
IgniteInternalCache<BinaryObject, BinaryObject> cache
) {
try {
clo.apply(cache);
}
catch (Exception e) {
// Retry only if cause is BinaryObjectException.
if (!X.hasCause(e, BinaryObjectException.class))
throw e;

// Retry after metadata update.
updateMetadata();

clo.apply(cache);
}
}

/** @return {@code True} if update batch should be applied. */
private boolean isApplyBatch(Map<KeyCacheObject, ?> map, KeyCacheObject key) {
return map.size() >= maxBatchSize || map.containsKey(key);
}

/**
* Register {@code meta} inside {@code ign} instance.
*
* @param ign Ignite instance.
* @param log Logger.
* @param meta Binary metadata to register.
*/
public static void registerBinaryMeta(IgniteEx ign, IgniteLogger log, BinaryMetadata meta) {
ign.context().cacheObjects().addMeta(
meta.typeId(),
new BinaryTypeImpl(
((CacheObjectBinaryProcessorImpl)ign.context().cacheObjects()).binaryContext(),
meta
),
false
);

if (log.isInfoEnabled())
log.info("BinaryMeta[meta=" + meta + ']');
}

/**
* Register {@code mapping} inside {@code ign} instance.
*
* @param ign Ignite instance.
* @param log Logger.
* @param mapping Type mapping to register.
*/
public static void registerMapping(IgniteEx ign, IgniteLogger log, TypeMapping mapping) {
assert mapping.platformType().ordinal() <= Byte.MAX_VALUE;

try {
ign.context().marshallerContext().registerClassName(
(byte)mapping.platformType().ordinal(),
mapping.typeId(),
mapping.typeName(),
false
);

if (log.isInfoEnabled())
log.info("Mapping[mapping=" + mapping + ']');
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}

/** Update metadata if possible. */
protected void updateMetadata() {
// No-op.
}

/** @return Ignite instance. */
protected abstract IgniteEx ignite();

@@ -24,10 +24,13 @@
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamer;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.binary.BinaryTypeImpl;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
@@ -59,9 +62,21 @@ public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcCo
/** */
public static final String EVTS_CNT = "EventsCount";

/** */
public static final String TYPES_CNT = "TypesCount";

/** */
public static final String MAPPINGS_CNT = "MappingsCount";

/** */
public static final String EVTS_CNT_DESC = "Count of messages applied to destination cluster";

/** */
public static final String TYPES_CNT_DESC = "Count of received binary types events";

/** */
public static final String MAPPINGS_CNT_DESC = "Count of received mappings events";

/** */
public static final String LAST_EVT_TIME = "LastEventTime";

@@ -89,6 +104,12 @@ public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcCo
/** Count of events applied to destination cluster. */
protected AtomicLongMetric evtsCnt;

/** Count of binary types applied to destination cluster. */
protected AtomicLongMetric typesCnt;

/** Count of mappings applied to destination cluster. */
protected AtomicLongMetric mappingsCnt;

/** Logger. */
@LoggerResource
private IgniteLogger log;
@@ -114,6 +135,8 @@ public IgniteToIgniteCdcStreamer() {
dest = (IgniteEx)Ignition.start(destIgniteCfg);

this.evtsCnt = mreg.longMetric(EVTS_CNT, EVTS_CNT_DESC);
this.typesCnt = mreg.longMetric(TYPES_CNT, TYPES_CNT_DESC);
this.mappingsCnt = mreg.longMetric(MAPPINGS_CNT, MAPPINGS_CNT_DESC);
this.lastEvtTs = mreg.longMetric(LAST_EVT_TIME, LAST_EVT_TIME_DESC);
}

@@ -143,6 +166,30 @@ public IgniteToIgniteCdcStreamer() {
}
}

/** {@inheritDoc} */
@Override public void onTypes(Iterator<BinaryType> types) {
types.forEachRemaining(t -> {
BinaryMetadata meta = ((BinaryTypeImpl)t).metadata();

registerBinaryMeta(dest, log, meta);

typesCnt.increment();
});

lastEvtTs.value(System.currentTimeMillis());
}

/** {@inheritDoc} */
@Override public void onMappings(Iterator<TypeMapping> mappings) {
mappings.forEachRemaining(m -> {
registerMapping(dest, log, m);

mappingsCnt.increment();
});

lastEvtTs.value(System.currentTimeMillis());
}

/** {@inheritDoc} */
@Override public void stop() {
dest.close();

0 comments on commit 57e14df

Please sign in to comment.