diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxHttpExporter.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxHttpExporter.java index d3ec075c606d5..bc8472286dae8 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxHttpExporter.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxHttpExporter.java @@ -8,8 +8,10 @@ import java.time.Duration; import java.util.Collection; import java.util.Map; +import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; import java.util.zip.GZIPOutputStream; @@ -22,6 +24,7 @@ import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; import io.quarkus.vertx.core.runtime.BufferOutputStream; +import io.smallrye.mutiny.Uni; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; import io.vertx.core.Vertx; @@ -38,6 +41,8 @@ final class VertxHttpExporter implements SpanExporter { private static final Logger internalLogger = Logger.getLogger(VertxHttpExporter.class.getName()); private static final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); + private static final int MAX_ATTEMPTS = 3; + private final HttpExporter delegate; VertxHttpExporter(HttpExporter delegate) { @@ -110,75 +115,35 @@ private static String determineBasePath(URI baseUri) { @Override public void send(Consumer marshaler, int contentLength, - Consumer onResponse, + Consumer onHttpResponseRead, Consumer onError) { - client.request(HttpMethod.POST, basePath + TRACES_PATH) - .onSuccess(new Handler<>() { - @Override - public void handle(HttpClientRequest request) { - - HttpClientRequest clientRequest = request.response(new Handler<>() { - @Override - public void handle(AsyncResult callResult) { - if (callResult.succeeded()) { - HttpClientResponse clientResponse = callResult.result(); - clientResponse.body(new Handler<>() { - @Override - public void handle(AsyncResult bodyResult) { - if (bodyResult.succeeded()) { - onResponse.accept(new Response() { - @Override - public int statusCode() { - return clientResponse.statusCode(); - } - - @Override - public String statusMessage() { - return clientResponse.statusMessage(); - } - - @Override - public byte[] responseBody() { - return bodyResult.result().getBytes(); - } - }); - } else { - onError.accept(bodyResult.cause()); - } - } - }); - } else { - onError.accept(callResult.cause()); - } - } - }) - .putHeader("Content-Type", contentType); - - Buffer buffer = Buffer.buffer(contentLength); - OutputStream os = new BufferOutputStream(buffer); - if (compressionEnabled) { - clientRequest.putHeader("Content-Encoding", "gzip"); - try (var gzos = new GZIPOutputStream(os)) { - marshaler.accept(gzos); - } catch (IOException e) { - throw new IllegalStateException(e); - } - } else { - marshaler.accept(os); - } - - if (!headers.isEmpty()) { - for (var entry : headers.entrySet()) { - clientRequest.putHeader(entry.getKey(), entry.getValue()); - } - } - - clientRequest.send(buffer); + String requestURI = basePath + TRACES_PATH; + var clientRequestSuccessHandler = new ClientRequestSuccessHandler(client, requestURI, headers, compressionEnabled, + contentType, + contentLength, onHttpResponseRead, + onError, marshaler, 1); + initiateSend(client, requestURI, MAX_ATTEMPTS, clientRequestSuccessHandler, onError); + } + private static void initiateSend(HttpClient client, String requestURI, + int numberOfAttempts, + Handler clientRequestSuccessHandler, + Consumer onError) { + Uni.createFrom().completionStage(new Supplier>() { + @Override + public CompletionStage get() { + return client.request(HttpMethod.POST, requestURI).toCompletionStage(); + } + }).onFailure().retry() + .withBackOff(Duration.ofMillis(100)) + .atMost(numberOfAttempts) + .subscribe().with(new Consumer<>() { + @Override + public void accept(HttpClientRequest request) { + clientRequestSuccessHandler.handle(request); } - }) - .onFailure(onError::accept); + }, onError); } @Override @@ -204,5 +169,134 @@ public void handle(Throwable event) { }); return shutdownResult; } + + private static class ClientRequestSuccessHandler implements Handler { + private final HttpClient client; + private final String requestURI; + private final Map headers; + private final boolean compressionEnabled; + private final String contentType; + private final int contentLength; + private final Consumer onHttpResponseRead; + private final Consumer onError; + private final Consumer marshaler; + + private final int attemptNumber; + + public ClientRequestSuccessHandler(HttpClient client, + String requestURI, Map headers, + boolean compressionEnabled, + String contentType, + int contentLength, + Consumer onHttpResponseRead, + Consumer onError, + Consumer marshaler, + int attemptNumber) { + this.client = client; + this.requestURI = requestURI; + this.headers = headers; + this.compressionEnabled = compressionEnabled; + this.contentType = contentType; + this.contentLength = contentLength; + this.onHttpResponseRead = onHttpResponseRead; + this.onError = onError; + this.marshaler = marshaler; + this.attemptNumber = attemptNumber; + } + + @Override + public void handle(HttpClientRequest request) { + + HttpClientRequest clientRequest = request.response(new Handler<>() { + @Override + public void handle(AsyncResult callResult) { + if (callResult.succeeded()) { + HttpClientResponse clientResponse = callResult.result(); + clientResponse.body(new Handler<>() { + @Override + public void handle(AsyncResult bodyResult) { + if (bodyResult.succeeded()) { + if (clientResponse.statusCode() >= 500) { + if (attemptNumber <= MAX_ATTEMPTS) { + // we should retry for 5xx error as they might be recoverable + initiateSend(client, requestURI, + MAX_ATTEMPTS - attemptNumber, + newAttempt(), + onError); + return; + } + } + onHttpResponseRead.accept(new Response() { + @Override + public int statusCode() { + return clientResponse.statusCode(); + } + + @Override + public String statusMessage() { + return clientResponse.statusMessage(); + } + + @Override + public byte[] responseBody() { + return bodyResult.result().getBytes(); + } + }); + } else { + if (attemptNumber <= MAX_ATTEMPTS) { + // retry + initiateSend(client, requestURI, + MAX_ATTEMPTS - attemptNumber, + newAttempt(), + onError); + } else { + onError.accept(bodyResult.cause()); + } + } + } + }); + } else { + if (attemptNumber <= MAX_ATTEMPTS) { + // retry + initiateSend(client, requestURI, + MAX_ATTEMPTS - attemptNumber, + newAttempt(), + onError); + } else { + onError.accept(callResult.cause()); + } + } + } + }) + .putHeader("Content-Type", contentType); + + Buffer buffer = Buffer.buffer(contentLength); + OutputStream os = new BufferOutputStream(buffer); + if (compressionEnabled) { + clientRequest.putHeader("Content-Encoding", "gzip"); + try (var gzos = new GZIPOutputStream(os)) { + marshaler.accept(gzos); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } else { + marshaler.accept(os); + } + + if (!headers.isEmpty()) { + for (var entry : headers.entrySet()) { + clientRequest.putHeader(entry.getKey(), entry.getValue()); + } + } + + clientRequest.send(buffer); + } + + public ClientRequestSuccessHandler newAttempt() { + return new ClientRequestSuccessHandler(client, requestURI, headers, compressionEnabled, + contentType, contentLength, onHttpResponseRead, + onError, marshaler, attemptNumber + 1); + } + } } }