From 6f3067c1b95bf3fef63cf5b268b3999c754ad95d Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Fri, 31 May 2024 12:38:23 +0800 Subject: [PATCH] fix(metrics): avoid using uninitialized counter metrics Signed-off-by: Shichao Nie --- .../s3/metrics/S3StreamMetricsManager.java | 16 ++++++++-------- .../stream/s3/metrics/wrapper/CounterMetric.java | 14 ++++++++------ .../s3/metrics/wrapper/MetricsWrapperTest.java | 4 ++-- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index 4e64185ca3..119f3caaf1 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -368,7 +368,7 @@ public static void registerInflightWALUploadTasksCountSupplier( public static CounterMetric buildS3UploadSizeMetric() { synchronized (BASE_ATTRIBUTES_LISTENERS) { - CounterMetric metric = new CounterMetric(metricsConfig, s3UploadSizeInTotal); + CounterMetric metric = new CounterMetric(metricsConfig, () -> s3UploadSizeInTotal); BASE_ATTRIBUTES_LISTENERS.add(metric); return metric; } @@ -376,7 +376,7 @@ public static CounterMetric buildS3UploadSizeMetric() { public static CounterMetric buildS3DownloadSizeMetric() { synchronized (BASE_ATTRIBUTES_LISTENERS) { - CounterMetric metric = new CounterMetric(metricsConfig, s3DownloadSizeInTotal); + CounterMetric metric = new CounterMetric(metricsConfig, () -> s3DownloadSizeInTotal); BASE_ATTRIBUTES_LISTENERS.add(metric); return metric; } @@ -435,7 +435,7 @@ public static HistogramMetric buildReadAheadStageTimeMetric(MetricsLevel metrics public static CounterMetric buildObjectNumMetric() { synchronized (BASE_ATTRIBUTES_LISTENERS) { - CounterMetric metric = new CounterMetric(metricsConfig, objectNumInTotal); + CounterMetric metric = new CounterMetric(metricsConfig, () -> objectNumInTotal); BASE_ATTRIBUTES_LISTENERS.add(metric); return metric; } @@ -462,7 +462,7 @@ public static HistogramMetric buildObjectUploadSizeMetric(MetricsLevel metricsLe public static CounterMetric buildNetworkInboundUsageMetric(ThrottleStrategy strategy) { synchronized (BASE_ATTRIBUTES_LISTENERS) { - CounterMetric metric = new CounterMetric(metricsConfig, AttributesUtils.buildAttributes(strategy), networkInboundUsageInTotal); + CounterMetric metric = new CounterMetric(metricsConfig, AttributesUtils.buildAttributes(strategy), () -> networkInboundUsageInTotal); BASE_ATTRIBUTES_LISTENERS.add(metric); return metric; } @@ -470,7 +470,7 @@ public static CounterMetric buildNetworkInboundUsageMetric(ThrottleStrategy stra public static CounterMetric buildNetworkOutboundUsageMetric(ThrottleStrategy strategy) { synchronized (BASE_ATTRIBUTES_LISTENERS) { - CounterMetric metric = new CounterMetric(metricsConfig, AttributesUtils.buildAttributes(strategy), networkOutboundUsageInTotal); + CounterMetric metric = new CounterMetric(metricsConfig, AttributesUtils.buildAttributes(strategy), () -> networkOutboundUsageInTotal); BASE_ATTRIBUTES_LISTENERS.add(metric); return metric; } @@ -523,7 +523,7 @@ public static CounterMetric buildBlockCacheOpsThroughputMetric(String ops) { synchronized (BASE_ATTRIBUTES_LISTENERS) { CounterMetric metric = new CounterMetric(metricsConfig, Attributes.builder() .put(AttributeKey.stringKey("ops"), ops) - .build(), blockCacheOpsThroughput); + .build(), () -> blockCacheOpsThroughput); BASE_ATTRIBUTES_LISTENERS.add(metric); return metric; } @@ -561,7 +561,7 @@ public static HistogramMetric buildGetIndexTimeMetric(MetricsLevel metricsLevel, public static CounterMetric buildCompactionReadSizeMetric() { synchronized (BASE_ATTRIBUTES_LISTENERS) { - CounterMetric metric = new CounterMetric(metricsConfig, compactionReadSizeInTotal); + CounterMetric metric = new CounterMetric(metricsConfig, () -> compactionReadSizeInTotal); BASE_ATTRIBUTES_LISTENERS.add(metric); return metric; } @@ -569,7 +569,7 @@ public static CounterMetric buildCompactionReadSizeMetric() { public static CounterMetric buildCompactionWriteSizeMetric() { synchronized (BASE_ATTRIBUTES_LISTENERS) { - CounterMetric metric = new CounterMetric(metricsConfig, compactionWriteSizeInTotal); + CounterMetric metric = new CounterMetric(metricsConfig, () -> compactionWriteSizeInTotal); BASE_ATTRIBUTES_LISTENERS.add(metric); return metric; } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/CounterMetric.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/CounterMetric.java index a34738b088..ecd9321f0d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/CounterMetric.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/CounterMetric.java @@ -16,22 +16,24 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.LongCounter; +import java.util.function.Supplier; + public class CounterMetric extends ConfigurableMetric { - private final LongCounter longCounter; + private final Supplier longCounterSupplier; - public CounterMetric(MetricsConfig metricsConfig, LongCounter longCounter) { + public CounterMetric(MetricsConfig metricsConfig, Supplier longCounterSupplier) { super(metricsConfig, Attributes.empty()); - this.longCounter = longCounter; + this.longCounterSupplier = longCounterSupplier; } - public CounterMetric(MetricsConfig metricsConfig, Attributes extraAttributes, LongCounter longCounter) { + public CounterMetric(MetricsConfig metricsConfig, Attributes extraAttributes, Supplier longCounterSupplier) { super(metricsConfig, extraAttributes); - this.longCounter = longCounter; + this.longCounterSupplier = longCounterSupplier; } public boolean add(MetricsLevel metricsLevel, long value) { if (metricsLevel.isWithin(this.metricsLevel)) { - longCounter.add(value, attributes); + longCounterSupplier.get().add(value, attributes); return true; } return false; diff --git a/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java b/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java index f987ebced8..9bb9de8f76 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java @@ -37,7 +37,7 @@ public class MetricsWrapperTest { @Test public void testConfigurableMetrics() { CounterMetric metric = new CounterMetric(new MetricsConfig(), Attributes.builder().put("extra", "v").build(), - Mockito.mock(LongCounter.class)); + () -> Mockito.mock(LongCounter.class)); Assertions.assertEquals(MetricsLevel.INFO, metric.metricsLevel); metric.onConfigChange(new MetricsConfig(MetricsLevel.DEBUG, Attributes.builder().put("base", "v2").build())); @@ -60,7 +60,7 @@ public void testConfigurableMetrics() { @Test public void testMetricsLevel() { - CounterMetric metric = new CounterMetric(new MetricsConfig(MetricsLevel.INFO, null), Mockito.mock(LongCounter.class)); + CounterMetric metric = new CounterMetric(new MetricsConfig(MetricsLevel.INFO, null), () -> Mockito.mock(LongCounter.class)); Assertions.assertTrue(metric.add(MetricsLevel.INFO, 1)); Assertions.assertFalse(metric.add(MetricsLevel.DEBUG, 1)); metric.onConfigChange(new MetricsConfig(MetricsLevel.DEBUG, null));