From d16a70b875a4fb6dbf35cc9dcec4de1ddabaa96f Mon Sep 17 00:00:00 2001 From: Yang Song Date: Wed, 8 May 2019 11:32:19 -0700 Subject: [PATCH] Exporter/Datadog, Elasticsearch: Add deadline option. --- CHANGELOG.md | 1 + .../trace/datadog/DatadogExporterHandler.java | 71 ++++++--- .../datadog/DatadogTraceConfiguration.java | 42 +++++- .../trace/datadog/DatadogTraceExporter.java | 10 +- .../datadog/DatadogExporterHandlerTest.java | 4 +- .../ElasticsearchTraceConfiguration.java | 29 +++- .../ElasticsearchTraceHandler.java | 139 +++++++++++------- 7 files changed, 207 insertions(+), 89 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e01cb0bd8e..9b7a160aee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ Monitoring client to make RPCs. with default value 10 seconds. - Use `ZipkinExporterConfiguration` for creating `ZipkinTraceExporter`. Provide a `Deadline` option with default value 10 seconds. +- Provide a `Deadline` option to Datadog and Elasticsearch exporter. Default value is 10 seconds. ## 0.21.0 - 2019-04-30 - Add HTTP text format serializer to Tag propagation component. diff --git a/exporters/trace/datadog/src/main/java/io/opencensus/exporter/trace/datadog/DatadogExporterHandler.java b/exporters/trace/datadog/src/main/java/io/opencensus/exporter/trace/datadog/DatadogExporterHandler.java index 83c081ca58..d2f1fb4697 100644 --- a/exporters/trace/datadog/src/main/java/io/opencensus/exporter/trace/datadog/DatadogExporterHandler.java +++ b/exporters/trace/datadog/src/main/java/io/opencensus/exporter/trace/datadog/DatadogExporterHandler.java @@ -16,9 +16,12 @@ package io.opencensus.exporter.trace.datadog; +import com.google.common.util.concurrent.SimpleTimeLimiter; +import com.google.common.util.concurrent.TimeLimiter; import com.google.gson.FieldNamingPolicy; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import io.opencensus.common.Duration; import io.opencensus.common.Functions; import io.opencensus.common.Scope; import io.opencensus.common.Timestamp; @@ -44,7 +47,10 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -65,12 +71,14 @@ final class DatadogExporterHandler extends SpanExporter.Handler { private final URL agentEndpoint; private final String service; private final String type; + private final Duration deadline; - DatadogExporterHandler(final String agentEndpoint, final String service, final String type) + DatadogExporterHandler(String agentEndpoint, String service, String type, Duration deadline) throws MalformedURLException { this.agentEndpoint = new URL(agentEndpoint); this.service = service; this.type = type; + this.deadline = deadline; } private static String attributeValueToString(AttributeValue attributeValue) { @@ -163,7 +171,7 @@ String convertToJson(Collection spanDataList) { } @Override - public void export(Collection spanDataList) { + public void export(final Collection spanDataList) { // Start a new span with explicit 1/10000 sampling probability to avoid the case when user // sets the default sampler to always sample and we get the gRPC span of the datadog // export call always sampled and go to an infinite loop. @@ -172,29 +180,44 @@ public void export(Collection spanDataList) { .spanBuilder("ExportDatadogTraces") .setSampler(probabilitySpampler) .startScopedSpan()) { + TimeLimiter timeLimiter = SimpleTimeLimiter.create(Executors.newSingleThreadExecutor()); + timeLimiter.callWithTimeout( + new Callable() { + @Override + public Void call() throws Exception { + doExport(spanDataList); + return null; + } + }, + deadline.toMillis(), + TimeUnit.MILLISECONDS); + } catch (Exception e) { + handleException(e); + } + } - final String data = convertToJson(spanDataList); - - final HttpURLConnection connection = (HttpURLConnection) agentEndpoint.openConnection(); - connection.setRequestMethod("POST"); - connection.setRequestProperty("Content-Type", "application/json"); - connection.setDoOutput(true); - OutputStream outputStream = connection.getOutputStream(); - outputStream.write(data.getBytes(Charset.defaultCharset())); - outputStream.flush(); - outputStream.close(); - if (connection.getResponseCode() != 200) { - tracer - .getCurrentSpan() - .setStatus(Status.UNKNOWN.withDescription("Response " + connection.getResponseCode())); - } - } catch (IOException e) { - tracer - .getCurrentSpan() - .setStatus( - Status.UNKNOWN.withDescription( - e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage())); - // drop span batch + private void doExport(Collection spanDataList) throws IOException { + final String data = convertToJson(spanDataList); + + final HttpURLConnection connection = (HttpURLConnection) agentEndpoint.openConnection(); + connection.setRequestMethod("POST"); + connection.setRequestProperty("Content-Type", "application/json"); + connection.setDoOutput(true); + OutputStream outputStream = connection.getOutputStream(); + outputStream.write(data.getBytes(Charset.defaultCharset())); + outputStream.flush(); + outputStream.close(); + if (connection.getResponseCode() != 200) { + handleException(new Exception("Response " + connection.getResponseCode())); } } + + private static void handleException(Exception e) { + Status status = e instanceof TimeoutException ? Status.DEADLINE_EXCEEDED : Status.UNKNOWN; + tracer + .getCurrentSpan() + .setStatus( + status.withDescription( + e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage())); + } } diff --git a/exporters/trace/datadog/src/main/java/io/opencensus/exporter/trace/datadog/DatadogTraceConfiguration.java b/exporters/trace/datadog/src/main/java/io/opencensus/exporter/trace/datadog/DatadogTraceConfiguration.java index 408153d4e0..face01c640 100644 --- a/exporters/trace/datadog/src/main/java/io/opencensus/exporter/trace/datadog/DatadogTraceConfiguration.java +++ b/exporters/trace/datadog/src/main/java/io/opencensus/exporter/trace/datadog/DatadogTraceConfiguration.java @@ -17,6 +17,9 @@ package io.opencensus.exporter.trace.datadog; import com.google.auto.value.AutoValue; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import io.opencensus.common.Duration; import javax.annotation.concurrent.Immutable; /** @@ -28,6 +31,9 @@ @Immutable public abstract class DatadogTraceConfiguration { + @VisibleForTesting static final Duration DEFAULT_DEADLINE = Duration.create(10, 0); + @VisibleForTesting static final Duration ZERO = Duration.fromMillis(0); + DatadogTraceConfiguration() {} /** @@ -54,6 +60,16 @@ public abstract class DatadogTraceConfiguration { */ public abstract String getType(); + /** + * Returns the deadline for exporting to Datadog. + * + *

Default value is 10 seconds. + * + * @return the export deadline. + * @since 0.22 + */ + public abstract Duration getDeadline(); + /** * Return a new {@link Builder}. * @@ -61,7 +77,7 @@ public abstract class DatadogTraceConfiguration { * @since 0.19 */ public static Builder builder() { - return new AutoValue_DatadogTraceConfiguration.Builder(); + return new AutoValue_DatadogTraceConfiguration.Builder().setDeadline(DEFAULT_DEADLINE); } /** @@ -101,6 +117,28 @@ public abstract static class Builder { */ public abstract Builder setType(String type); - public abstract DatadogTraceConfiguration build(); + /** + * Sets the deadline for exporting to Datadog. + * + * @param deadline the export deadline. + * @return this + * @since 0.22 + */ + public abstract Builder setDeadline(Duration deadline); + + abstract Duration getDeadline(); + + abstract DatadogTraceConfiguration autoBuild(); + + /** + * Builds a {@link DatadogTraceConfiguration}. + * + * @return a {@code DatadogTraceConfiguration}. + * @since 0.22 + */ + public DatadogTraceConfiguration build() { + Preconditions.checkArgument(getDeadline().compareTo(ZERO) > 0, "Deadline must be positive."); + return autoBuild(); + } } } diff --git a/exporters/trace/datadog/src/main/java/io/opencensus/exporter/trace/datadog/DatadogTraceExporter.java b/exporters/trace/datadog/src/main/java/io/opencensus/exporter/trace/datadog/DatadogTraceExporter.java index c0a2c24ad1..88c22bb011 100644 --- a/exporters/trace/datadog/src/main/java/io/opencensus/exporter/trace/datadog/DatadogTraceExporter.java +++ b/exporters/trace/datadog/src/main/java/io/opencensus/exporter/trace/datadog/DatadogTraceExporter.java @@ -32,10 +32,10 @@ *

{@code
  * public static void main(String[] args) {
  *   DatadogTraceConfiguration config = DatadogTraceConfiguration.builder()
- * .setAgentEndpoint("http://localhost:8126/v0.3/traces")
- * .setService("myService")
- * .setType("web")
- * .build();
+ *     .setAgentEndpoint("http://localhost:8126/v0.3/traces")
+ *     .setService("myService")
+ *     .setType("web")
+ *     .build();
  * DatadogTraceExporter.createAndRegister(config);
  *   ... // Do work.
  * }
@@ -72,7 +72,7 @@ public static void createAndRegister(DatadogTraceConfiguration configuration)
       String type = configuration.getType();
 
       final DatadogExporterHandler exporterHandler =
-          new DatadogExporterHandler(agentEndpoint, service, type);
+          new DatadogExporterHandler(agentEndpoint, service, type, configuration.getDeadline());
       handler = exporterHandler;
       Tracing.getExportComponent()
           .getSpanExporter()
diff --git a/exporters/trace/datadog/src/test/java/io/opencensus/exporter/trace/datadog/DatadogExporterHandlerTest.java b/exporters/trace/datadog/src/test/java/io/opencensus/exporter/trace/datadog/DatadogExporterHandlerTest.java
index 63460eda3c..c2c268ad7b 100644
--- a/exporters/trace/datadog/src/test/java/io/opencensus/exporter/trace/datadog/DatadogExporterHandlerTest.java
+++ b/exporters/trace/datadog/src/test/java/io/opencensus/exporter/trace/datadog/DatadogExporterHandlerTest.java
@@ -17,6 +17,7 @@
 package io.opencensus.exporter.trace.datadog;
 
 import static com.google.common.truth.Truth.assertThat;
+import static io.opencensus.exporter.trace.datadog.DatadogTraceConfiguration.DEFAULT_DEADLINE;
 
 import com.google.common.collect.ImmutableMap;
 import io.opencensus.common.Timestamp;
@@ -56,7 +57,8 @@ public class DatadogExporterHandlerTest {
 
   @Before
   public void setup() throws Exception {
-    this.handler = new DatadogExporterHandler("http://localhost", "service", "web");
+    this.handler =
+        new DatadogExporterHandler("http://localhost", "service", "web", DEFAULT_DEADLINE);
   }
 
   @Test
diff --git a/exporters/trace/elasticsearch/src/main/java/io/opencensus/exporter/trace/elasticsearch/ElasticsearchTraceConfiguration.java b/exporters/trace/elasticsearch/src/main/java/io/opencensus/exporter/trace/elasticsearch/ElasticsearchTraceConfiguration.java
index 8891704e34..8822c0889b 100644
--- a/exporters/trace/elasticsearch/src/main/java/io/opencensus/exporter/trace/elasticsearch/ElasticsearchTraceConfiguration.java
+++ b/exporters/trace/elasticsearch/src/main/java/io/opencensus/exporter/trace/elasticsearch/ElasticsearchTraceConfiguration.java
@@ -17,8 +17,10 @@
 package io.opencensus.exporter.trace.elasticsearch;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import io.opencensus.common.Duration;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.Immutable;
 
@@ -31,6 +33,9 @@
 @Immutable
 public abstract class ElasticsearchTraceConfiguration {
 
+  @VisibleForTesting static final Duration DEFAULT_DEADLINE = Duration.create(10, 0);
+  @VisibleForTesting static final Duration ZERO = Duration.fromMillis(0);
+
   /**
    * Returns a new {@link Builder}.
    *
@@ -38,7 +43,7 @@ public abstract class ElasticsearchTraceConfiguration {
    * @since 0.20.0
    */
   public static Builder builder() {
-    return new AutoValue_ElasticsearchTraceConfiguration.Builder();
+    return new AutoValue_ElasticsearchTraceConfiguration.Builder().setDeadline(DEFAULT_DEADLINE);
   }
 
   /**
@@ -91,6 +96,16 @@ public static Builder builder() {
    */
   public abstract String getElasticsearchType();
 
+  /**
+   * Returns the deadline for exporting to Elasticsearch.
+   *
+   * 

Default value is 10 seconds. + * + * @return the export deadline. + * @since 0.22 + */ + public abstract Duration getDeadline(); + /** * Builds a {@link ElasticsearchTraceConfiguration}. * @@ -157,6 +172,15 @@ public abstract static class Builder { */ public abstract Builder setElasticsearchType(String elasticsearchType); + /** + * Sets the deadline for exporting to Elasticsearch. + * + * @param deadline the export deadline. + * @return this + * @since 0.22 + */ + public abstract Builder setDeadline(Duration deadline); + /** * Builder for {@link ElasticsearchTraceConfiguration}. * @@ -174,6 +198,9 @@ public ElasticsearchTraceConfiguration build() { Preconditions.checkArgument( !Strings.isNullOrEmpty(elasticsearchTraceConfiguration.getElasticsearchIndex()), "Invalid Elasticsearch type."); + Preconditions.checkArgument( + elasticsearchTraceConfiguration.getDeadline().compareTo(ZERO) > 0, + "Deadline must be positive."); return elasticsearchTraceConfiguration; } } diff --git a/exporters/trace/elasticsearch/src/main/java/io/opencensus/exporter/trace/elasticsearch/ElasticsearchTraceHandler.java b/exporters/trace/elasticsearch/src/main/java/io/opencensus/exporter/trace/elasticsearch/ElasticsearchTraceHandler.java index 9da33b1c47..7bc6d044a7 100644 --- a/exporters/trace/elasticsearch/src/main/java/io/opencensus/exporter/trace/elasticsearch/ElasticsearchTraceHandler.java +++ b/exporters/trace/elasticsearch/src/main/java/io/opencensus/exporter/trace/elasticsearch/ElasticsearchTraceHandler.java @@ -17,6 +17,9 @@ package io.opencensus.exporter.trace.elasticsearch; import com.google.common.io.BaseEncoding; +import com.google.common.util.concurrent.SimpleTimeLimiter; +import com.google.common.util.concurrent.TimeLimiter; +import io.opencensus.common.Duration; import io.opencensus.common.Scope; import io.opencensus.trace.Sampler; import io.opencensus.trace.Status; @@ -34,12 +37,17 @@ import java.nio.charset.Charset; import java.util.Collection; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; final class ElasticsearchTraceHandler extends SpanExporter.Handler { private final ElasticsearchTraceConfiguration elasticsearchTraceConfiguration; private final String appName; private final URL indexUrl; + private final Duration deadline; private static final String CONTENT_TYPE = "application/json"; private static final String REQUEST_METHOD = "POST"; private static final int CONNECTION_TIMEOUT_MILLISECONDS = 6000; @@ -56,6 +64,7 @@ final class ElasticsearchTraceHandler extends SpanExporter.Handler { sb.append(elasticsearchTraceConfiguration.getElasticsearchType()).append("/"); indexUrl = new URL(sb.toString()); appName = elasticsearchTraceConfiguration.getAppName(); + deadline = elasticsearchTraceConfiguration.getDeadline(); } /** @@ -64,7 +73,7 @@ final class ElasticsearchTraceHandler extends SpanExporter.Handler { * @param spanDataList Collection of {@code SpanData} to be exported. */ @Override - public void export(Collection spanDataList) { + public void export(final Collection spanDataList) { Scope scope = tracer .spanBuilder("ExportElasticsearchTraces") @@ -72,67 +81,85 @@ public void export(Collection spanDataList) { .setRecordEvents(true) .startScopedSpan(); try { - List jsonList = JsonConversionUtils.convertToJson(appName, spanDataList); - if (jsonList.isEmpty()) { - return; - } - for (String json : jsonList) { + TimeLimiter timeLimiter = SimpleTimeLimiter.create(Executors.newSingleThreadExecutor()); + timeLimiter.callWithTimeout( + new Callable() { + @Override + public Void call() { + doExport(spanDataList); + return null; + } + }, + deadline.toMillis(), + TimeUnit.MILLISECONDS); + } catch (Exception e) { + handleException(e); + } finally { + scope.close(); + } + } - OutputStream outputStream = null; - InputStream inputStream = null; + private void doExport(Collection spanDataList) { + List jsonList = JsonConversionUtils.convertToJson(appName, spanDataList); + if (jsonList.isEmpty()) { + return; + } + for (String json : jsonList) { - try { - HttpURLConnection connection = (HttpURLConnection) indexUrl.openConnection(); - if (elasticsearchTraceConfiguration.getUserName() != null) { - String parameters = - BaseEncoding.base64() - .encode( - (elasticsearchTraceConfiguration.getUserName() - + ":" - + elasticsearchTraceConfiguration.getPassword()) - .getBytes("UTF-8")); - connection.setRequestProperty("Authorization", "Basic " + parameters); - } - connection.setRequestMethod(REQUEST_METHOD); - connection.setDoOutput(true); - connection.setConnectTimeout(CONNECTION_TIMEOUT_MILLISECONDS); - connection.setRequestProperty("Content-Type", CONTENT_TYPE); - outputStream = connection.getOutputStream(); - outputStream.write(json.getBytes(Charset.defaultCharset())); - outputStream.flush(); - inputStream = connection.getInputStream(); - if (connection.getResponseCode() != 200) { - tracer - .getCurrentSpan() - .setStatus( - Status.UNKNOWN.withDescription("Response " + connection.getResponseCode())); - } - } catch (IOException e) { - tracer - .getCurrentSpan() - .setStatus( - Status.UNKNOWN.withDescription( - e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage())); - // dropping span batch - } finally { - if (inputStream != null) { - try { - inputStream.close(); - } catch (IOException e) { - // ignore - } + OutputStream outputStream = null; + InputStream inputStream = null; + + try { + HttpURLConnection connection = (HttpURLConnection) indexUrl.openConnection(); + if (elasticsearchTraceConfiguration.getUserName() != null) { + String parameters = + BaseEncoding.base64() + .encode( + (elasticsearchTraceConfiguration.getUserName() + + ":" + + elasticsearchTraceConfiguration.getPassword()) + .getBytes("UTF-8")); + connection.setRequestProperty("Authorization", "Basic " + parameters); + } + connection.setRequestMethod(REQUEST_METHOD); + connection.setDoOutput(true); + connection.setConnectTimeout(CONNECTION_TIMEOUT_MILLISECONDS); + connection.setRequestProperty("Content-Type", CONTENT_TYPE); + outputStream = connection.getOutputStream(); + outputStream.write(json.getBytes(Charset.defaultCharset())); + outputStream.flush(); + inputStream = connection.getInputStream(); + if (connection.getResponseCode() != 200) { + handleException(new Exception("Response " + connection.getResponseCode())); + } + } catch (IOException e) { + handleException(e); + // dropping span batch + } finally { + if (inputStream != null) { + try { + inputStream.close(); + } catch (IOException e) { + // ignore } - if (outputStream != null) { - try { - outputStream.close(); - } catch (IOException e) { - // ignore - } + } + if (outputStream != null) { + try { + outputStream.close(); + } catch (IOException e) { + // ignore } } } - } finally { - scope.close(); } } + + private static void handleException(Exception e) { + Status status = e instanceof TimeoutException ? Status.DEADLINE_EXCEEDED : Status.UNKNOWN; + tracer + .getCurrentSpan() + .setStatus( + status.withDescription( + e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage())); + } }