From af7fdb098f1820327dc24c9873cc93287506505d Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 19 Apr 2024 13:09:48 +0300 Subject: [PATCH] IGNITE-22072 Fixed compilation after custom metrics (#260) --- .../org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java | 7 +++++-- .../org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java | 2 +- .../apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java | 7 +++++-- .../ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java | 2 +- .../org/apache/ignite/cdc/AbstractReplicationTest.java | 2 +- .../ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java | 4 ++-- 6 files changed, 15 insertions(+), 9 deletions(-) diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java index bbe184e14..149e03e47 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java @@ -27,11 +27,12 @@ import org.apache.ignite.internal.binary.BinaryContext; import org.apache.ignite.internal.binary.BinaryMetadata; import org.apache.ignite.internal.binary.BinaryTypeImpl; -import org.apache.ignite.internal.processors.metric.MetricRegistry; +import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.metric.MetricRegistry; import org.apache.ignite.resources.LoggerResource; import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.DFLT_IS_ONLY_PRIMARY; @@ -98,7 +99,7 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer { protected IgniteLogger log; /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry reg) { A.notEmpty(caches, "caches"); cachesIds = caches.stream() @@ -106,6 +107,8 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer { .boxed() .collect(Collectors.toSet()); + MetricRegistryImpl mreg = (MetricRegistryImpl)reg; + this.evtsCnt = mreg.longMetric(EVTS_CNT, EVTS_CNT_DESC); this.typesCnt = mreg.longMetric(TYPES_CNT, TYPES_CNT_DESC); this.mappingsCnt = mreg.longMetric(MAPPINGS_CNT, MAPPINGS_CNT_DESC); diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java index 146fc52dc..618c61d70 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java @@ -26,11 +26,11 @@ import org.apache.ignite.internal.binary.BinaryContext; import org.apache.ignite.internal.cdc.CdcMain; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; -import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.lang.IgniteExperimental; import org.apache.ignite.lifecycle.LifecycleBean; import org.apache.ignite.lifecycle.LifecycleEventType; +import org.apache.ignite.metric.MetricRegistry; import static org.apache.ignite.lifecycle.LifecycleEventType.AFTER_NODE_STOP; import static org.apache.ignite.lifecycle.LifecycleEventType.BEFORE_NODE_STOP; diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java index 0dce94427..3ae5223be 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java @@ -39,7 +39,7 @@ import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl; import org.apache.ignite.internal.binary.BinaryTypeImpl; import org.apache.ignite.internal.cdc.CdcMain; -import org.apache.ignite.internal.processors.metric.MetricRegistry; +import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.F; @@ -47,6 +47,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteExperimental; +import org.apache.ignite.metric.MetricRegistry; import org.apache.ignite.resources.LoggerResource; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -294,7 +295,7 @@ private void sendOneBatch( } /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry reg) { A.notNull(kafkaProps, "Kafka properties"); A.notNull(evtTopic, "Kafka topic"); A.notNull(metadataTopic, "Kafka metadata topic"); @@ -325,6 +326,8 @@ private void sendOneBatch( throw new RuntimeException(e); } + MetricRegistryImpl mreg = (MetricRegistryImpl)reg; + this.evtsCnt = mreg.longMetric(EVTS_CNT, EVTS_CNT_DESC); this.lastMsgTs = mreg.longMetric(LAST_EVT_TIME, LAST_EVT_TIME_DESC); this.bytesSnt = mreg.longMetric(BYTES_SENT, BYTES_SENT_DESCRIPTION); diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java index 3a6acadb5..bc7af745b 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java @@ -27,8 +27,8 @@ import org.apache.ignite.internal.binary.BinaryContext; import org.apache.ignite.internal.cdc.CdcMain; import org.apache.ignite.internal.client.thin.ClientBinary; -import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.metric.MetricRegistry; /** * Change Data Consumer that streams all data changes to destination cluster through Ignite thin client. diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java index 6b7b27882..785480a5d 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java @@ -56,13 +56,13 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; -import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.metric.MetricRegistry; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.spi.metric.LongMetric; diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java index 6167503a4..26b95d9ec 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java @@ -25,7 +25,7 @@ import org.apache.ignite.internal.binary.BinaryContext; import org.apache.ignite.internal.binary.BinaryNoopMetadataHandler; import org.apache.ignite.internal.cdc.TypeMappingImpl; -import org.apache.ignite.internal.processors.metric.MetricRegistry; +import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; import org.apache.ignite.platform.PlatformType; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.ListeningTestLogger; @@ -154,7 +154,7 @@ private IgniteToKafkaCdcStreamer igniteToKafkaCdcStreamer() { GridTestUtils.setFieldValue(streamer, "log", listeningLog.getLogger(IgniteToKafkaCdcStreamer.class)); - streamer.start(new MetricRegistry("test", null, null, log)); + streamer.start(new MetricRegistryImpl("test", null, null, log)); return streamer; }