diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/ApmServerConfigurationSource.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/ApmServerConfigurationSource.java new file mode 100644 index 0000000000..48efe1c8fa --- /dev/null +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/ApmServerConfigurationSource.java @@ -0,0 +1,243 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 - 2019 Elastic and contributors + * %% + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * #L% + */ +package co.elastic.apm.agent.configuration; + +import co.elastic.apm.agent.context.LifecycleListener; +import co.elastic.apm.agent.impl.ElasticApmTracer; +import co.elastic.apm.agent.impl.MetaData; +import co.elastic.apm.agent.report.ApmServerClient; +import co.elastic.apm.agent.report.serialize.PayloadSerializer; +import co.elastic.apm.agent.util.ExecutorUtils; +import com.dslplatform.json.DslJson; +import com.dslplatform.json.JsonReader; +import com.dslplatform.json.MapConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.stagemonitor.configuration.ConfigurationOption; +import org.stagemonitor.configuration.ConfigurationRegistry; +import org.stagemonitor.configuration.source.AbstractConfigurationSource; +import org.stagemonitor.util.IOUtils; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class ApmServerConfigurationSource extends AbstractConfigurationSource implements LifecycleListener { + private static final int SC_OK = 200; + private static final int SC_NOT_MODIFIED = 304; + private static final int SC_FORBIDDEN = 403; + private static final int SC_NOT_FOUND = 404; + private static final int SC_SERVICE_UNAVAILABLE = 503; + + private static final int DEFAULT_POLL_DELAY_SEC = (int) TimeUnit.MINUTES.toSeconds(5); + private static final Pattern MAX_AGE = Pattern.compile("max-age\\s*=\\s*(\\d+)"); + private final Logger logger; + private final DslJson dslJson = new DslJson<>(); + private final byte[] buffer = new byte[4096]; + private final PayloadSerializer payloadSerializer; + private final MetaData metaData; + private final ApmServerClient apmServerClient; + @Nullable + private String etag; + private volatile Map config = Collections.emptyMap(); + @Nullable + private volatile ThreadPoolExecutor threadPool; + + public ApmServerConfigurationSource(PayloadSerializer payloadSerializer, MetaData metaData, ApmServerClient apmServerClient) { + this(payloadSerializer, metaData, apmServerClient, LoggerFactory.getLogger(ApmServerConfigurationSource.class)); + } + + ApmServerConfigurationSource(PayloadSerializer payloadSerializer, MetaData metaData, ApmServerClient apmServerClient, Logger logger) { + this.payloadSerializer = payloadSerializer; + this.metaData = metaData; + this.apmServerClient = apmServerClient; + this.logger = logger; + } + + @Nullable + static Integer parseMaxAge(@Nullable String cacheControlHeader) { + if (cacheControlHeader == null) { + return null; + } + + Matcher matcher = MAX_AGE.matcher(cacheControlHeader); + if (!matcher.find()) { + return null; + } + return Integer.parseInt(matcher.group(1)); + } + + /** + * We want to reload the configuration in intervals which are determined based on the Cache-Control header from the APM Server + * That's why we can't rely on the general {@link org.stagemonitor.configuration.ConfigurationRegistry} scheduled reload + */ + @Override + public void reload() { + } + + @Override + public void start(final ElasticApmTracer tracer) { + threadPool = ExecutorUtils.createSingleThreadDeamonPool("apm-remote-config-poller", 1); + threadPool.execute(new Runnable() { + @Override + public void run() { + pollConfig(tracer.getConfigurationRegistry()); + } + }); + } + + /** + * Continuously polls the APM Server's remote configuration endpoint + * + * @param configurationRegistry the configuration registry which will be asked to + * {@link ConfigurationRegistry#reloadDynamicConfigurationOptions()} + * after successfully fetching the configuration + */ + private void pollConfig(ConfigurationRegistry configurationRegistry) { + while (!Thread.currentThread().isInterrupted()) { + String cacheControlHeader = fetchConfig(configurationRegistry); + // it doesn't make sense to poll more frequently than the max-age + Integer pollDelaySec = parseMaxAge(cacheControlHeader); + if (pollDelaySec == null) { + pollDelaySec = DEFAULT_POLL_DELAY_SEC; + } + try { + if (logger.isDebugEnabled()) { + logger.debug("Scheduling next remote configuration reload in {}s", pollDelaySec); + } + TimeUnit.SECONDS.sleep(pollDelaySec); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * Fetches the configuration and returns the Cache-Control header which is used to determine the next polling interval + * + * @param configurationRegistry the configuration registry which will be asked to + * {@link ConfigurationRegistry#reloadDynamicConfigurationOptions()} + * after successfully fetching the configuration + * @return the Cache-Control header of the HTTP response + */ + @Nullable + String fetchConfig(final ConfigurationRegistry configurationRegistry) { + try { + return apmServerClient.execute("/config/v1/agents", new ApmServerClient.ConnectionHandler() { + @Override + public String withConnection(HttpURLConnection connection) throws IOException { + return tryFetchConfig(configurationRegistry, connection); + } + }); + } catch (Exception e) { + logger.error(e.getMessage()); + return null; + } + } + + private String tryFetchConfig(ConfigurationRegistry configurationRegistry, HttpURLConnection connection) throws IOException { + if (logger.isDebugEnabled()) { + logger.debug("Reloading configuration from APM Server {}", connection.getURL()); + } + connection.setRequestProperty("Content-Type", "application/json"); + connection.setRequestMethod("POST"); + connection.setDoOutput(true); + if (etag != null) { + connection.setRequestProperty("If-None-Match", etag); + } + payloadSerializer.setOutputStream(connection.getOutputStream()); + payloadSerializer.serializeMetadata(metaData); + payloadSerializer.flush(); + + final int status = connection.getResponseCode(); + switch (status) { + case SC_OK: + etag = connection.getHeaderField("ETag"); + InputStream is = connection.getInputStream(); + final JsonReader reader = dslJson.newReader(is, buffer); + reader.startObject(); + config = MapConverter.deserialize(reader); + IOUtils.consumeAndClose(is); + configurationRegistry.reloadDynamicConfigurationOptions(); + logger.info("Received new configuration from APM Server: {}", config); + for (Map.Entry entry : config.entrySet()) { + ConfigurationOption conf = configurationRegistry.getConfigurationOptionByKey(entry.getKey()); + if (conf == null) { + logger.warn("Received unknown remote configuration key {}", entry.getKey()); + } else if (!conf.isDynamic()) { + logger.warn("Can't apply remote configuration {} as this option is not dynamic (aka. reloadable)", entry.getKey()); + } + } + break; + case SC_NOT_MODIFIED: + logger.debug("Configuration did not change"); + break; + case SC_NOT_FOUND: + etag = null; + // means that there either is no configuration for this agent + // or that this is an APM Server < 7.3 which does not have the config endpoint + logger.debug("No remote config found for this agent"); + // makes sure to remove the configuration if all configs are deleted for this agent in Kibana + if (!config.isEmpty()) { + logger.info("Received new configuration from APM Server: "); + } + config = Collections.emptyMap(); + configurationRegistry.reloadDynamicConfigurationOptions(); + break; + case SC_FORBIDDEN: + logger.debug("Central configuration is disabled. Set kibana.enabled: true in your APM Server configuration."); + break; + case SC_SERVICE_UNAVAILABLE: + throw new IllegalStateException("Remote configuration is not available. Check the connection between APM Server and Kibana."); + default: + throw new IllegalStateException("Unexpected status " + status + " while fetching configuration"); + } + return connection.getHeaderField("Cache-Control"); + } + + @Override + public String getValue(String key) { + return config.get(key); + } + + @Override + public String getName() { + return "APM Server"; + } + + @Override + public void stop() { + if (this.threadPool != null) { + this.threadPool.shutdownNow(); + } + } +} diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java index abc12407c5..bfae4f0784 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java @@ -476,12 +476,17 @@ public void stop() { transactionPool.close(); spanPool.close(); errorPool.close(); - for (LifecycleListener lifecycleListener : lifecycleListeners) { - lifecycleListener.stop(); - } } catch (Exception e) { logger.warn("Suppressed exception while calling stop()", e); } + + for (LifecycleListener lifecycleListener : lifecycleListeners) { + try { + lifecycleListener.stop(); + } catch (Exception e) { + logger.warn("Suppressed exception while calling stop()", e); + } + } } public Reporter getReporter() { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracerBuilder.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracerBuilder.java index d88ec8836a..7b0204ab5f 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracerBuilder.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracerBuilder.java @@ -26,14 +26,20 @@ import co.elastic.apm.agent.bci.ElasticApmAgent; import co.elastic.apm.agent.configuration.AgentArgumentsConfigurationSource; +import co.elastic.apm.agent.configuration.ApmServerConfigurationSource; import co.elastic.apm.agent.configuration.CoreConfiguration; import co.elastic.apm.agent.configuration.PrefixingConfigurationSourceWrapper; import co.elastic.apm.agent.configuration.source.PropertyFileConfigurationSource; import co.elastic.apm.agent.configuration.source.SystemPropertyConfigurationSource; import co.elastic.apm.agent.context.LifecycleListener; +import co.elastic.apm.agent.impl.stacktrace.StacktraceConfiguration; import co.elastic.apm.agent.logging.LoggingConfiguration; +import co.elastic.apm.agent.report.ApmServerClient; +import co.elastic.apm.agent.report.ApmServerHealthChecker; import co.elastic.apm.agent.report.Reporter; +import co.elastic.apm.agent.report.ReporterConfiguration; import co.elastic.apm.agent.report.ReporterFactory; +import co.elastic.apm.agent.report.serialize.DslJsonSerializer; import co.elastic.apm.agent.util.DependencyInjectingServiceLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,8 +64,7 @@ public class ElasticApmTracerBuilder { private ConfigurationRegistry configurationRegistry; @Nullable private Reporter reporter; - @Nullable - private Iterable lifecycleListeners; + private final List lifecycleListeners = new ArrayList<>(); private Map inlineConfig = new HashMap<>(); @Nullable private final String agentArguments; @@ -86,7 +91,7 @@ public ElasticApmTracerBuilder reporter(Reporter reporter) { } public ElasticApmTracerBuilder lifecycleListeners(List lifecycleListeners) { - this.lifecycleListeners = lifecycleListeners; + this.lifecycleListeners.addAll(lifecycleListeners); return this; } @@ -100,11 +105,18 @@ public ElasticApmTracer build() { final List configSources = getConfigSources(agentArguments); configurationRegistry = getDefaultConfigurationRegistry(configSources); } + final ApmServerClient apmServerClient = new ApmServerClient(configurationRegistry.getConfig(ReporterConfiguration.class)); + final DslJsonSerializer payloadSerializer = new DslJsonSerializer(configurationRegistry.getConfig(StacktraceConfiguration.class)); + final MetaData metaData = MetaData.create(configurationRegistry, null, null); + ApmServerConfigurationSource configurationSource = new ApmServerConfigurationSource(payloadSerializer, metaData, apmServerClient); + configurationRegistry.addConfigurationSource(configurationSource); if (reporter == null) { - reporter = new ReporterFactory().createReporter(configurationRegistry, null, null); + reporter = new ReporterFactory().createReporter(configurationRegistry, apmServerClient, metaData); } - if (lifecycleListeners == null) { - lifecycleListeners = DependencyInjectingServiceLoader.load(LifecycleListener.class); + if (lifecycleListeners.isEmpty()) { + lifecycleListeners.add(new ApmServerHealthChecker(apmServerClient)); + lifecycleListeners.add(configurationSource); + lifecycleListeners.addAll(DependencyInjectingServiceLoader.load(LifecycleListener.class)); } return new ElasticApmTracer(configurationRegistry, reporter, lifecycleListeners); } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/MetaData.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/MetaData.java index e79da40ab2..146619b3c2 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/MetaData.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/MetaData.java @@ -24,9 +24,16 @@ */ package co.elastic.apm.agent.impl; +import co.elastic.apm.agent.configuration.CoreConfiguration; +import co.elastic.apm.agent.impl.payload.ProcessFactory; import co.elastic.apm.agent.impl.payload.ProcessInfo; import co.elastic.apm.agent.impl.payload.Service; +import co.elastic.apm.agent.impl.payload.ServiceFactory; import co.elastic.apm.agent.impl.payload.SystemInfo; +import co.elastic.apm.agent.report.ReporterConfiguration; +import org.stagemonitor.configuration.ConfigurationRegistry; + +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Map; @@ -58,6 +65,15 @@ public MetaData(ProcessInfo process, Service service, SystemInfo system, Map(globalLabels.values()); } + public static MetaData create(ConfigurationRegistry configurationRegistry, @Nullable String frameworkName, @Nullable String frameworkVersion) { + final Service service = new ServiceFactory().createService(configurationRegistry.getConfig(CoreConfiguration.class), frameworkName, frameworkVersion); + final ProcessInfo processInformation = ProcessFactory.ForCurrentVM.INSTANCE.getProcessInformation(); + if (!configurationRegistry.getConfig(ReporterConfiguration.class).isIncludeProcessArguments()) { + processInformation.getArgv().clear(); + } + return new MetaData(processInformation, service, SystemInfo.create(), configurationRegistry.getConfig(CoreConfiguration.class).getGlobalLabels()); + } + /** * Service * (Required) diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerClient.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerClient.java new file mode 100644 index 0000000000..6bde86883d --- /dev/null +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerClient.java @@ -0,0 +1,248 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 - 2019 Elastic and contributors + * %% + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * #L% + */ +package co.elastic.apm.agent.report; + +import co.elastic.apm.agent.util.VersionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLSocketFactory; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLConnection; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Load-balances traffic and handles fail-overs to multiple APM Servers. + *

+ * In contrast to most load-balancing algorithms, this does not round-robin for every new request. + * The reasoning is that we want to be able to reuse the TCP connections to the APM Server as much as possible so that + * initiating a new request is as fast as possible. + * That's why we only round-robin to the next APM Server URL in the event of a connection error. + *

+ *

+ * To achieve load-balancing, we shuffle the list of provided APM Server URLs. + * The more agents need to communicate with the same set of servers, + * the more even the load distribution will be. + * That's because of the random order the APM Servers will be in the shuffled list. + * The assumption is that we only need to multiple APM Servers if lots of agents are in use and therefore one server does not scale anymore. + *

+ */ +public class ApmServerClient { + + private static final Logger logger = LoggerFactory.getLogger(ApmServerClient.class); + private static final String USER_AGENT = "elasticapm-java/" + VersionUtils.getAgentVersion(); + private final ReporterConfiguration reporterConfiguration; + private final List serverUrls; + private final AtomicInteger errorCount = new AtomicInteger(); + + public ApmServerClient(ReporterConfiguration reporterConfiguration) { + this(reporterConfiguration, shuffleUrls(reporterConfiguration)); + } + + public ApmServerClient(ReporterConfiguration reporterConfiguration, List serverUrls) { + this.reporterConfiguration = reporterConfiguration; + this.serverUrls = Collections.unmodifiableList(serverUrls); + } + + private static List shuffleUrls(ReporterConfiguration reporterConfiguration) { + List serverUrls = new ArrayList<>(reporterConfiguration.getServerUrls()); + // shuffling the URL list helps to distribute the load across the apm servers + // when there are multiple agents, they should not all start connecting to the same apm server + Collections.shuffle(serverUrls); + return serverUrls; + } + + private static void trustAll(HttpsURLConnection connection) { + final SSLSocketFactory sf = SslUtils.getTrustAllSocketFactory(); + if (sf != null) { + // using the same instances is important for TCP connection reuse + connection.setHostnameVerifier(SslUtils.getTrustAllHostnameVerifyer()); + connection.setSSLSocketFactory(sf); + } + } + + HttpURLConnection startRequest(String relativePath) throws IOException { + return startRequestToUrl(appendPathToCurrentUrl(relativePath)); + } + + @Nonnull + private HttpURLConnection startRequestToUrl(URL url) throws IOException { + final URLConnection connection = url.openConnection(); + if (!reporterConfiguration.isVerifyServerCert()) { + if (connection instanceof HttpsURLConnection) { + trustAll((HttpsURLConnection) connection); + } + } + if (reporterConfiguration.getSecretToken() != null) { + connection.setRequestProperty("Authorization", "Bearer " + reporterConfiguration.getSecretToken()); + } + connection.setRequestProperty("User-Agent", USER_AGENT); + connection.setConnectTimeout((int) reporterConfiguration.getServerTimeout().getMillis()); + connection.setReadTimeout((int) reporterConfiguration.getServerTimeout().getMillis()); + return (HttpURLConnection) connection; + } + + @Nonnull + URL appendPathToCurrentUrl(String apmServerPath) throws MalformedURLException { + return appendPath(getCurrentUrl(), apmServerPath); + } + + @Nonnull + private URL appendPath(URL serverUrl, String apmServerPath) throws MalformedURLException { + String path = serverUrl.getPath(); + if (path.endsWith("/")) { + path = path.substring(0, path.length() - 1); + } + return new URL(serverUrl, path + apmServerPath); + } + + /** + * Instead of rotating the {@link #serverUrls} instance variable, this just increments an error counter which is read by + * {@link #getCurrentUrl()} and {@link #getPrioritizedUrlList()} which rotate a copy of the immutable {@link #serverUrls} list. + * This avoids that concurrently running requests influence each other. + *

+ * This design is inspired by org.elasticsearch.client.RestClient + *

+ *

+ * If the expected error count does not match the actual count, + * the error count is not incremented. + * This avoids concurrent requests from incrementing the error multiple times due to only one failing server. + *

+ * @param expectedErrorCount the error count that is expected by the current thread + * @return the new expected error count + */ + int incrementAndGetErrorCount(int expectedErrorCount) { + boolean success = errorCount.compareAndSet(expectedErrorCount, expectedErrorCount + 1); + if (success) { + return expectedErrorCount + 1; + } else { + // this thread has a stale error count and may not increment the error count when another retry fails + return -1; + } + } + + /** + * Similar to {@link #incrementAndGetErrorCount(int)} but without guarding against concurrent connection errors. + * This relieves the user from maintaining a separate error count. + */ + void onConnectionError() { + errorCount.incrementAndGet(); + } + + /** + * Executes a request to the APM Server and returns the result from the provided {@link ConnectionHandler}. + * If there's a connection error executing the request, + * the request is retried with the next APM Server url. + * The maximum amount of retries is the number of configured APM Server URLs. + * + * @param path the APM Server path + * @param connectionHandler receives the {@link HttpURLConnection} and returns the result + * @param the result type + * @return the result of the provided {@link Callable} + * @throws Exception in case all retries yield an exception, the last will be thrown + */ + @Nullable + public V execute(String path, ConnectionHandler connectionHandler) throws Exception { + int expectedErrorCount = errorCount.get(); + Exception previousException = null; + for (URL serverUrl : getPrioritizedUrlList()) { + HttpURLConnection connection = null; + try { + connection = startRequestToUrl(appendPath(serverUrl, path)); + return connectionHandler.withConnection(connection); + } catch (Exception e) { + expectedErrorCount = incrementAndGetErrorCount(expectedErrorCount); + logger.debug("Exception while interacting with APM Server, trying next one."); + if (previousException != null) { + e.addSuppressed(previousException); + } + previousException = e; + } finally { + if (connection != null) { + connection.disconnect(); + } + } + } + if (previousException == null) { + throw new IllegalStateException("Expected previousException not to be null"); + } + throw previousException; + } + + public void executeForAllUrls(String path, ConnectionHandler connectionHandler) { + for (URL serverUrl : serverUrls) { + HttpURLConnection connection = null; + try { + connection = startRequestToUrl(appendPath(serverUrl, path)); + connectionHandler.withConnection(connection); + } catch (Exception e) { + logger.debug("Exception while interacting with APM Server", e); + } finally { + if (connection != null) { + connection.disconnect(); + } + } + } + } + + URL getCurrentUrl() { + return serverUrls.get(errorCount.get() % serverUrls.size()); + } + + /** + * Returns a copy of {@link #serverUrls} which contains the {@link #getCurrentUrl() current URL} as the first element + * + * @return a copy of {@link #serverUrls} which contains the {@link #getCurrentUrl() current URL} as the first element + */ + @Nonnull + private List getPrioritizedUrlList() { + // Copying the URLs instead of rotating serverUrls makes sure that a concurrently happening connection error + // for a different request does not skip a URL. + // In other words, it avoids that concurrently running requests influence each other. + ArrayList serverUrlsCopy = new ArrayList<>(serverUrls); + Collections.rotate(serverUrlsCopy, errorCount.get()); + return serverUrlsCopy; + } + + int getErrorCount() { + return errorCount.get(); + } + + public interface ConnectionHandler { + @Nullable + T withConnection(HttpURLConnection connection) throws IOException; + } + +} diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerHealthChecker.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerHealthChecker.java index 4a99abf60b..1d20e848d7 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerHealthChecker.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerHealthChecker.java @@ -11,9 +11,9 @@ * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,86 +24,62 @@ */ package co.elastic.apm.agent.report; -import co.elastic.apm.agent.util.VersionUtils; +import co.elastic.apm.agent.context.LifecycleListener; +import co.elastic.apm.agent.impl.ElasticApmTracer; +import co.elastic.apm.agent.util.ExecutorUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.HttpsURLConnection; -import javax.net.ssl.SSLSocketFactory; -import java.io.IOException; import java.net.HttpURLConnection; -import java.net.URL; +import java.util.concurrent.ThreadPoolExecutor; -class ApmServerHealthChecker implements Runnable { +public class ApmServerHealthChecker implements Runnable, LifecycleListener { private static final Logger logger = LoggerFactory.getLogger(ApmServerHealthChecker.class); - private final ReporterConfiguration reporterConfiguration; + private final ApmServerClient apmServerClient; - ApmServerHealthChecker(ReporterConfiguration reporterConfiguration) { - this.reporterConfiguration = reporterConfiguration; + public ApmServerHealthChecker(ApmServerClient apmServerClient) { + this.apmServerClient = apmServerClient; } @Override - public void run() { - boolean success; - String message; - HttpURLConnection connection = null; - try { - URL url = new URL(reporterConfiguration.getServerUrls().get(0).toString() + "/"); - if (logger.isDebugEnabled()) { - logger.debug("Starting healthcheck to {}", url); - } - connection = (HttpURLConnection) url.openConnection(); - if (!reporterConfiguration.isVerifyServerCert()) { - if (connection instanceof HttpsURLConnection) { - trustAll((HttpsURLConnection) connection); - } - } - if (reporterConfiguration.getSecretToken() != null) { - connection.setRequestProperty("Authorization", "Bearer " + reporterConfiguration.getSecretToken()); - } - connection.setRequestProperty("User-Agent", "elasticapm-java/" + VersionUtils.getAgentVersion()); - connection.setConnectTimeout((int) reporterConfiguration.getServerTimeout().getMillis()); - connection.setReadTimeout((int) reporterConfiguration.getServerTimeout().getMillis()); - connection.connect(); - - final int status = connection.getResponseCode(); + public void start(ElasticApmTracer tracer) { + ThreadPoolExecutor pool = ExecutorUtils.createSingleThreadDeamonPool("apm-server-healthcheck", 1); + pool.execute(this); + pool.shutdown(); + } - success = status < 300; + @Override + public void run() { + apmServerClient.executeForAllUrls("/", new ApmServerClient.ConnectionHandler() { + @Override + public Void withConnection(HttpURLConnection connection) { + try { + if (logger.isDebugEnabled()) { + logger.debug("Starting healthcheck to {}", connection.getURL()); + } - if (!success) { - if (status == 404) { - message = "It seems like you are using a version of the APM Server which is not compatible with this agent. " + - "Please use APM Server 6.5.0 or newer."; - } else { - message = Integer.toString(status); + final int status = connection.getResponseCode(); + if (status >= 300) { + if (status == 404) { + throw new IllegalStateException("It seems like you are using a version of the APM Server which is not compatible with this agent. " + + "Please use APM Server 6.5.0 or newer."); + } else { + throw new IllegalStateException("Server returned status " + status); + } + } else { + // prints out the version info of the APM Server + logger.info("Elastic APM server is available: {}", HttpUtils.getBody(connection)); + } + } catch (Exception e) { + logger.warn("Elastic APM server {} is not available ({})", connection.getURL(), e.getMessage()); } - } else { - // prints out the version info of the APM Server - message = HttpUtils.getBody(connection); + return null; } - } catch (IOException e) { - message = e.getMessage(); - success = false; - } finally { - if (connection != null) { - connection.disconnect(); - } - } - - if (success) { - logger.info("Elastic APM server is available: {}", message); - } else { - logger.warn("Elastic APM server is not available ({})", message); - } + }); } - private void trustAll(HttpsURLConnection connection) { - final SSLSocketFactory sf = SslUtils.getTrustAllSocketFactory(); - if (sf != null) { - // using the same instances is important for TCP connection reuse - connection.setHostnameVerifier(SslUtils.getTrustAllHostnameVerifyer()); - connection.setSSLSocketFactory(sf); - } + @Override + public void stop() { } } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandler.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandler.java index 6d89f88abf..8e7a611550 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandler.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandler.java @@ -25,32 +25,18 @@ package co.elastic.apm.agent.report; import co.elastic.apm.agent.impl.MetaData; -import co.elastic.apm.agent.impl.payload.ProcessInfo; -import co.elastic.apm.agent.impl.payload.Service; -import co.elastic.apm.agent.impl.payload.SystemInfo; import co.elastic.apm.agent.report.processor.ProcessorEventHandler; import co.elastic.apm.agent.report.serialize.DslJsonSerializer; import co.elastic.apm.agent.report.serialize.PayloadSerializer; -import co.elastic.apm.agent.util.VersionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.stagemonitor.util.IOUtils; -import javax.annotation.Nonnull; import javax.annotation.Nullable; -import javax.net.ssl.HttpsURLConnection; -import javax.net.ssl.SSLSocketFactory; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.HttpURLConnection; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.Future; @@ -66,7 +52,6 @@ public class IntakeV2ReportingEventHandler implements ReportingEventHandler { public static final String INTAKE_V2_URL = "/intake/v2/events"; private static final Logger logger = LoggerFactory.getLogger(IntakeV2ReportingEventHandler.class); private static final int GZIP_COMPRESSION_LEVEL = 1; - private static final String USER_AGENT = "elasticapm-java/" + VersionUtils.getAgentVersion(); private static final Object WAIT_LOCK = new Object(); private final ReporterConfiguration reporterConfiguration; @@ -74,7 +59,7 @@ public class IntakeV2ReportingEventHandler implements ReportingEventHandler { private final MetaData metaData; private final PayloadSerializer payloadSerializer; private final Timer timeoutTimer; - private final CyclicIterator serverUrlIterator; + private final ApmServerClient apmServerClient; private Deflater deflater; private long currentlyTransmitting = 0; private long reported = 0; @@ -90,30 +75,15 @@ public class IntakeV2ReportingEventHandler implements ReportingEventHandler { private int errorCount; private volatile boolean shutDown; - public IntakeV2ReportingEventHandler(Service service, ProcessInfo process, SystemInfo system, - ReporterConfiguration reporterConfiguration, ProcessorEventHandler processorEventHandler, - PayloadSerializer payloadSerializer, Map globalLabels) { - this(service, process, system, reporterConfiguration, processorEventHandler, payloadSerializer, shuffleUrls(reporterConfiguration), globalLabels); - } - - IntakeV2ReportingEventHandler(Service service, ProcessInfo process, SystemInfo system, - ReporterConfiguration reporterConfiguration, ProcessorEventHandler processorEventHandler, - PayloadSerializer payloadSerializer, List serverUrls, Map globalLabels) { + public IntakeV2ReportingEventHandler(ReporterConfiguration reporterConfiguration, ProcessorEventHandler processorEventHandler, + PayloadSerializer payloadSerializer, MetaData metaData, ApmServerClient apmServerClient) { this.reporterConfiguration = reporterConfiguration; this.processorEventHandler = processorEventHandler; this.payloadSerializer = payloadSerializer; - this.metaData = new MetaData(process, service, system, globalLabels); + this.metaData = metaData; + this.apmServerClient = apmServerClient; this.deflater = new Deflater(GZIP_COMPRESSION_LEVEL); this.timeoutTimer = new Timer("apm-request-timeout-timer", true); - this.serverUrlIterator = new CyclicIterator<>(serverUrls); - } - - private static List shuffleUrls(ReporterConfiguration reporterConfiguration) { - List serverUrls = new ArrayList<>(reporterConfiguration.getServerUrls()); - // shuffling the URL list helps to distribute the load across the apm servers - // when there are multiple agents, they should not all start connecting to the same apm server - Collections.shuffle(serverUrls); - return serverUrls; } /* @@ -217,28 +187,16 @@ private boolean shouldFlush() { } private HttpURLConnection startRequest() throws IOException { - URL url = getUrl(); + final HttpURLConnection connection = apmServerClient.startRequest(INTAKE_V2_URL); if (logger.isDebugEnabled()) { - logger.debug("Starting new request to {}", url); - } - HttpURLConnection connection = (HttpURLConnection) url.openConnection(); - if (!reporterConfiguration.isVerifyServerCert()) { - if (connection instanceof HttpsURLConnection) { - trustAll((HttpsURLConnection) connection); - } + logger.debug("Starting new request to {}", connection.getURL()); } connection.setRequestMethod("POST"); connection.setDoOutput(true); - if (reporterConfiguration.getSecretToken() != null) { - connection.setRequestProperty("Authorization", "Bearer " + reporterConfiguration.getSecretToken()); - } connection.setChunkedStreamingMode(DslJsonSerializer.BUFFER_SIZE); - connection.setRequestProperty("User-Agent", USER_AGENT); connection.setRequestProperty("Content-Encoding", "deflate"); connection.setRequestProperty("Content-Type", "application/x-ndjson"); connection.setUseCaches(false); - connection.setConnectTimeout((int) reporterConfiguration.getServerTimeout().getMillis()); - connection.setReadTimeout((int) reporterConfiguration.getServerTimeout().getMillis()); connection.connect(); os = new DeflaterOutputStream(connection.getOutputStream(), deflater); payloadSerializer.setOutputStream(os); @@ -252,29 +210,6 @@ private HttpURLConnection startRequest() throws IOException { return connection; } - @Nonnull - URL getUrl() throws MalformedURLException { - URL serverUrl = serverUrlIterator.get(); - String path = serverUrl.getPath(); - if (path.endsWith("/")) { - path = path.substring(0, path.length() - 1); - } - return new URL(serverUrl, path + INTAKE_V2_URL); - } - - void switchToNextServerUrl() { - serverUrlIterator.next(); - } - - private void trustAll(HttpsURLConnection connection) { - final SSLSocketFactory sf = SslUtils.getTrustAllSocketFactory(); - if (sf != null) { - // using the same instances is important for TCP connection reuse - connection.setHostnameVerifier(SslUtils.getTrustAllHostnameVerifyer()); - connection.setSSLSocketFactory(sf); - } - } - void flush() { cancelTimeout(); if (connection != null) { @@ -353,7 +288,7 @@ private void onConnectionError(@Nullable Integer responseCode, long droppedEvent // if the response code is null, the server did not even send a response if (responseCode == null || responseCode > 429) { // this server seems to have connection or capacity issues, try next - switchToNextServerUrl(); + apmServerClient.onConnectionError(); } else if (responseCode == 404) { logger.warn("It seems like you are using a version of the APM Server which is not compatible with this agent. " + "Please use APM Server 6.5.0 or newer."); @@ -423,26 +358,4 @@ public boolean cancel() { } } - private static class CyclicIterator { - private final Iterable iterable; - private Iterator iterator; - private T current; - - public CyclicIterator(Iterable iterable) { - this.iterable = iterable; - iterator = this.iterable.iterator(); - current = iterator.next(); - } - - public T get() { - return current; - } - - public void next() { - if (!iterator.hasNext()) { - iterator = iterable.iterator(); - } - current = iterator.next(); - } - } } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ReporterFactory.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ReporterFactory.java index 292ef8e142..960b4a1f70 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ReporterFactory.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ReporterFactory.java @@ -11,9 +11,9 @@ * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,67 +25,31 @@ package co.elastic.apm.agent.report; import co.elastic.apm.agent.configuration.CoreConfiguration; -import co.elastic.apm.agent.impl.payload.ProcessFactory; -import co.elastic.apm.agent.impl.payload.ProcessInfo; -import co.elastic.apm.agent.impl.payload.ServiceFactory; -import co.elastic.apm.agent.impl.payload.SystemInfo; +import co.elastic.apm.agent.impl.MetaData; import co.elastic.apm.agent.impl.stacktrace.StacktraceConfiguration; import co.elastic.apm.agent.report.processor.ProcessorEventHandler; import co.elastic.apm.agent.report.serialize.DslJsonSerializer; -import co.elastic.apm.agent.util.VersionUtils; import org.stagemonitor.configuration.ConfigurationRegistry; import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; public class ReporterFactory { - public Reporter createReporter(ConfigurationRegistry configurationRegistry, @Nullable String frameworkName, - @Nullable String frameworkVersion) { + public Reporter createReporter(ConfigurationRegistry configurationRegistry, ApmServerClient apmServerClient, MetaData metaData) { final ReporterConfiguration reporterConfiguration = configurationRegistry.getConfig(ReporterConfiguration.class); final CoreConfiguration coreConfiguration = configurationRegistry.getConfig(CoreConfiguration.class); - if (coreConfiguration.isActive()) { - ExecutorService healthCheckExecutorService = Executors.newFixedThreadPool(1, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - final Thread thread = new Thread(r); - thread.setName("apm-server-healthcheck"); - thread.setDaemon(true); - return thread; - } - }); - healthCheckExecutorService.submit(new ApmServerHealthChecker(reporterConfiguration)); - healthCheckExecutorService.shutdown(); - } - final ReportingEventHandler reportingEventHandler = getReportingEventHandler(configurationRegistry, frameworkName, - frameworkVersion, reporterConfiguration); + final ReportingEventHandler reportingEventHandler = getReportingEventHandler(configurationRegistry, + reporterConfiguration, metaData, apmServerClient); return new ApmServerReporter(true, reporterConfiguration, coreConfiguration, reportingEventHandler); } @Nonnull - private ReportingEventHandler getReportingEventHandler(ConfigurationRegistry configurationRegistry, @Nullable String frameworkName, - @Nullable String frameworkVersion, ReporterConfiguration reporterConfiguration) { + private ReportingEventHandler getReportingEventHandler(ConfigurationRegistry configurationRegistry, + ReporterConfiguration reporterConfiguration, MetaData metaData, ApmServerClient apmServerClient) { - final DslJsonSerializer payloadSerializer = new DslJsonSerializer( - configurationRegistry.getConfig(StacktraceConfiguration.class)); - final co.elastic.apm.agent.impl.payload.Service service = new ServiceFactory().createService(configurationRegistry.getConfig(CoreConfiguration.class), frameworkName, frameworkVersion); - final ProcessInfo processInformation = ProcessFactory.ForCurrentVM.INSTANCE.getProcessInformation(); + final DslJsonSerializer payloadSerializer = new DslJsonSerializer(configurationRegistry.getConfig(StacktraceConfiguration.class)); final ProcessorEventHandler processorEventHandler = ProcessorEventHandler.loadProcessors(configurationRegistry); - if (!reporterConfiguration.isIncludeProcessArguments()) { - processInformation.getArgv().clear(); - } - return new IntakeV2ReportingEventHandler(service, processInformation, SystemInfo.create(), reporterConfiguration, - processorEventHandler, payloadSerializer, configurationRegistry.getConfig(CoreConfiguration.class).getGlobalLabels()); + return new IntakeV2ReportingEventHandler(reporterConfiguration, processorEventHandler, payloadSerializer, metaData, apmServerClient); } - private String getUserAgent() { - String agentVersion = VersionUtils.getAgentVersion(); - if (agentVersion != null) { - return "apm-agent-java " + agentVersion; - } - return "apm-agent-java"; - } } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/DslJsonSerializer.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/DslJsonSerializer.java index df125345b4..2164519ffa 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/DslJsonSerializer.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/DslJsonSerializer.java @@ -155,6 +155,13 @@ public void serializePayload(final OutputStream os, final Payload payload) { public void serializeMetaDataNdJson(MetaData metaData) { jw.writeByte(JsonWriter.OBJECT_START); writeFieldName("metadata"); + serializeMetadata(metaData); + jw.writeByte(JsonWriter.OBJECT_END); + jw.writeByte(NEW_LINE); + } + + @Override + public void serializeMetadata(MetaData metaData) { jw.writeByte(JsonWriter.OBJECT_START); serializeService(metaData.getService()); jw.writeByte(COMMA); @@ -163,8 +170,6 @@ public void serializeMetaDataNdJson(MetaData metaData) { serializeGlobalLabels(metaData.getGlobalLabelKeys(), metaData.getGlobalLabelValues()); serializeSystem(metaData.getSystem()); jw.writeByte(JsonWriter.OBJECT_END); - jw.writeByte(JsonWriter.OBJECT_END); - jw.writeByte(NEW_LINE); } private void serializeGlobalLabels(ArrayList globalLabelKeys, ArrayList globalLabelValues) { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/PayloadSerializer.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/PayloadSerializer.java index 8ba9e988c1..b5229fa91c 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/PayloadSerializer.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/PayloadSerializer.java @@ -47,6 +47,8 @@ public interface PayloadSerializer { void serializeMetaDataNdJson(MetaData metaData); + void serializeMetadata(MetaData metaData); + void serializeTransactionNdJson(Transaction transaction); void serializeSpanNdJson(Span span); diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/configuration/ApmServerConfigurationSourceTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/configuration/ApmServerConfigurationSourceTest.java new file mode 100644 index 0000000000..8dde278be7 --- /dev/null +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/configuration/ApmServerConfigurationSourceTest.java @@ -0,0 +1,148 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 - 2019 Elastic and contributors + * %% + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * #L% + */ +package co.elastic.apm.agent.configuration; + +import co.elastic.apm.agent.impl.MetaData; +import co.elastic.apm.agent.impl.stacktrace.StacktraceConfiguration; +import co.elastic.apm.agent.report.ApmServerClient; +import co.elastic.apm.agent.report.ReporterConfiguration; +import co.elastic.apm.agent.report.serialize.DslJsonSerializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder; +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.github.tomakehurst.wiremock.junit.WireMockRule; +import com.github.tomakehurst.wiremock.matching.RequestPatternBuilder; +import com.github.tomakehurst.wiremock.verification.LoggedRequest; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.stagemonitor.configuration.ConfigurationRegistry; + +import java.net.URL; +import java.util.List; +import java.util.Map; + +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static com.github.tomakehurst.wiremock.client.WireMock.notFound; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.serverError; +import static com.github.tomakehurst.wiremock.client.WireMock.serviceUnavailable; +import static com.github.tomakehurst.wiremock.client.WireMock.status; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.mockito.ArgumentMatchers.contains; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class ApmServerConfigurationSourceTest { + + @Rule + public WireMockRule mockApmServer = new WireMockRule(WireMockConfiguration.wireMockConfig().dynamicPort()); + private ConfigurationRegistry config; + private ApmServerClient apmServerClient; + private ApmServerConfigurationSource configurationSource; + private Logger mockLogger; + + @Before + public void setUp() throws Exception { + config = SpyConfiguration.createSpyConfig(); + apmServerClient = new ApmServerClient(config.getConfig(ReporterConfiguration.class), List.of(new URL("http", "localhost", mockApmServer.port(), "/"))); + mockApmServer.stubFor(post(urlEqualTo("/config/v1/agents")).willReturn(ResponseDefinitionBuilder.okForJson(Map.of("foo", "bar")).withHeader("ETag", "foo"))); + mockApmServer.stubFor(post(urlEqualTo("/config/v1/agents")).withHeader("If-None-Match", equalTo("foo")).willReturn(status(304))); + mockLogger = mock(Logger.class); + configurationSource = new ApmServerConfigurationSource(new DslJsonSerializer(mock(StacktraceConfiguration.class)), MetaData.create(config, null, null), apmServerClient, mockLogger); + } + + @Test + public void testLoadRemoteConfig() throws Exception { + configurationSource.fetchConfig(config); + assertThat(configurationSource.getValue("foo")).isEqualTo("bar"); + mockApmServer.verify(postRequestedFor(urlEqualTo("/config/v1/agents"))); + configurationSource.fetchConfig(config); + mockApmServer.verify(postRequestedFor(urlEqualTo("/config/v1/agents")).withHeader("If-None-Match", equalTo("foo"))); + for (LoggedRequest request : WireMock.findAll(RequestPatternBuilder.allRequests())) { + final JsonNode jsonNode = new ObjectMapper().readTree(request.getBodyAsString()); + assertThat(jsonNode.get("service")).isNotNull(); + assertThat(jsonNode.get("system")).isNotNull(); + assertThat(jsonNode.get("process")).isNotNull(); + } + } + + @Test + public void testNotFound() { + mockApmServer.stubFor(post(urlEqualTo("/config/v1/agents")).willReturn(notFound())); + assertThat(configurationSource.fetchConfig(config)).isNull(); + verify(mockLogger, times(1)).debug(contains("No remote config found for this agent")); + } + + @Test + public void configDeleted() { + configurationSource.fetchConfig(config); + assertThat(configurationSource.getValue("foo")).isEqualTo("bar"); + mockApmServer.stubFor(post(urlEqualTo("/config/v1/agents")).willReturn(notFound())); + configurationSource.fetchConfig(config); + assertThat(configurationSource.getValue("foo")).isNull(); + } + + @Test + public void testApmServerCantReachKibana() { + mockApmServer.stubFor(post(urlEqualTo("/config/v1/agents")).willReturn(serviceUnavailable())); + assertThat(configurationSource.fetchConfig(config)).isNull(); + verify(mockLogger, times(1)).error(contains("Remote configuration is not available")); + } + + @Test + public void testApmServerError() { + mockApmServer.stubFor(post(urlEqualTo("/config/v1/agents")).willReturn(serverError())); + assertThat(configurationSource.fetchConfig(config)).isNull(); + verify(mockLogger, times(1)).error(contains("Unexpected status 500 while fetching configuration")); + } + + @Test + public void testApmServerCentralConfigDisabled() { + mockApmServer.stubFor(post(urlEqualTo("/config/v1/agents")).willReturn(WireMock.forbidden())); + assertThat(configurationSource.fetchConfig(config)).isNull(); + verify(mockLogger).debug(contains("Central configuration is disabled")); + } + + @Test + public void parseMaxAgeFromCacheControlHeader() { + assertThat(ApmServerConfigurationSource.parseMaxAge("max-age=1")).isEqualTo(1); + assertThat(ApmServerConfigurationSource.parseMaxAge("max-age= 1")).isEqualTo(1); + assertThat(ApmServerConfigurationSource.parseMaxAge("max-age =1")).isEqualTo(1); + assertThat(ApmServerConfigurationSource.parseMaxAge("max-age = 1")).isEqualTo(1); + assertThat(ApmServerConfigurationSource.parseMaxAge("public, max-age = 42")).isEqualTo(42); + assertThat(ApmServerConfigurationSource.parseMaxAge("max-age= 42 , public")).isEqualTo(42); + assertThat(ApmServerConfigurationSource.parseMaxAge("public")).isNull(); + assertThat(ApmServerConfigurationSource.parseMaxAge(null)).isNull(); + } + +} diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/builtin/SystemMetricsTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/builtin/SystemMetricsTest.java index 0a4b660092..94cded7c36 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/builtin/SystemMetricsTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/builtin/SystemMetricsTest.java @@ -11,9 +11,9 @@ * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,7 +26,6 @@ import co.elastic.apm.agent.metrics.MetricRegistry; import co.elastic.apm.agent.report.ReporterConfiguration; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/report/ApmServerClientTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/report/ApmServerClientTest.java new file mode 100644 index 0000000000..e01c412ebc --- /dev/null +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/report/ApmServerClientTest.java @@ -0,0 +1,151 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 - 2019 Elastic and contributors + * %% + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * #L% + */ +package co.elastic.apm.agent.report; + +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.github.tomakehurst.wiremock.junit.WireMockRule; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.io.FileNotFoundException; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLConnection; +import java.util.List; + +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.notFound; +import static com.github.tomakehurst.wiremock.client.WireMock.ok; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class ApmServerClientTest { + + @Rule + public WireMockRule apmServer1 = new WireMockRule(WireMockConfiguration.wireMockConfig().dynamicPort()); + @Rule + public WireMockRule apmServer2 = new WireMockRule(WireMockConfiguration.wireMockConfig().dynamicPort()); + private ApmServerClient apmServerClient; + + @Before + public void setUp() throws MalformedURLException { + apmServer1.stubFor(get(urlEqualTo("/test")).willReturn(notFound())); + apmServer1.stubFor(get(urlEqualTo("/not-found")).willReturn(notFound())); + apmServer2.stubFor(get(urlEqualTo("/test")).willReturn(ok("hello from server 2"))); + apmServer2.stubFor(get(urlEqualTo("/not-found")).willReturn(notFound())); + apmServerClient = new ApmServerClient(new ReporterConfiguration(), List.of( + new URL("http", "localhost", apmServer1.port(), "/"), + new URL("http", "localhost", apmServer2.port(), "/") + )); + } + + @Test + public void testInitialCurrentUrlIsFirstUrl() throws Exception { + assertThat(apmServerClient.getCurrentUrl().getPort()).isEqualTo(apmServer1.port()); + + apmServerClient.execute("/test", HttpURLConnection::getResponseCode); + + apmServer1.verify(1, getRequestedFor(urlEqualTo("/test"))); + apmServer2.verify(0, getRequestedFor(urlEqualTo("/test"))); + } + + @Test + public void testUseNextUrlOnError() throws Exception { + apmServerClient.incrementAndGetErrorCount(0); + assertThat(apmServerClient.getCurrentUrl().getPort()).isEqualTo(apmServer2.port()); + + apmServerClient.execute("/test", HttpURLConnection::getResponseCode); + + apmServer1.verify(0, getRequestedFor(urlEqualTo("/test"))); + apmServer2.verify(1, getRequestedFor(urlEqualTo("/test"))); + assertThat(apmServerClient.getErrorCount()).isEqualTo(1); + } + + @Test + public void testWrapUrlsOnConsecutiveError() throws Exception { + int expectedErrorCount = apmServerClient.incrementAndGetErrorCount(0); + apmServerClient.incrementAndGetErrorCount(expectedErrorCount); + + testInitialCurrentUrlIsFirstUrl(); + assertThat(apmServerClient.getErrorCount()).isEqualTo(2); + } + + @Test + public void testRetry() throws Exception { + assertThat(apmServerClient.execute("/test", conn -> new String(conn.getInputStream().readAllBytes()))).isEqualTo("hello from server 2"); + assertThat(apmServerClient.getCurrentUrl().getPort()).isEqualTo(apmServer2.port()); + apmServer1.verify(1, getRequestedFor(urlEqualTo("/test"))); + apmServer2.verify(1, getRequestedFor(urlEqualTo("/test"))); + assertThat(apmServerClient.getErrorCount()).isEqualTo(1); + } + + @Test + public void testRetryFailure() { + assertThatThrownBy(() -> apmServerClient.execute("/not-found", URLConnection::getInputStream)) + .isInstanceOf(FileNotFoundException.class) + .matches(t -> t.getSuppressed().length == 1, "should have a suppressed exception"); + apmServer1.verify(1, getRequestedFor(urlEqualTo("/not-found"))); + apmServer2.verify(1, getRequestedFor(urlEqualTo("/not-found"))); + // two failures -> urls wrap + assertThat(apmServerClient.getCurrentUrl().getPort()).isEqualTo(apmServer1.port()); + assertThat(apmServerClient.getErrorCount()).isEqualTo(2); + } + + @Test + public void testExecuteSuccessfullyForAllUrls() { + apmServerClient.executeForAllUrls("/not-found", connection -> { + connection.getResponseCode(); + return null; + }); + apmServer1.verify(1, getRequestedFor(urlEqualTo("/not-found"))); + apmServer2.verify(1, getRequestedFor(urlEqualTo("/not-found"))); + // no failures -> urls in initial state + assertThat(apmServerClient.getCurrentUrl().getPort()).isEqualTo(apmServer1.port()); + assertThat(apmServerClient.getErrorCount()).isZero(); + } + + @Test + public void testExecuteFailureForAllUrls() { + // exception will only be logged, not thrown + apmServerClient.executeForAllUrls("/not-found", connection -> { + connection.getInputStream(); + return null; + }); + apmServer1.verify(1, getRequestedFor(urlEqualTo("/not-found"))); + apmServer2.verify(1, getRequestedFor(urlEqualTo("/not-found"))); + assertThat(apmServerClient.getErrorCount()).isEqualTo(0); + } + + @Test + public void testSimulateConcurrentConnectionError() { + apmServerClient.incrementAndGetErrorCount(0); + apmServerClient.incrementAndGetErrorCount(0); + assertThat(apmServerClient.getErrorCount()).isOne(); + } +} diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/report/ApmServerReporterIntegrationTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/report/ApmServerReporterIntegrationTest.java index e0e2b9dba6..f2288feeff 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/report/ApmServerReporterIntegrationTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/report/ApmServerReporterIntegrationTest.java @@ -28,6 +28,7 @@ import co.elastic.apm.agent.configuration.CoreConfiguration; import co.elastic.apm.agent.configuration.SpyConfiguration; import co.elastic.apm.agent.impl.ElasticApmTracer; +import co.elastic.apm.agent.impl.MetaData; import co.elastic.apm.agent.impl.error.ErrorCapture; import co.elastic.apm.agent.impl.payload.ProcessInfo; import co.elastic.apm.agent.impl.payload.Service; @@ -100,8 +101,12 @@ void setUp() throws Exception { final Service service = new Service(); final ProcessInfo title = new ProcessInfo("title"); final ProcessorEventHandler processorEventHandler = ProcessorEventHandler.loadProcessors(config); - final IntakeV2ReportingEventHandler v2handler = new IntakeV2ReportingEventHandler(service, title, system, reporterConfiguration, - processorEventHandler, new DslJsonSerializer(mock(StacktraceConfiguration.class)), Collections.emptyMap()); + final IntakeV2ReportingEventHandler v2handler = new IntakeV2ReportingEventHandler( + reporterConfiguration, + processorEventHandler, + new DslJsonSerializer(mock(StacktraceConfiguration.class)), + new MetaData(title, service, system, Collections.emptyMap()), + new ApmServerClient(reporterConfiguration)); reporter = new ApmServerReporter(false, reporterConfiguration, config.getConfig(CoreConfiguration.class), v2handler); } diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandlerTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandlerTest.java index 625fb73fb6..3bb2bd6cea 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandlerTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandlerTest.java @@ -11,9 +11,9 @@ * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,6 +27,7 @@ import co.elastic.apm.agent.MockTracer; import co.elastic.apm.agent.configuration.SpyConfiguration; import co.elastic.apm.agent.impl.ElasticApmTracer; +import co.elastic.apm.agent.impl.MetaData; import co.elastic.apm.agent.impl.error.ErrorCapture; import co.elastic.apm.agent.impl.payload.ProcessInfo; import co.elastic.apm.agent.impl.payload.Service; @@ -80,6 +81,7 @@ class IntakeV2ReportingEventHandlerTest { public WireMockRule mockApmServer2 = new WireMockRule(WireMockConfiguration.wireMockConfig().dynamicPort()); private IntakeV2ReportingEventHandler reportingEventHandler; private IntakeV2ReportingEventHandler nonConnectedReportingEventHandler; + private ApmServerClient apmServerClient; @Nonnull private static JsonNode getReadTree(String s) { @@ -98,22 +100,26 @@ void setUp() throws Exception { final ConfigurationRegistry configurationRegistry = SpyConfiguration.createSpyConfig(); final ReporterConfiguration reporterConfiguration = configurationRegistry.getConfig(ReporterConfiguration.class); SystemInfo system = new SystemInfo("x64", "localhost", "platform"); - reportingEventHandler = new IntakeV2ReportingEventHandler(new Service(), new ProcessInfo("title"), system, + final ProcessInfo title = new ProcessInfo("title"); + final Service service = new Service(); + apmServerClient = new ApmServerClient(reporterConfiguration, List.of( + new URL(HTTP_LOCALHOST + mockApmServer1.port()), + // testing ability to configure a server url with additional path (ending with "/" in this case) + new URL(HTTP_LOCALHOST + mockApmServer2.port() + APM_SERVER_PATH + "/") + )); + reportingEventHandler = new IntakeV2ReportingEventHandler( reporterConfiguration, mock(ProcessorEventHandler.class), new DslJsonSerializer(mock(StacktraceConfiguration.class)), - List.of( - new URL(HTTP_LOCALHOST + mockApmServer1.port()), - // testing ability to configure a server url with additional path (ending with "/" in this case) - new URL(HTTP_LOCALHOST + mockApmServer2.port() + APM_SERVER_PATH + "/") - ), Collections.emptyMap()); - nonConnectedReportingEventHandler = new IntakeV2ReportingEventHandler(new Service(), new ProcessInfo("title"), system, + new MetaData(title, service, system, Collections.emptyMap()), apmServerClient); + final ProcessInfo title1 = new ProcessInfo("title"); + final Service service1 = new Service(); + nonConnectedReportingEventHandler = new IntakeV2ReportingEventHandler( reporterConfiguration, mock(ProcessorEventHandler.class), new DslJsonSerializer(mock(StacktraceConfiguration.class)), - List.of( - new URL("http://non.existing:8080") - ), Collections.emptyMap()); + new MetaData(title1, service1, system, Collections.emptyMap()), + new ApmServerClient(reporterConfiguration, List.of(new URL("http://non.existing:8080")))); } @AfterEach @@ -123,13 +129,13 @@ void tearDown() { @Test void testUrls() throws MalformedURLException { - URL server1url = reportingEventHandler.getUrl(); + URL server1url = apmServerClient.appendPathToCurrentUrl(INTAKE_V2_URL); assertThat(server1url.toString()).isEqualTo(HTTP_LOCALHOST + mockApmServer1.port() + INTAKE_V2_URL); - reportingEventHandler.switchToNextServerUrl(); - URL server2url = reportingEventHandler.getUrl(); + apmServerClient.onConnectionError(); + URL server2url = apmServerClient.appendPathToCurrentUrl(INTAKE_V2_URL); assertThat(server2url.toString()).isEqualTo(HTTP_LOCALHOST + mockApmServer2.port() + APM_SERVER_PATH + INTAKE_V2_URL); // just to restore - reportingEventHandler.switchToNextServerUrl(); + apmServerClient.onConnectionError(); } @Test diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/report/ReporterFactoryTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/report/ReporterFactoryTest.java index 9030ba3cd2..0e694d910d 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/report/ReporterFactoryTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/report/ReporterFactoryTest.java @@ -26,6 +26,7 @@ import co.elastic.apm.agent.configuration.SpyConfiguration; import co.elastic.apm.agent.impl.ElasticApmTracer; +import co.elastic.apm.agent.impl.MetaData; import co.elastic.apm.agent.impl.transaction.Transaction; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.server.HttpConfiguration; @@ -109,7 +110,7 @@ private int getPort() { @Test void testNotValidatingSslCertificate() throws Exception { when(reporterConfiguration.isVerifyServerCert()).thenReturn(false); - final Reporter reporter = reporterFactory.createReporter(configuration, null, null); + final Reporter reporter = reporterFactory.createReporter(configuration, new ApmServerClient(reporterConfiguration), MetaData.create(configuration, null, null)); reporter.report(new Transaction(mock(ElasticApmTracer.class))); reporter.flush().get(); @@ -121,7 +122,7 @@ void testNotValidatingSslCertificate() throws Exception { @Test void testValidatingSslCertificate() throws Exception { when(reporterConfiguration.isVerifyServerCert()).thenReturn(true); - final Reporter reporter = reporterFactory.createReporter(configuration, null, null); + final Reporter reporter = reporterFactory.createReporter(configuration, new ApmServerClient(reporterConfiguration), MetaData.create(configuration, null, null)); reporter.report(new Transaction(mock(ElasticApmTracer.class))); reporter.flush().get(); diff --git a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/test/java/co/elastic/apm/agent/es/restclient/v6_4/ElasticsearchRestClientInstrumentationIT_RealReporter.java b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/test/java/co/elastic/apm/agent/es/restclient/v6_4/ElasticsearchRestClientInstrumentationIT_RealReporter.java index e4410c84ed..c43c838d4f 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/test/java/co/elastic/apm/agent/es/restclient/v6_4/ElasticsearchRestClientInstrumentationIT_RealReporter.java +++ b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/test/java/co/elastic/apm/agent/es/restclient/v6_4/ElasticsearchRestClientInstrumentationIT_RealReporter.java @@ -29,6 +29,7 @@ import co.elastic.apm.agent.configuration.SpyConfiguration; import co.elastic.apm.agent.impl.ElasticApmTracer; import co.elastic.apm.agent.impl.ElasticApmTracerBuilder; +import co.elastic.apm.agent.impl.MetaData; import co.elastic.apm.agent.impl.payload.Agent; import co.elastic.apm.agent.impl.payload.ProcessInfo; import co.elastic.apm.agent.impl.payload.Service; @@ -36,6 +37,7 @@ import co.elastic.apm.agent.impl.stacktrace.StacktraceConfiguration; import co.elastic.apm.agent.impl.transaction.TraceContext; import co.elastic.apm.agent.impl.transaction.Transaction; +import co.elastic.apm.agent.report.ApmServerClient; import co.elastic.apm.agent.report.ApmServerReporter; import co.elastic.apm.agent.report.IntakeV2ReportingEventHandler; import co.elastic.apm.agent.report.Reporter; @@ -138,8 +140,12 @@ public static void startElasticsearchContainerAndClient() throws IOException { final Service service = new Service().withName("Eyal-ES-client-test").withAgent(new Agent("java", "Test")); final ProcessInfo title = new ProcessInfo("title"); final ProcessorEventHandler processorEventHandler = ProcessorEventHandler.loadProcessors(configurationRegistry); - final IntakeV2ReportingEventHandler v2handler = new IntakeV2ReportingEventHandler(service, title, system, reporterConfiguration, - processorEventHandler, new DslJsonSerializer(mock(StacktraceConfiguration.class)), Collections.emptyMap()); + final IntakeV2ReportingEventHandler v2handler = new IntakeV2ReportingEventHandler( + reporterConfiguration, + processorEventHandler, + new DslJsonSerializer(mock(StacktraceConfiguration.class)), + new MetaData(title, service, system, Collections.emptyMap()), + new ApmServerClient(reporterConfiguration)); realReporter = new ApmServerReporter(true, reporterConfiguration, configurationRegistry.getConfig(CoreConfiguration.class), v2handler); tracer = new ElasticApmTracerBuilder() diff --git a/pom.xml b/pom.xml index 317fe3979f..591ec8905e 100644 --- a/pom.xml +++ b/pom.xml @@ -498,7 +498,7 @@ org.assertj assertj-core - 3.11.1 + 3.12.2 test