Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add option for automatic JMX retry #3511

Merged
merged 10 commits into from Feb 9, 2024
Expand Up @@ -18,12 +18,16 @@
*/
package co.elastic.apm.agent.jmx;

import co.elastic.apm.agent.tracer.configuration.TimeDuration;
import co.elastic.apm.agent.tracer.configuration.TimeDurationValueConverter;
import org.stagemonitor.configuration.ConfigurationOption;
import org.stagemonitor.configuration.ConfigurationOptionProvider;

import java.util.Collections;
import java.util.List;

import static co.elastic.apm.agent.tracer.configuration.RangeValidator.isNotInRange;

public class JmxConfiguration extends ConfigurationOptionProvider {

private ConfigurationOption<List<JmxMetric>> captureJmxMetrics = ConfigurationOption.<List<JmxMetric>>builder(JmxMetric.TokenValueConverter.INSTANCE, List.class)
Expand Down Expand Up @@ -137,4 +141,15 @@ public class JmxConfiguration extends ConfigurationOptionProvider {
ConfigurationOption<List<JmxMetric>> getCaptureJmxMetrics() {
return captureJmxMetrics;
}

private final ConfigurationOption<TimeDuration> faildRetryInterval = TimeDurationValueConverter.durationOption("m")
.key("jmx_failed_retry_interval")
.tags("internal")
.description("If set to a value greater or equal to 1m, the agent will retry failed JMX metric registrations.")
.addValidator(isNotInRange(TimeDuration.of("1ms"), TimeDuration.of("59s")))
.buildWithDefault(TimeDuration.of("0m"));

public ConfigurationOption<TimeDuration> getFaildRetryInterval() {
return faildRetryInterval;
}
}
Expand Up @@ -23,10 +23,12 @@
import co.elastic.apm.agent.metrics.DoubleSupplier;
import co.elastic.apm.agent.metrics.Labels;
import co.elastic.apm.agent.metrics.MetricRegistry;
import co.elastic.apm.agent.tracer.GlobalLocks;
import co.elastic.apm.agent.sdk.internal.util.ExecutorUtils;
import co.elastic.apm.agent.sdk.internal.util.PrivilegedActionUtils;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import co.elastic.apm.agent.sdk.internal.util.PrivilegedActionUtils;
import co.elastic.apm.agent.tracer.GlobalLocks;
import co.elastic.apm.agent.tracer.configuration.TimeDuration;
import org.stagemonitor.configuration.ConfigurationOption;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -54,6 +56,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class JmxMetricTracker extends AbstractLifecycleListener {
Expand All @@ -68,9 +71,17 @@ public class JmxMetricTracker extends AbstractLifecycleListener {
@Nullable
private volatile NotificationListener listener;

private final List<JmxMetric> failedMetrics;

@Nullable
private ScheduledExecutorService retryExecutor;

public JmxMetricTracker(ElasticApmTracer tracer) {
jmxConfiguration = tracer.getConfig(JmxConfiguration.class);
metricRegistry = tracer.getMetricRegistry();

// using a synchronized list so adding to the list does not require synchronization
failedMetrics = Collections.synchronizedList(new ArrayList<JmxMetric>());
}

@Override
Expand Down Expand Up @@ -175,19 +186,48 @@ synchronized void init(final MBeanServer platformMBeanServer) {
jmxConfiguration.getCaptureJmxMetrics().addChangeListener(new ConfigurationOption.ChangeListener<List<JmxMetric>>() {
@Override
public void onChange(ConfigurationOption<?> configurationOption, List<JmxMetric> oldValue, List<JmxMetric> newValue) {
List<JmxMetricRegistration> oldRegistrations = compileJmxMetricRegistrations(oldValue, platformMBeanServer);
List<JmxMetricRegistration> newRegistrations = compileJmxMetricRegistrations(newValue, platformMBeanServer);
List<JmxMetric> registrationErrors = new ArrayList<JmxMetric>(); // those are not needed
List<JmxMetricRegistration> oldRegistrations = compileJmxMetricRegistrations(oldValue, platformMBeanServer, registrationErrors);

List<JmxMetricRegistration> newRegistrations;
synchronized (failedMetrics) {
failedMetrics.clear();
newRegistrations = compileJmxMetricRegistrations(newValue, platformMBeanServer, failedMetrics);
}


for (JmxMetricRegistration addedRegistration : removeAll(oldRegistrations, newRegistrations)) {
addedRegistration.register(platformMBeanServer, metricRegistry);
}
for (JmxMetricRegistration deletedRegistration : removeAll(newRegistrations, oldRegistrations)) {
deletedRegistration.unregister(metricRegistry);
}

}
});
register(jmxConfiguration.getCaptureJmxMetrics().get(), platformMBeanServer);

ConfigurationOption<TimeDuration> failedRetryConfig = jmxConfiguration.getFaildRetryInterval();
long retryMillis = failedRetryConfig.getValue().getMillis();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scope is inside the if statement, why here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 91f49cf

if (!failedRetryConfig.isDefault()) {
retryExecutor = ExecutorUtils.createSingleThreadSchedulingDaemonPool("jmx-retry");
jackshirazi marked this conversation as resolved.
Show resolved Hide resolved
retryExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
retryFailedJmx(platformMBeanServer);
}
}, retryMillis, retryMillis, TimeUnit.MILLISECONDS);
}

register(jmxConfiguration.getCaptureJmxMetrics().get(), platformMBeanServer, failedMetrics);
}

// package-private for testing
void retryFailedJmx(MBeanServer platformMBeanServer) {
List<JmxMetric> failed = JmxMetricTracker.this.failedMetrics;
synchronized (failed) {
List<JmxMetric> toRetry = new ArrayList<>(failed);
failed.clear();
register(toRetry, platformMBeanServer, failed);
}
}

private void registerMBeanNotificationListener(final MBeanServer server) {
Expand Down Expand Up @@ -217,7 +257,7 @@ private void addMBean(ObjectName mBeanName, JmxMetric jmxMetric) {
ObjectName metricName = jmxMetric.getObjectName();
if (metricName.apply(mBeanName) || matchesJbossStatisticsPool(mBeanName, metricName, server)) {
logger.debug("MBean added at runtime: {}", jmxMetric.getObjectName());
register(Collections.singletonList(jmxMetric), server);
register(Collections.singletonList(jmxMetric), server, failedMetrics);
}
}

Expand Down Expand Up @@ -280,28 +320,36 @@ private static <T> List<T> removeAll(List<T> removeFromThis, List<T> toRemove) {
return result;
}

private void register(List<JmxMetric> jmxMetrics, MBeanServer server) {
for (JmxMetricRegistration registration : compileJmxMetricRegistrations(jmxMetrics, server)) {
private void register(List<JmxMetric> jmxMetrics, MBeanServer server, List<JmxMetric> failedMetrics) {
for (JmxMetricRegistration registration : compileJmxMetricRegistrations(jmxMetrics, server, failedMetrics)) {
registration.register(server, metricRegistry);
}
}

/**
* A single {@link JmxMetric} can yield multiple {@link JmxMetricRegistration}s if the {@link JmxMetric} contains multiple attributes
*
* @param jmxMetrics JMX metrics to register
* @param server MBean server
* @param failedMetrics list of JMX metrics that failed to register (out)
*/
private List<JmxMetricRegistration> compileJmxMetricRegistrations(List<JmxMetric> jmxMetrics, MBeanServer server) {
List<JmxMetricRegistration> registrations = new ArrayList<>();
private List<JmxMetricRegistration> compileJmxMetricRegistrations(List<JmxMetric> jmxMetrics, MBeanServer server, List<JmxMetric> failedMetrics) {
List<JmxMetricRegistration> globalRegistrations = new ArrayList<>();
for (JmxMetric jmxMetric : jmxMetrics) {
List<JmxMetricRegistration> metricRegistrations = new ArrayList<>();
try {
addJmxMetricRegistration(jmxMetric, registrations, server);
addJmxMetricRegistration(jmxMetric, metricRegistrations, server);
globalRegistrations.addAll(metricRegistrations);
} catch (Exception e) {
failedMetrics.add(jmxMetric);
logger.error("Failed to register JMX metric {}", jmxMetric.toString(), e);
}

}
return registrations;
return globalRegistrations;
}

private static void addJmxMetricRegistration(final JmxMetric jmxMetric, List<JmxMetricRegistration> registrations, MBeanServer server) throws JMException {
private void addJmxMetricRegistration(final JmxMetric jmxMetric, List<JmxMetricRegistration> registrations, MBeanServer server) throws JMException {
Set<ObjectInstance> mbeans = server.queryMBeans(jmxMetric.getObjectName(), null);
if (!mbeans.isEmpty()) {
logger.debug("Found mbeans for object name {}", jmxMetric.getObjectName());
Expand Down Expand Up @@ -355,20 +403,21 @@ private static String metricPrepend(Labels labels) {
return "";
}

private static void addJmxMetricRegistration(JmxMetric jmxMetric, List<JmxMetricRegistration> registrations, ObjectName objectName, Object value, JmxMetric.Attribute attribute, String attributeName, String metricPrepend) throws AttributeNotFoundException {
private void addJmxMetricRegistration(JmxMetric jmxMetric, List<JmxMetricRegistration> registrations, ObjectName objectName, Object value, JmxMetric.Attribute attribute, String attributeName, @Nullable String metricPrepend) throws AttributeNotFoundException {
String effectiveAttributeName = metricPrepend == null ? attributeName : metricPrepend + attributeName;
boolean unsubscribeOnError = jmxConfiguration.getFaildRetryInterval().isDefault();
if (value instanceof Number) {
logger.debug("Found number attribute {}={}", attribute.getJmxAttributeName(), value);
registrations.add(
new JmxMetricRegistration(
attribute.getMetricName(
metricPrepend == null ?
attributeName :
metricPrepend + attributeName
effectiveAttributeName
),
attribute.getLabels(objectName),
attributeName,
null,
objectName
objectName,
unsubscribeOnError
)
);
} else if (value instanceof CompositeData) {
Expand All @@ -380,14 +429,12 @@ private static void addJmxMetricRegistration(JmxMetric jmxMetric, List<JmxMetric
new JmxMetricRegistration(
attribute.getCompositeMetricName(
key,
metricPrepend == null ?
attributeName :
metricPrepend + attributeName
),
effectiveAttributeName),
attribute.getLabels(objectName),
attributeName,
key,
objectName
objectName,
unsubscribeOnError
)
);
} else {
Expand All @@ -411,13 +458,15 @@ static class JmxMetricRegistration {
@Nullable
private final String compositeDataKey;
private final ObjectName objectName;
private final boolean unsubscribeOnError;

private JmxMetricRegistration(String metricName, Labels labels, String jmxAttribute, @Nullable String compositeDataKey, ObjectName objectName) {
private JmxMetricRegistration(String metricName, Labels labels, String jmxAttribute, @Nullable String compositeDataKey, ObjectName objectName, boolean unsubscribeOnError) {
this.metricName = metricName;
this.labels = labels.immutableCopy();
this.jmxAttribute = jmxAttribute;
this.compositeDataKey = compositeDataKey;
this.objectName = objectName;
this.unsubscribeOnError = unsubscribeOnError;
}


Expand All @@ -427,13 +476,17 @@ void register(final MBeanServer server, final MetricRegistry metricRegistry) {
@Override
public double get() {
try {
double value;
if (compositeDataKey == null) {
return ((Number) server.getAttribute(objectName, jmxAttribute)).doubleValue();
value = ((Number) server.getAttribute(objectName, jmxAttribute)).doubleValue();
} else {
return ((Number) ((CompositeData) server.getAttribute(objectName, jmxAttribute)).get(compositeDataKey)).doubleValue();
value = ((Number) ((CompositeData) server.getAttribute(objectName, jmxAttribute)).get(compositeDataKey)).doubleValue();
}
return value;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[for reviewer] this allows to set a breakpoint to investigate the captured value when debugging.

} catch (InstanceNotFoundException | AttributeNotFoundException e) {
unregister(metricRegistry);
if (unsubscribeOnError) {
unregister(metricRegistry);
}
return Double.NaN;
} catch (Exception e) {
return Double.NaN;
Expand Down Expand Up @@ -473,5 +526,8 @@ public void stop() throws Exception {
if (logManagerPropertyPoller != null) {
logManagerPropertyPoller.interrupt();
}
if (retryExecutor != null) {
ExecutorUtils.shutdownAndWaitTermination(retryExecutor);
}
}
}