Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Micrometer registry serialization error due to failed meter #1741

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Expand Up @@ -61,6 +61,7 @@ Even when matching on the main class name or on system properties,
** Checks the Java version before attaching to avoid attachment on unsupported JVMs.
* Cassandra instrumentation - {pull}1712[#1712]
* Log correlation supports JBoss Logging - {pull}1737[#1737]
* Update Byte-buddy to `1.11.0` - {pull}1769[#1769]

[float]
===== Bug fixes
Expand All @@ -70,6 +71,7 @@ Even when matching on the main class name or on system properties,
* Fix rounded number format for non-english locales - {pull}1728[#1728]
* Fix `NullPointerException` on legacy Apache client instrumentation when host is `null` - {pull}1746[#1746]
* Apply consistent proxy class exclusion heuristic - {pull}1738[#1738]
* Fix micrometer serialization error - {pull}1741[#1741]

[float]
===== Refactors
Expand Down
Expand Up @@ -29,7 +29,7 @@
/**
*/
@javax.annotation.Generated(
value = "by gRPC proto compiler (version 1.36.1)",
value = "by gRPC proto compiler (version 1.37.0)",
comments = "Source: rpc.proto")
public final class HelloGrpc {

Expand Down
Expand Up @@ -26,6 +26,8 @@

import co.elastic.apm.agent.configuration.MetricsConfiguration;
import co.elastic.apm.agent.report.serialize.DslJsonSerializer;
import co.elastic.apm.agent.sdk.weakmap.WeakMapSupplier;
import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentSet;
import com.dslplatform.json.DslJson;
import com.dslplatform.json.JsonWriter;
import com.dslplatform.json.NumberConverter;
Expand All @@ -38,6 +40,8 @@
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -52,16 +56,23 @@
public class MicrometerMeterRegistrySerializer {

private static final byte NEW_LINE = (byte) '\n';

private static final Logger logger = LoggerFactory.getLogger(MicrometerMeterRegistrySerializer.class);

private final DslJson<Object> dslJson = new DslJson<>(new DslJson.Settings<>());
private final StringBuilder replaceBuilder = new StringBuilder();
private final MetricsConfiguration config;

private final WeakConcurrentSet<Meter> internallyDisabledMeters = WeakMapSupplier.createSet();
private int previousSize = 0;

public MicrometerMeterRegistrySerializer(MetricsConfiguration config) {
this.config = config;
}

Iterable<Meter> getFailedMeters() {
return internallyDisabledMeters;
}

public JsonWriter serialize(final Map<Meter.Id, Meter> metersById, final long epochMicros) {
int newSize = (int) (Math.max(previousSize, 512) * 1.25);
JsonWriter jw = dslJson.newWriter(newSize);
Expand Down Expand Up @@ -101,31 +112,50 @@ void serializeMetricSet(List<Tag> tags, List<Meter> meters, long epochMicros, St
DslJsonSerializer.writeFieldName("samples", jw);
jw.writeByte(JsonWriter.OBJECT_START);
boolean hasValue = false;
for (int i = 0, size = meters.size(); i < size; i++) {
Meter meter = meters.get(i);
if (meter instanceof Timer) {
Timer timer = (Timer) meter;
hasValue = serializeTimer(jw, timer.getId(), timer.count(), timer.totalTime(TimeUnit.MICROSECONDS), hasValue, replaceBuilder, dedotMetricName);
} else if (meter instanceof FunctionTimer) {
FunctionTimer timer = (FunctionTimer) meter;
hasValue = serializeTimer(jw, timer.getId(), (long) timer.count(), timer.totalTime(TimeUnit.MICROSECONDS), hasValue, replaceBuilder, dedotMetricName);
} else if (meter instanceof LongTaskTimer) {
LongTaskTimer timer = (LongTaskTimer) meter;
hasValue = serializeTimer(jw, timer.getId(), timer.activeTasks(), timer.duration(TimeUnit.MICROSECONDS), hasValue, replaceBuilder, dedotMetricName);
} else if (meter instanceof DistributionSummary) {
DistributionSummary timer = (DistributionSummary) meter;
hasValue = serializeDistributionSummary(jw, timer.getId(), timer.count(), timer.totalAmount(), hasValue, replaceBuilder, dedotMetricName);
} else if (meter instanceof Gauge) {
Gauge gauge = (Gauge) meter;
hasValue = serializeValue(gauge.getId(), gauge.value(), hasValue, jw, replaceBuilder, dedotMetricName);

} else if (meter instanceof Counter) {
Counter counter = (Counter) meter;
hasValue = serializeValue(counter.getId(), counter.count(), hasValue, jw, replaceBuilder, dedotMetricName);
} else if (meter instanceof FunctionCounter) {
FunctionCounter counter = (FunctionCounter) meter;
hasValue = serializeValue(counter.getId(), counter.count(), hasValue, jw, replaceBuilder, dedotMetricName);
ClassLoader originalContextCL = Thread.currentThread().getContextClassLoader();
try {
for (int i = 0, size = meters.size(); i < size; i++) {
Meter meter = meters.get(i);
if (internallyDisabledMeters.contains(meter)) {
continue;
}
try {
// Setting the Meter CL as the context class loader during the Meter query operations
Thread.currentThread().setContextClassLoader(meter.getClass().getClassLoader());
if (meter instanceof Timer) {
Timer timer = (Timer) meter;
hasValue = serializeTimer(jw, timer.getId(), timer.count(), timer.totalTime(TimeUnit.MICROSECONDS), hasValue, replaceBuilder, dedotMetricName);
} else if (meter instanceof FunctionTimer) {
FunctionTimer timer = (FunctionTimer) meter;
hasValue = serializeTimer(jw, timer.getId(), (long) timer.count(), timer.totalTime(TimeUnit.MICROSECONDS), hasValue, replaceBuilder, dedotMetricName);
} else if (meter instanceof LongTaskTimer) {
LongTaskTimer timer = (LongTaskTimer) meter;
hasValue = serializeTimer(jw, timer.getId(), timer.activeTasks(), timer.duration(TimeUnit.MICROSECONDS), hasValue, replaceBuilder, dedotMetricName);
} else if (meter instanceof DistributionSummary) {
DistributionSummary timer = (DistributionSummary) meter;
hasValue = serializeDistributionSummary(jw, timer.getId(), timer.count(), timer.totalAmount(), hasValue, replaceBuilder, dedotMetricName);
} else if (meter instanceof Gauge) {
Gauge gauge = (Gauge) meter;
hasValue = serializeValue(gauge.getId(), gauge.value(), hasValue, jw, replaceBuilder, dedotMetricName);
} else if (meter instanceof Counter) {
Counter counter = (Counter) meter;
hasValue = serializeValue(counter.getId(), counter.count(), hasValue, jw, replaceBuilder, dedotMetricName);
} else if (meter instanceof FunctionCounter) {
FunctionCounter counter = (FunctionCounter) meter;
hasValue = serializeValue(counter.getId(), counter.count(), hasValue, jw, replaceBuilder, dedotMetricName);
}
} catch (Throwable throwable) {
String meterName = meter.getId().getName();
logger.warn("Failed to serialize Micrometer meter \"{}\" with tags {}. This meter will be " +
"excluded from serialization going forward.", meterName, tags);
logger.debug("Detailed info about failure to register Micrometer meter \"" + meterName +
"\": ", throwable);
internallyDisabledMeters.add(meter);
}
}
} finally {
Thread.currentThread().setContextClassLoader(originalContextCL);
}
jw.writeByte(JsonWriter.OBJECT_END);
}
Expand Down
Expand Up @@ -131,4 +131,8 @@ public void accept(Meter meter) {
WeakConcurrentSet<MeterRegistry> getMeterRegistries() {
return meterRegistries;
}

Iterable<Meter> getFailedMeters() {
return serializer.getFailedMeters();
}
}
@@ -1,3 +1,27 @@
/*-
* #%L
* Elastic APM Java agent
* %%
* Copyright (C) 2018 - 2021 Elastic and contributors
* %%
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* #L%
*/
package co.elastic.apm.agent.micrometer;

import co.elastic.apm.agent.configuration.MetricsConfiguration;
Expand Down
Expand Up @@ -324,6 +324,81 @@ void tryToSerializeInvalidGaugeValues() {
}
}

@Test
void testWorkingWithProperContextCL() {
List<Tag> tags = List.of(Tag.of("foo", "bar"));
meterRegistry.gauge("gauge1", tags, 42, v -> {
if (Thread.currentThread().getContextClassLoader() == null) {
throw new RuntimeException("Context CL cannot be null when querying this gauge");
}
return 42D;
});
JsonNode metricSet;
ClassLoader originalContextCL = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(null);
metricSet = getSingleMetricSet();
} finally {
Thread.currentThread().setContextClassLoader(originalContextCL);
}
assertThat(metricSet.get("metricset").get("samples").get("gauge1").get("value").doubleValue()).isEqualTo(42D);
assertThat(metricsReporter.getFailedMeters()).isEmpty();
}

@Test
void tryExclusionOfFailedGauge_singleGauge() {
List<Tag> tags = List.of(Tag.of("foo", "bar"));
meterRegistry.gauge("gauge1", tags, 42, v -> {
throw new RuntimeException("Failed to read gauge value");
});
assertThat(metricsReporter.getFailedMeters()).isEmpty();
JsonNode metricSet = getSingleMetricSet();
assertThat(metricSet.get("metricset").get("samples"))
.describedAs("value of %s is not expected to be written to json", "gauge1")
.isEmpty();

getSingleMetricSet();
assertThat(metricsReporter.getFailedMeters().iterator().next().getId().getName()).isEqualTo("gauge1");
}

@Test
void tryExclusionOfFailedGauge_firstFails() {
List<Tag> tags = List.of(Tag.of("foo", "bar"));
meterRegistry.gauge("gauge1", tags, 42, v -> {
throw new RuntimeException("Failed to read gauge value");
});
meterRegistry.gauge("gauge2", tags, 42, v -> 42D);
assertThat(metricsReporter.getFailedMeters()).isEmpty();
JsonNode metricSet = getSingleMetricSet();
assertThat(metricSet.get("metricset").get("samples").get("gauge1"))
.describedAs("value of %s is not expected to be written to json", "gauge1")
.isNull();

// serialization should handle ignoring the 1st value
assertThat(metricSet.get("metricset").get("samples").get("gauge2").get("value").doubleValue()).isEqualTo(42D);
assertThat(metricsReporter.getFailedMeters().iterator().next().getId().getName()).isEqualTo("gauge1");
}

@Test
void tryExclusionOfFailedGauge_secondFails() {
List<Tag> tags = List.of(Tag.of("foo", "bar"));
meterRegistry.gauge("gauge1", tags, 42, v -> 42D);
meterRegistry.gauge("gauge2", tags, 42, v -> {
throw new RuntimeException("Failed to read gauge value");
});
assertThat(metricsReporter.getFailedMeters()).isEmpty();
JsonNode metricSet = getSingleMetricSet();

// serialization should handle ignoring the 1st value
assertThat(metricSet.get("metricset").get("samples").get("gauge1").get("value").doubleValue()).isEqualTo(42D);

assertThat(metricSet.get("metricset").get("samples").get("gauge2"))
.describedAs("value of %s is not expected to be written to json", "gauge1")
.isNull();

assertThat(metricsReporter.getFailedMeters().iterator().next().getId().getName()).isEqualTo("gauge2");
}

@Test
void tryToSerializeInvalidCounterValues() {
for (Double invalidValue : Arrays.asList(Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY, Double.NaN)) {
Expand Down