From d65b45fc034f0f47496aa565095f0be23e867549 Mon Sep 17 00:00:00 2001
From: Felix Barnsteiner
Date: Tue, 21 May 2019 17:22:29 +0200
Subject: [PATCH 1/7] Remote config
closes #636
---
.../ApmServerConfigurationSource.java | 108 +++++++++++++++
.../agent/impl/ElasticApmTracerBuilder.java | 12 +-
.../co/elastic/apm/agent/impl/MetaData.java | 16 +++
.../apm/agent/report/ApmServerClient.java | 127 ++++++++++++++++++
.../agent/report/ApmServerHealthChecker.java | 93 +++++--------
.../report/IntakeV2ReportingEventHandler.java | 91 ++-----------
.../apm/agent/report/ReporterFactory.java | 39 ++----
.../report/serialize/DslJsonSerializer.java | 9 +-
.../report/serialize/PayloadSerializer.java | 2 +
.../ApmServerConfigurationSourceTest.java | 88 ++++++++++++
.../ApmServerReporterIntegrationTest.java | 6 +-
.../IntakeV2ReportingEventHandlerTest.java | 38 +++---
.../apm/agent/report/ReporterFactoryTest.java | 5 +-
...tClientInstrumentationIT_RealReporter.java | 8 +-
14 files changed, 445 insertions(+), 197 deletions(-)
create mode 100644 apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/ApmServerConfigurationSource.java
create mode 100644 apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerClient.java
create mode 100644 apm-agent-core/src/test/java/co/elastic/apm/agent/configuration/ApmServerConfigurationSourceTest.java
diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/ApmServerConfigurationSource.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/ApmServerConfigurationSource.java
new file mode 100644
index 0000000000..a9c689a80b
--- /dev/null
+++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/ApmServerConfigurationSource.java
@@ -0,0 +1,108 @@
+/*-
+ * #%L
+ * Elastic APM Java agent
+ * %%
+ * Copyright (C) 2018 - 2019 Elastic and contributors
+ * %%
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * #L%
+ */
+package co.elastic.apm.agent.configuration;
+
+import co.elastic.apm.agent.impl.MetaData;
+import co.elastic.apm.agent.report.ApmServerClient;
+import co.elastic.apm.agent.report.serialize.PayloadSerializer;
+import com.dslplatform.json.DslJson;
+import com.dslplatform.json.JsonReader;
+import com.dslplatform.json.MapConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.stagemonitor.configuration.source.AbstractConfigurationSource;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.util.Collections;
+import java.util.Map;
+
+public class ApmServerConfigurationSource extends AbstractConfigurationSource {
+ private static final Logger logger = LoggerFactory.getLogger(ApmServerConfigurationSource.class);
+
+ private final DslJson
* @param expectedErrorCount the error count that is expected by the current thread
- * @return the current error count
+ * @return the new expected error count
*/
int incrementAndGetErrorCount(int expectedErrorCount) {
- int previousValue = errorCount.compareAndExchange(expectedErrorCount, expectedErrorCount + 1);
- if (previousValue == expectedErrorCount) {
+ boolean success = errorCount.compareAndSet(expectedErrorCount, expectedErrorCount + 1);
+ if (success) {
return expectedErrorCount + 1;
} else {
- return previousValue;
+ // this thread has a stale error count and may not increment the error count when another retry fails
+ return -1;
}
}
diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerHealthChecker.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerHealthChecker.java
index f6061c2caa..b10f3cbb42 100644
--- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerHealthChecker.java
+++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerHealthChecker.java
@@ -73,7 +73,7 @@ public Void withConnection(HttpURLConnection connection) {
logger.info("Elastic APM server is available: {}", HttpUtils.getBody(connection));
}
} catch (Exception e) {
- logger.warn("Elastic APM server is not available ({})", e.getMessage());
+ logger.warn("Elastic APM server {} is not available ({})", connection.getURL(), e.getMessage());
}
return null;
}
diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/configuration/ApmServerConfigurationSourceTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/configuration/ApmServerConfigurationSourceTest.java
index f5b598fdb3..8dde278be7 100644
--- a/apm-agent-core/src/test/java/co/elastic/apm/agent/configuration/ApmServerConfigurationSourceTest.java
+++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/configuration/ApmServerConfigurationSourceTest.java
@@ -116,14 +116,14 @@ public void configDeleted() {
public void testApmServerCantReachKibana() {
mockApmServer.stubFor(post(urlEqualTo("/config/v1/agents")).willReturn(serviceUnavailable()));
assertThat(configurationSource.fetchConfig(config)).isNull();
- verify(mockLogger, times(1)).error(contains("Remote configuration is not available"), isA(IllegalStateException.class));
+ verify(mockLogger, times(1)).error(contains("Remote configuration is not available"));
}
@Test
public void testApmServerError() {
mockApmServer.stubFor(post(urlEqualTo("/config/v1/agents")).willReturn(serverError()));
assertThat(configurationSource.fetchConfig(config)).isNull();
- verify(mockLogger, times(1)).error(contains("Unexpected status 500 while fetching configuration"), isA(IllegalStateException.class));
+ verify(mockLogger, times(1)).error(contains("Unexpected status 500 while fetching configuration"));
}
@Test
From 3a78180135813d34aab32fb30a610b346ea34dcd Mon Sep 17 00:00:00 2001
From: Felix Barnsteiner
Date: Fri, 12 Jul 2019 12:52:24 +0200
Subject: [PATCH 6/7] Log warning if the config is unknown or not reloadable
---
.../configuration/ApmServerConfigurationSource.java | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/ApmServerConfigurationSource.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/ApmServerConfigurationSource.java
index ccb187789c..48efe1c8fa 100644
--- a/apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/ApmServerConfigurationSource.java
+++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/ApmServerConfigurationSource.java
@@ -35,6 +35,7 @@
import com.dslplatform.json.MapConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.stagemonitor.configuration.ConfigurationOption;
import org.stagemonitor.configuration.ConfigurationRegistry;
import org.stagemonitor.configuration.source.AbstractConfigurationSource;
import org.stagemonitor.util.IOUtils;
@@ -188,6 +189,14 @@ private String tryFetchConfig(ConfigurationRegistry configurationRegistry, HttpU
IOUtils.consumeAndClose(is);
configurationRegistry.reloadDynamicConfigurationOptions();
logger.info("Received new configuration from APM Server: {}", config);
+ for (Map.Entry entry : config.entrySet()) {
+ ConfigurationOption> conf = configurationRegistry.getConfigurationOptionByKey(entry.getKey());
+ if (conf == null) {
+ logger.warn("Received unknown remote configuration key {}", entry.getKey());
+ } else if (!conf.isDynamic()) {
+ logger.warn("Can't apply remote configuration {} as this option is not dynamic (aka. reloadable)", entry.getKey());
+ }
+ }
break;
case SC_NOT_MODIFIED:
logger.debug("Configuration did not change");
From 63f25f6d9ef7b5f55ae9328d9bdd31b216a3d7c0 Mon Sep 17 00:00:00 2001
From: Felix Barnsteiner
Date: Wed, 17 Jul 2019 13:18:27 +0200
Subject: [PATCH 7/7] Apply review suggestions
---
.../apm/agent/report/ApmServerClient.java | 16 ++-----
.../agent/report/ApmServerHealthChecker.java | 46 +++++++++----------
.../apm/agent/report/ApmServerClientTest.java | 37 ++++++++-------
3 files changed, 44 insertions(+), 55 deletions(-)
diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerClient.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerClient.java
index cdf4e16f31..6bde86883d 100644
--- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerClient.java
+++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerClient.java
@@ -201,30 +201,20 @@ public V execute(String path, ConnectionHandler connectionHandler) throws
throw previousException;
}
- public void executeForAllUrls(String path, ConnectionHandler connectionHandler) throws Exception {
- int expectedErrorCount = errorCount.get();
- Exception previousException = null;
- for (URL serverUrl : getPrioritizedUrlList()) {
+ public void executeForAllUrls(String path, ConnectionHandler connectionHandler) {
+ for (URL serverUrl : serverUrls) {
HttpURLConnection connection = null;
try {
connection = startRequestToUrl(appendPath(serverUrl, path));
connectionHandler.withConnection(connection);
} catch (Exception e) {
- expectedErrorCount = incrementAndGetErrorCount(expectedErrorCount);
- logger.debug("Exception while interacting with APM Server, trying next one.");
- if (previousException != null) {
- e.addSuppressed(previousException);
- }
- previousException = e;
+ logger.debug("Exception while interacting with APM Server", e);
} finally {
if (connection != null) {
connection.disconnect();
}
}
}
- if (previousException != null) {
- throw previousException;
- }
}
URL getCurrentUrl() {
diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerHealthChecker.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerHealthChecker.java
index b10f3cbb42..1d20e848d7 100644
--- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerHealthChecker.java
+++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerHealthChecker.java
@@ -51,36 +51,32 @@ public void start(ElasticApmTracer tracer) {
@Override
public void run() {
- try {
- apmServerClient.executeForAllUrls("/", new ApmServerClient.ConnectionHandler() {
- @Override
- public Void withConnection(HttpURLConnection connection) {
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("Starting healthcheck to {}", connection.getURL());
- }
+ apmServerClient.executeForAllUrls("/", new ApmServerClient.ConnectionHandler() {
+ @Override
+ public Void withConnection(HttpURLConnection connection) {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Starting healthcheck to {}", connection.getURL());
+ }
- final int status = connection.getResponseCode();
- if (status >= 300) {
- if (status == 404) {
- throw new IllegalStateException("It seems like you are using a version of the APM Server which is not compatible with this agent. " +
- "Please use APM Server 6.5.0 or newer.");
- } else {
- throw new IllegalStateException("Server returned status " + status);
- }
+ final int status = connection.getResponseCode();
+ if (status >= 300) {
+ if (status == 404) {
+ throw new IllegalStateException("It seems like you are using a version of the APM Server which is not compatible with this agent. " +
+ "Please use APM Server 6.5.0 or newer.");
} else {
- // prints out the version info of the APM Server
- logger.info("Elastic APM server is available: {}", HttpUtils.getBody(connection));
+ throw new IllegalStateException("Server returned status " + status);
}
- } catch (Exception e) {
- logger.warn("Elastic APM server {} is not available ({})", connection.getURL(), e.getMessage());
+ } else {
+ // prints out the version info of the APM Server
+ logger.info("Elastic APM server is available: {}", HttpUtils.getBody(connection));
}
- return null;
+ } catch (Exception e) {
+ logger.warn("Elastic APM server {} is not available ({})", connection.getURL(), e.getMessage());
}
- });
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ return null;
+ }
+ });
}
@Override
diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/report/ApmServerClientTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/report/ApmServerClientTest.java
index fd22e40bb9..e01c412ebc 100644
--- a/apm-agent-core/src/test/java/co/elastic/apm/agent/report/ApmServerClientTest.java
+++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/report/ApmServerClientTest.java
@@ -71,7 +71,7 @@ public void testInitialCurrentUrlIsFirstUrl() throws Exception {
apmServerClient.execute("/test", HttpURLConnection::getResponseCode);
- apmServer1.verify(getRequestedFor(urlEqualTo("/test")));
+ apmServer1.verify(1, getRequestedFor(urlEqualTo("/test")));
apmServer2.verify(0, getRequestedFor(urlEqualTo("/test")));
}
@@ -83,7 +83,8 @@ public void testUseNextUrlOnError() throws Exception {
apmServerClient.execute("/test", HttpURLConnection::getResponseCode);
apmServer1.verify(0, getRequestedFor(urlEqualTo("/test")));
- apmServer2.verify(getRequestedFor(urlEqualTo("/test")));
+ apmServer2.verify(1, getRequestedFor(urlEqualTo("/test")));
+ assertThat(apmServerClient.getErrorCount()).isEqualTo(1);
}
@Test
@@ -92,14 +93,16 @@ public void testWrapUrlsOnConsecutiveError() throws Exception {
apmServerClient.incrementAndGetErrorCount(expectedErrorCount);
testInitialCurrentUrlIsFirstUrl();
+ assertThat(apmServerClient.getErrorCount()).isEqualTo(2);
}
@Test
public void testRetry() throws Exception {
assertThat(apmServerClient.execute("/test", conn -> new String(conn.getInputStream().readAllBytes()))).isEqualTo("hello from server 2");
assertThat(apmServerClient.getCurrentUrl().getPort()).isEqualTo(apmServer2.port());
- apmServer1.verify(getRequestedFor(urlEqualTo("/test")));
- apmServer2.verify(getRequestedFor(urlEqualTo("/test")));
+ apmServer1.verify(1, getRequestedFor(urlEqualTo("/test")));
+ apmServer2.verify(1, getRequestedFor(urlEqualTo("/test")));
+ assertThat(apmServerClient.getErrorCount()).isEqualTo(1);
}
@Test
@@ -107,36 +110,36 @@ public void testRetryFailure() {
assertThatThrownBy(() -> apmServerClient.execute("/not-found", URLConnection::getInputStream))
.isInstanceOf(FileNotFoundException.class)
.matches(t -> t.getSuppressed().length == 1, "should have a suppressed exception");
- apmServer1.verify(getRequestedFor(urlEqualTo("/not-found")));
- apmServer2.verify(getRequestedFor(urlEqualTo("/not-found")));
+ apmServer1.verify(1, getRequestedFor(urlEqualTo("/not-found")));
+ apmServer2.verify(1, getRequestedFor(urlEqualTo("/not-found")));
// two failures -> urls wrap
assertThat(apmServerClient.getCurrentUrl().getPort()).isEqualTo(apmServer1.port());
+ assertThat(apmServerClient.getErrorCount()).isEqualTo(2);
}
@Test
- public void testExecuteSuccessfullyForAllUrls() throws Exception {
+ public void testExecuteSuccessfullyForAllUrls() {
apmServerClient.executeForAllUrls("/not-found", connection -> {
connection.getResponseCode();
return null;
});
- apmServer1.verify(getRequestedFor(urlEqualTo("/not-found")));
- apmServer2.verify(getRequestedFor(urlEqualTo("/not-found")));
+ apmServer1.verify(1, getRequestedFor(urlEqualTo("/not-found")));
+ apmServer2.verify(1, getRequestedFor(urlEqualTo("/not-found")));
// no failures -> urls in initial state
assertThat(apmServerClient.getCurrentUrl().getPort()).isEqualTo(apmServer1.port());
+ assertThat(apmServerClient.getErrorCount()).isZero();
}
@Test
public void testExecuteFailureForAllUrls() {
- assertThatThrownBy(() -> apmServerClient.executeForAllUrls("/not-found", connection -> {
+ // exception will only be logged, not thrown
+ apmServerClient.executeForAllUrls("/not-found", connection -> {
connection.getInputStream();
return null;
- }))
- .isInstanceOf(FileNotFoundException.class)
- .matches(t -> t.getSuppressed().length == 1, "should have a suppressed exception");
- apmServer1.verify(getRequestedFor(urlEqualTo("/not-found")));
- apmServer2.verify(getRequestedFor(urlEqualTo("/not-found")));
- // two failures -> urls wrap
- assertThat(apmServerClient.getCurrentUrl().getPort()).isEqualTo(apmServer1.port());
+ });
+ apmServer1.verify(1, getRequestedFor(urlEqualTo("/not-found")));
+ apmServer2.verify(1, getRequestedFor(urlEqualTo("/not-found")));
+ assertThat(apmServerClient.getErrorCount()).isEqualTo(0);
}
@Test