Skip to content

Commit

Permalink
IGNITE-22072 Fixed compilation after custom metrics (#260)
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladsz83 committed Apr 19, 2024
1 parent e8ebaf2 commit af7fdb0
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.binary.BinaryTypeImpl;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.metric.MetricRegistry;
import org.apache.ignite.resources.LoggerResource;

import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.DFLT_IS_ONLY_PRIMARY;
Expand Down Expand Up @@ -98,14 +99,16 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
protected IgniteLogger log;

/** {@inheritDoc} */
@Override public void start(MetricRegistry mreg) {
@Override public void start(MetricRegistry reg) {
A.notEmpty(caches, "caches");

cachesIds = caches.stream()
.mapToInt(CU::cacheId)
.boxed()
.collect(Collectors.toSet());

MetricRegistryImpl mreg = (MetricRegistryImpl)reg;

this.evtsCnt = mreg.longMetric(EVTS_CNT, EVTS_CNT_DESC);
this.typesCnt = mreg.longMetric(TYPES_CNT, TYPES_CNT_DESC);
this.mappingsCnt = mreg.longMetric(MAPPINGS_CNT, MAPPINGS_CNT_DESC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.lifecycle.LifecycleBean;
import org.apache.ignite.lifecycle.LifecycleEventType;
import org.apache.ignite.metric.MetricRegistry;

import static org.apache.ignite.lifecycle.LifecycleEventType.AFTER_NODE_STOP;
import static org.apache.ignite.lifecycle.LifecycleEventType.BEFORE_NODE_STOP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
import org.apache.ignite.internal.binary.BinaryTypeImpl;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.metric.MetricRegistry;
import org.apache.ignite.resources.LoggerResource;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand Down Expand Up @@ -294,7 +295,7 @@ private <T> void sendOneBatch(
}

/** {@inheritDoc} */
@Override public void start(MetricRegistry mreg) {
@Override public void start(MetricRegistry reg) {
A.notNull(kafkaProps, "Kafka properties");
A.notNull(evtTopic, "Kafka topic");
A.notNull(metadataTopic, "Kafka metadata topic");
Expand Down Expand Up @@ -325,6 +326,8 @@ private <T> void sendOneBatch(
throw new RuntimeException(e);
}

MetricRegistryImpl mreg = (MetricRegistryImpl)reg;

this.evtsCnt = mreg.longMetric(EVTS_CNT, EVTS_CNT_DESC);
this.lastMsgTs = mreg.longMetric(LAST_EVT_TIME, LAST_EVT_TIME_DESC);
this.bytesSnt = mreg.longMetric(BYTES_SENT, BYTES_SENT_DESCRIPTION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.client.thin.ClientBinary;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.metric.MetricRegistry;

/**
* Change Data Consumer that streams all data changes to destination cluster through Ignite thin client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.metric.MetricRegistry;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.metric.LongMetric;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryNoopMetadataHandler;
import org.apache.ignite.internal.cdc.TypeMappingImpl;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.platform.PlatformType;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
Expand Down Expand Up @@ -154,7 +154,7 @@ private IgniteToKafkaCdcStreamer igniteToKafkaCdcStreamer() {

GridTestUtils.setFieldValue(streamer, "log", listeningLog.getLogger(IgniteToKafkaCdcStreamer.class));

streamer.start(new MetricRegistry("test", null, null, log));
streamer.start(new MetricRegistryImpl("test", null, null, log));

return streamer;
}
Expand Down

0 comments on commit af7fdb0

Please sign in to comment.