Skip to content
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
/*-
* #%L
* Elastic APM Java agent
* %%
* Copyright (C) 2018 - 2019 Elastic and contributors
* %%
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* #L%
*/
package co.elastic.apm.agent.configuration;

import co.elastic.apm.agent.context.LifecycleListener;
import co.elastic.apm.agent.impl.ElasticApmTracer;
import co.elastic.apm.agent.impl.MetaData;
import co.elastic.apm.agent.report.ApmServerClient;
import co.elastic.apm.agent.report.serialize.PayloadSerializer;
import co.elastic.apm.agent.util.ExecutorUtils;
import com.dslplatform.json.DslJson;
import com.dslplatform.json.JsonReader;
import com.dslplatform.json.MapConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.stagemonitor.configuration.ConfigurationOption;
import org.stagemonitor.configuration.ConfigurationRegistry;
import org.stagemonitor.configuration.source.AbstractConfigurationSource;
import org.stagemonitor.util.IOUtils;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ApmServerConfigurationSource extends AbstractConfigurationSource implements LifecycleListener {
private static final int SC_OK = 200;
private static final int SC_NOT_MODIFIED = 304;
private static final int SC_FORBIDDEN = 403;
private static final int SC_NOT_FOUND = 404;
private static final int SC_SERVICE_UNAVAILABLE = 503;

private static final int DEFAULT_POLL_DELAY_SEC = (int) TimeUnit.MINUTES.toSeconds(5);
private static final Pattern MAX_AGE = Pattern.compile("max-age\\s*=\\s*(\\d+)");
private final Logger logger;
private final DslJson<Object> dslJson = new DslJson<>();
private final byte[] buffer = new byte[4096];
private final PayloadSerializer payloadSerializer;
private final MetaData metaData;
private final ApmServerClient apmServerClient;
@Nullable
private String etag;
private volatile Map<String, String> config = Collections.emptyMap();
@Nullable
private volatile ThreadPoolExecutor threadPool;

public ApmServerConfigurationSource(PayloadSerializer payloadSerializer, MetaData metaData, ApmServerClient apmServerClient) {
this(payloadSerializer, metaData, apmServerClient, LoggerFactory.getLogger(ApmServerConfigurationSource.class));
}

ApmServerConfigurationSource(PayloadSerializer payloadSerializer, MetaData metaData, ApmServerClient apmServerClient, Logger logger) {
this.payloadSerializer = payloadSerializer;
this.metaData = metaData;
this.apmServerClient = apmServerClient;
this.logger = logger;
}

@Nullable
static Integer parseMaxAge(@Nullable String cacheControlHeader) {
if (cacheControlHeader == null) {
return null;
}

Matcher matcher = MAX_AGE.matcher(cacheControlHeader);
if (!matcher.find()) {
return null;
}
return Integer.parseInt(matcher.group(1));
}

/**
* We want to reload the configuration in intervals which are determined based on the Cache-Control header from the APM Server
* That's why we can't rely on the general {@link org.stagemonitor.configuration.ConfigurationRegistry} scheduled reload
*/
@Override
public void reload() {
}

@Override
public void start(final ElasticApmTracer tracer) {
threadPool = ExecutorUtils.createSingleThreadDeamonPool("apm-remote-config-poller", 1);
threadPool.execute(new Runnable() {
@Override
public void run() {
pollConfig(tracer.getConfigurationRegistry());
}
});
}

/**
* Continuously polls the APM Server's remote configuration endpoint
*
* @param configurationRegistry the configuration registry which will be asked to
* {@link ConfigurationRegistry#reloadDynamicConfigurationOptions()}
* after successfully fetching the configuration
*/
private void pollConfig(ConfigurationRegistry configurationRegistry) {
while (!Thread.currentThread().isInterrupted()) {
String cacheControlHeader = fetchConfig(configurationRegistry);
// it doesn't make sense to poll more frequently than the max-age
Integer pollDelaySec = parseMaxAge(cacheControlHeader);
if (pollDelaySec == null) {
pollDelaySec = DEFAULT_POLL_DELAY_SEC;
}
try {
if (logger.isDebugEnabled()) {
logger.debug("Scheduling next remote configuration reload in {}s", pollDelaySec);
}
TimeUnit.SECONDS.sleep(pollDelaySec);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

/**
* Fetches the configuration and returns the Cache-Control header which is used to determine the next polling interval
*
* @param configurationRegistry the configuration registry which will be asked to
* {@link ConfigurationRegistry#reloadDynamicConfigurationOptions()}
* after successfully fetching the configuration
* @return the Cache-Control header of the HTTP response
*/
@Nullable
String fetchConfig(final ConfigurationRegistry configurationRegistry) {
try {
return apmServerClient.execute("/config/v1/agents", new ApmServerClient.ConnectionHandler<String>() {
@Override
public String withConnection(HttpURLConnection connection) throws IOException {
return tryFetchConfig(configurationRegistry, connection);
}
});
} catch (Exception e) {
logger.error(e.getMessage());
return null;
}
}

private String tryFetchConfig(ConfigurationRegistry configurationRegistry, HttpURLConnection connection) throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("Reloading configuration from APM Server {}", connection.getURL());
}
connection.setRequestProperty("Content-Type", "application/json");
connection.setRequestMethod("POST");
connection.setDoOutput(true);
if (etag != null) {
connection.setRequestProperty("If-None-Match", etag);
}
payloadSerializer.setOutputStream(connection.getOutputStream());
payloadSerializer.serializeMetadata(metaData);
payloadSerializer.flush();

final int status = connection.getResponseCode();
switch (status) {
case SC_OK:
etag = connection.getHeaderField("ETag");
InputStream is = connection.getInputStream();
final JsonReader<Object> reader = dslJson.newReader(is, buffer);
reader.startObject();
config = MapConverter.deserialize(reader);
IOUtils.consumeAndClose(is);
configurationRegistry.reloadDynamicConfigurationOptions();
logger.info("Received new configuration from APM Server: {}", config);
for (Map.Entry<String, String> entry : config.entrySet()) {
ConfigurationOption<?> conf = configurationRegistry.getConfigurationOptionByKey(entry.getKey());
if (conf == null) {
logger.warn("Received unknown remote configuration key {}", entry.getKey());
} else if (!conf.isDynamic()) {
logger.warn("Can't apply remote configuration {} as this option is not dynamic (aka. reloadable)", entry.getKey());
}
}
break;
case SC_NOT_MODIFIED:
logger.debug("Configuration did not change");
break;
case SC_NOT_FOUND:
etag = null;
// means that there either is no configuration for this agent
// or that this is an APM Server < 7.3 which does not have the config endpoint
logger.debug("No remote config found for this agent");
// makes sure to remove the configuration if all configs are deleted for this agent in Kibana
if (!config.isEmpty()) {
logger.info("Received new configuration from APM Server: <empty>");
}
config = Collections.emptyMap();
configurationRegistry.reloadDynamicConfigurationOptions();
break;
case SC_FORBIDDEN:
logger.debug("Central configuration is disabled. Set kibana.enabled: true in your APM Server configuration.");
break;
case SC_SERVICE_UNAVAILABLE:
throw new IllegalStateException("Remote configuration is not available. Check the connection between APM Server and Kibana.");
default:
throw new IllegalStateException("Unexpected status " + status + " while fetching configuration");
}
return connection.getHeaderField("Cache-Control");
}

@Override
public String getValue(String key) {
return config.get(key);
}

@Override
public String getName() {
return "APM Server";
}

@Override
public void stop() {
if (this.threadPool != null) {
this.threadPool.shutdownNow();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -476,12 +476,17 @@ public void stop() {
transactionPool.close();
spanPool.close();
errorPool.close();
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.stop();
}
} catch (Exception e) {
logger.warn("Suppressed exception while calling stop()", e);
}

for (LifecycleListener lifecycleListener : lifecycleListeners) {
try {
lifecycleListener.stop();
} catch (Exception e) {
logger.warn("Suppressed exception while calling stop()", e);
}
}
}

public Reporter getReporter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,20 @@

import co.elastic.apm.agent.bci.ElasticApmAgent;
import co.elastic.apm.agent.configuration.AgentArgumentsConfigurationSource;
import co.elastic.apm.agent.configuration.ApmServerConfigurationSource;
import co.elastic.apm.agent.configuration.CoreConfiguration;
import co.elastic.apm.agent.configuration.PrefixingConfigurationSourceWrapper;
import co.elastic.apm.agent.configuration.source.PropertyFileConfigurationSource;
import co.elastic.apm.agent.configuration.source.SystemPropertyConfigurationSource;
import co.elastic.apm.agent.context.LifecycleListener;
import co.elastic.apm.agent.impl.stacktrace.StacktraceConfiguration;
import co.elastic.apm.agent.logging.LoggingConfiguration;
import co.elastic.apm.agent.report.ApmServerClient;
import co.elastic.apm.agent.report.ApmServerHealthChecker;
import co.elastic.apm.agent.report.Reporter;
import co.elastic.apm.agent.report.ReporterConfiguration;
import co.elastic.apm.agent.report.ReporterFactory;
import co.elastic.apm.agent.report.serialize.DslJsonSerializer;
import co.elastic.apm.agent.util.DependencyInjectingServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -58,8 +64,7 @@ public class ElasticApmTracerBuilder {
private ConfigurationRegistry configurationRegistry;
@Nullable
private Reporter reporter;
@Nullable
private Iterable<LifecycleListener> lifecycleListeners;
private final List<LifecycleListener> lifecycleListeners = new ArrayList<>();
private Map<String, String> inlineConfig = new HashMap<>();
@Nullable
private final String agentArguments;
Expand All @@ -86,7 +91,7 @@ public ElasticApmTracerBuilder reporter(Reporter reporter) {
}

public ElasticApmTracerBuilder lifecycleListeners(List<LifecycleListener> lifecycleListeners) {
this.lifecycleListeners = lifecycleListeners;
this.lifecycleListeners.addAll(lifecycleListeners);
return this;
}

Expand All @@ -100,11 +105,18 @@ public ElasticApmTracer build() {
final List<ConfigurationSource> configSources = getConfigSources(agentArguments);
configurationRegistry = getDefaultConfigurationRegistry(configSources);
}
final ApmServerClient apmServerClient = new ApmServerClient(configurationRegistry.getConfig(ReporterConfiguration.class));
final DslJsonSerializer payloadSerializer = new DslJsonSerializer(configurationRegistry.getConfig(StacktraceConfiguration.class));
final MetaData metaData = MetaData.create(configurationRegistry, null, null);
ApmServerConfigurationSource configurationSource = new ApmServerConfigurationSource(payloadSerializer, metaData, apmServerClient);
configurationRegistry.addConfigurationSource(configurationSource);
if (reporter == null) {
reporter = new ReporterFactory().createReporter(configurationRegistry, null, null);
reporter = new ReporterFactory().createReporter(configurationRegistry, apmServerClient, metaData);
}
if (lifecycleListeners == null) {
lifecycleListeners = DependencyInjectingServiceLoader.load(LifecycleListener.class);
if (lifecycleListeners.isEmpty()) {
lifecycleListeners.add(new ApmServerHealthChecker(apmServerClient));
lifecycleListeners.add(configurationSource);
lifecycleListeners.addAll(DependencyInjectingServiceLoader.load(LifecycleListener.class));
}
return new ElasticApmTracer(configurationRegistry, reporter, lifecycleListeners);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,16 @@
*/
package co.elastic.apm.agent.impl;

import co.elastic.apm.agent.configuration.CoreConfiguration;
import co.elastic.apm.agent.impl.payload.ProcessFactory;
import co.elastic.apm.agent.impl.payload.ProcessInfo;
import co.elastic.apm.agent.impl.payload.Service;
import co.elastic.apm.agent.impl.payload.ServiceFactory;
import co.elastic.apm.agent.impl.payload.SystemInfo;
import co.elastic.apm.agent.report.ReporterConfiguration;
import org.stagemonitor.configuration.ConfigurationRegistry;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Map;
Expand Down Expand Up @@ -58,6 +65,15 @@ public MetaData(ProcessInfo process, Service service, SystemInfo system, Map<Str
globalLabelValues = new ArrayList<>(globalLabels.values());
}

public static MetaData create(ConfigurationRegistry configurationRegistry, @Nullable String frameworkName, @Nullable String frameworkVersion) {
final Service service = new ServiceFactory().createService(configurationRegistry.getConfig(CoreConfiguration.class), frameworkName, frameworkVersion);
final ProcessInfo processInformation = ProcessFactory.ForCurrentVM.INSTANCE.getProcessInformation();
if (!configurationRegistry.getConfig(ReporterConfiguration.class).isIncludeProcessArguments()) {
processInformation.getArgv().clear();
}
return new MetaData(processInformation, service, SystemInfo.create(), configurationRegistry.getConfig(CoreConfiguration.class).getGlobalLabels());
}

/**
* Service
* (Required)
Expand Down
Loading